ステップ 6 (パイプライン)。 データパイプラインの修正を実装する

データパイプライン

次の手順に従ってデータパイプラインを変更し、実行します。

  1. 新しいベクトルインデックスを作成します。

  2. データパイプラインのメタデータを使用してMLflow実行を作成します。

結果として得られる MLflow 実行は、`B_quality_iteration/02_evaluate_fixes`ノートブックによって参照されます。

データパイプラインを変更するには、次の 2 つの方法があります。

  • 一度に 1 つの修正を実装するこのアプローチでは、一度に 1 つのデータパイプラインを構成して実行します。 このモードは、1 つの埋め込みモデルを試し、1 つの新しいパーサーをテストする場合に最適です。 Databricks では、これらのノートブックに慣れるためにここから始めることをお勧めします。

  • 一度に複数の修正を実装するこのアプローチ (スイープとも呼ばれます) では、それぞれ異なる構成を持つ複数のデータパイプラインを並行して実行します。 このモードは、3 つの PDF パーサーを評価したり、多くの異なるチャンク サイズを評価したりするなど、さまざまな戦略を「スイープ」する場合に最適です。

このセクションのサンプル コードについては、GitHub リポジトリを参照してください。

アプローチ 1: 一度に 1 つの修正を実装する

  1. B_quality_iteration/data_pipeline_fixes/single_fix/00_configノートブックを開きます。

  2. 以下のいずれかの指示に従ってください。

  3. 次のいずれかの方法でパイプラインを実行します。

  4. B_quality_iteration/02_evaluate_fixesノートブックのDATA_PIPELINE_FIXES_RUN_NAMES変数に出力される結果の MLflow 実行の名前を追加します。

注:

データ準備パイプラインは、 Spark構造化ストリーミングを使用して、ファイルを段階的にロードして処理します。 これにより、既に読み込まれて準備されたファイルはチェックポイントで追跡され、再処理されません。 新しく追加されたファイルのみがロードされ、準備され、対応するテーブルに追加されます。

したがって、パイプライン全体を最初から再実行し、すべてのドキュメントを再処理する場合は、チェックポイントとテーブルを削除する必要があります。 リセットタイムラインを使用することでこれを実現できます。

アプローチ 2: 複数の修正を一度に実装する

  1. B_quality_iteration/data_pipeline_fixes/multiple_fixes/実行データベースを開きます。

  2. ジャーナルの指示に従って、データパイプラインの 2 つ以上の構成を実行に追加します。

  3. これらのパイプラインを実行するには、ノートブックを実行します。

  4. B_quality_iteration/02_evaluate_fixesノートブックのDATA_PIPELINE_FIXES_RUN_NAMES変数に出力される結果の MLflow 実行の名前を追加します。

虫垂

注:

一度に 1 つの修正を実装するか、複数の修正を実装するかに応じて、以下で参照されるノートブックはsingle_fixディレクトリとmultiple_fixesディレクトリにあります。

構成設定の詳細

データパイプラインに事前に実装されているさまざまな構成オプションを以下に示します。 または、 カスタム パーサー/チャンカーを実装することもできます。

  • vectorsearch_config:ベクトル検索エンドポイント (稼働中である必要があります) と作成するインデックスの名前を指定します。 さらに、ソース テーブルとインデックス間の同期タイプを定義します (デフォルトはTRIGGEREDです)。

  • embedding_config: 使用する埋め込みモデルとトークナイザーを指定します。 オプションの完全なリストについては、`supporting_configs/embedding_models`ノートブックを参照してください。 埋め込みモデルは、実行中のモデルサービング エンドポイントにデプロイする必要があります。 チャンク化戦略に応じて、トークナイザーは分割中にチャンクが埋め込みモデルのトークン制限を超えないようにします。 ここでトークナイザーは、テキスト チャンク内のトークンの数をカウントして、選択した埋め込みモデルの最大コンテキスト長を超えないようにするために使用されます。

以下は、HuggingFaceのトークナイザーを示しています。

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

以下は、TikTokenのトークナイザーを示しています。

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: ファイル パーサー、チャンカー、およびソース フィールドへのパスを定義します。 パーサーとチャンカーは、それぞれparser_libraryノートブックとchunker_libraryノートブックで定義されています。 これらは、 single_fix ディレクトリと multiple_fixes ディレクトリにあります。 オプションの完全なリストについては、単一修正ディレクトリと複数修正ディレクトリの両方で使用できるsupporting_configs/parser_chunker_strategiesノートブックを参照してください。 異なるパーサーまたはチャンカーでは異なる構成の 引数 が必要になる場合があります。ここで、 <param x>特定のチャンカーに必要な潜在的な 引数 を表します。 パーサーには、同じ形式を使用して構成値を渡すこともできます。

    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

カスタムパーサー/チャンカーの実装

このプロジェクトは、データ準備パイプラインへのカスタム パーサーまたはチャンカーの追加を容易にするように設計されています。

新しいパーサーを追加する

PyMuPDF ライブラリを使用して新しいパーサーを組み込み、解析されたテキストを Markdown 形式に変換するとします。 以下の手順に従ってください:

  1. 次のコードをsingle_fixまたはmultiple_fixディレクトリのparser_libraryノートブックに追加して、必要な依存関係をインストールします。

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. single_fixまたはmultiple_fixディレクトリのparser_libraryノートブックに、 PyMuPdfMarkdownパーサーの新しいセクションを追加し、解析関数を実装します。 関数の出力がノートブックの先頭で定義されたParserReturnValueクラスに準拠していることを確認します。 これにより、Spark UDF との互換性が確保されます。 tryまたはexceptブロックは、 single_fixまたはmultiple_fixディレクトリの02_parse_docsノートブックでパーサーを UDF として適用するときに、個々のドキュメントのエラーが原因で Spark が解析ジョブ全体を失敗することを防ぎます。 このノートブックは、ドキュメントの解析が失敗したかどうかを確認し、対応する行を隔離して警告を発します。

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. 新しい解析関数をsingle_fixまたはmultiple_fixディレクトリのparser_libraryノートブックのparser_factoryに追加して、 00_configノートブックのpipeline_configで構成できるようにします。

  4. 02_parse_docsノートブックでは、パーサー関数が Spark Python UDF (Databricks Runtime 14.0 以降用に矢印最適化) に変換され、新しいバイナリ PDF ファイルを含むデータフレームに適用されます。 テストと開発のために、 test-document.pdf ファイルを読み込み、解析が成功したことを確認する簡単なテスト関数を parser_library ノートブック に追加します。

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

新しいチャンカーを追加する

新しいチャンカーを追加するプロセスは、新しいパーサーについて上で説明した手順と同様です。

  1. chunker_libraryノートブックに必要な依存関係を追加します。

  2. チャンカーの新しいセクションを追加し、関数を実装します (例: chunk_parsed_content_newchunkername. 新しい chunker 関数の出力は、 chunker_libraryノートブックの先頭で定義されたChunkerReturnValueクラスに準拠する Python 辞書である必要があります。 この関数は、チャンク化される解析済みテキストの文字列を少なくとも受け入れる必要があります。 チャンカーに追加の要件がある場合は、関数 コンテナーとして追加できます。

  3. chunker_libraryノートブックで定義されたchunker_factory関数に新しいチャンカーを追加します。 関数が追加の引数を受け入れる場合、 functools の partialを使用してそれらを事前構成します。 これが必要なのは、UDF が 1 つの入力 引数 (この場合は解析されたテキスト) のみを受け入れるためです。 chunker_factory使用すると、パイプラインでさまざまなチャンカー メソッドを構成し、 Spark Python UDF ( Databricks Runtime 14.0 以降向けに最適化) を返すことができます。

  4. 新しいチャンク関数の簡単なテストセクションを追加します。 このセクションでは、文字列として提供される定義済みテキストをチャンクする必要があります。

パフォーマンスのチューニング

Spark はパーティションを利用して処理を並列化します。 データは行のチャンクに分割され、各パーティションはデフォルトで 1 つのコアによって処理されます。 ただし、Apache Spark によってデータが最初に読み取られるときに、特に解析およびチャンク化タスクを実行する UDF の場合、目的の計算に最適化されたパーティションが作成されない可能性があります。 効率的な並列化のために十分に小さいパーティションを作成することと、パーティションを管理するオーバーヘッドが利点を上回るほど小さくならないパーティションを作成することのバランスを取ることが重要です。

パーティションの数は、 df.repartitions(<number of partitions>)を使用して調整できます。 UDF を適用するときは、ワーカー ノードで使用可能なコア数の倍数を目指します。 たとえば、 02_parse_docsノートブックでは、 df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)を含めることで、使用可能なワーカー コアの数の 2 倍のパーティションを作成できます。 通常、1 から 3 の倍数で満足のいくパフォーマンスが得られます。

パイプラインを手動で実行する

あるいは、各ノートブックを段階的に実行することもできます。

  1. 01_load_filesノートブックを使用して生のファイルを読み込みます。 これにより、各ドキュメントバイナリが 1 つのレコードとして、destination_tables_configで定義されたブロンズテーブル(raw_files_table_name)に保存されます。ファイルは増分的に読み込まれ、前回の実行以降の新しいドキュメントのみが処理されます。

  2. 02_parse_docsノートブックを使用してドキュメントを解析します。 このノートブックはparser_libraryノートブックを実行し ( Python を再起動する最初のセルとしてこれを必ず実行してください)、さまざまなパーサーと関連ユーティリティを利用できるようにします。 次に、 pipeline_config で指定されたパーサーを使用して、各ドキュメントをプレーンテキストに解析します。 たとえば、元のPDFのページ数や解析されたテキストなどの関連メタデータがキャプチャされます。 正常に解析されたドキュメントはシルバーテーブル(parsed_docs_table_name)に格納され、解析されなかったドキュメントは対応するテーブルに隔離されます。

  3. 03_chunk_docsノートブックを使用して解析されたドキュメントをチャンク化します。 解析と同様に、このノートブックはchunker_libraryノートブックを実行します (ここでも、最初のセルとして実行されます)。 解析された各ドキュメントは、 pipeline_configから指定されたチャンカーを使用して、より小さなチャンクに分割されます。 各チャンクには、ベクトル検索インデックスとの同期に必要な、MD5 ハッシュを使用した一意の ID が割り当てられます。 最後のチャンクはゴールドテーブル( chunked_docs_table_name )にロードされます。

  4. 04_vector_indexを使用して検索インデックスを作成/同期します。 このノートブックは、 vectorsearch_config内の指定されたベクトル検索エンドポイントの準備状況を確認します。 構成されたインデックスがすでに存在する場合は、ゴールド テーブルとの同期を開始します。それ以外の場合は、インデックスを作成して同期をトリガーします。 検索エンドポイントとインデックスがまだ作成されていない場合は、時間がかかることが予想されます。

次のステップ

ステップ 7. デプロイと監視に進みます。