ステップ 6 (パイプライン)。 データパイプラインの修正を実装する

データパイプライン

これらのステップに従って、データパイプラインを変更し、次の目的で実行します。

  1. 新しいベクトルインデックスを作成します。

  2. データパイプラインのメタデータを使用して MLflow 実行を作成します。

結果の MLflow 実行は、 'B_quality_iteration/02_evaluate_fixes' ノートブックによって参照されます。

データパイプラインを変更するには、次の 2 つの方法があります。

  • 一度に 1 つの修正を実装する このアプローチでは、1 つのデータパイプラインを一度に構成して実行します。 このモードは、1 つの埋め込みモデルを試し、1 つの新しいパーサーをテストする場合に最適です。 Databricks では、これらのノートブックに慣れるために、ここから始めることを提案しています。

  • 一度に複数の修正を実装する このアプローチ (スイープとも呼ばれます) では、それぞれ異なる構成を持つ複数のデータ パイプラインを並列に実行します。 このモードは、3 つの PDF パーサーを評価したり、多くの異なるチャンク サイズを評価したりするなど、多くの異なる戦略を "スイープ" する場合に最適です。

このセクションのサンプル コードについては、 GitHub リポジトリ を参照してください。

アプローチ 1: 一度に 1 つの修正を実装する

  1. B_quality_iteration/data_pipeline_fixes/single_fix/00_config ノートブックを開きます

  2. 以下のいずれかの指示に従います。

  3. 次のいずれかの方法でパイプラインを実行します。

  4. B_quality_iteration/02_evaluate_fixes ノートブックの 変数に出力される結果の実行 の名前を追加しますMLflowDATA_PIPELINE_FIXES_RUN_NAMES

注:

データ準備パイプラインでは、 Spark 構造化ストリーミングを使用して、ファイルを段階的に読み込み、処理します。 これは、すでにロードされ準備されたファイルがチェックポイントで追跡され、再処理されないことを意味します。 新しく追加されたファイルのみがロードされ、準備され、対応するテーブルに追加されます。

したがって、 パイプライン全体を最初から再実行 し、すべてのドキュメントを再処理する場合は、チェックポイントとテーブルを削除する必要があります。 これは、 リセット ノートブックを使用して実現できます。

アプローチ 2: 複数の修正を一度に実装する

  1. B_quality_iteration/data_pipeline_fixes/multiple_fixes/実行 ノートブックを開きます。

  2. ノートブックの指示に従って、実行するデータパイプラインの 2 つ以上の構成を追加します。

  3. ノートブックを実行して、これらのパイプラインを実行します。

  4. B_quality_iteration/02_evaluate_fixes ノートブックの DATA_PIPELINE_FIXES_RUN_NAMES 変数に出力される結果の MLflow 実行の名前を追加します。

虫垂

注:

以下で参照するノートブックは、一度に 1 つの修正を実装するか、複数の修正を実装するかに応じて、 single_fix ディレクトリと multiple_fixes ディレクトリにあります。

構成設定の詳細

データパイプラインに事前に実装されたさまざまな設定オプションを以下に示します。 または、 カスタムパーサー/チャンカーを実装することもできます。

  • vectorsearch_config: ベクトル検索 エンドポイント (稼働中である必要があります) と、作成するインデックスの名前を指定します。 さらに、ソース・テーブルとインデックスの間の 同期 タイプを定義します (デフォルトは TRIGGEREDです)。

  • embedding_config: 使用する埋め込みモデルとトークナイザーを指定します。 オプションの完全なリストについては、「 supporting_configs/embedding_models」 ノートブックを参照してください。 埋め込みモデルは、実行中のモデルサービング エンドポイントにデプロイする必要があります。 チャンク戦略によっては、トークナイザーは分割中にもチャンクが埋め込みモデルのトークン制限を超えないようにします。 ここでは、トークナイザーを使用して、テキストチャンク内のトークンの数をカウントし、選択した埋め込みモデルの最大コンテキスト長を超えないようにします。

以下は、HuggingFace のトークナイザーを示しています。

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

以下は、TikTokenのトークナイザーを示しています。

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: ファイルパーサー、チャンカー、およびソースフィールドへのパスを定義します。 パーサーとチャンカーは、それぞれ parser_library ノートブックと chunker_library ノートブックで定義されています。 これらは、 single_fix ディレクトリと multiple_fixes ディレクトリにあります。 オプションの完全なリストについては、 supporting_configs/parser_chunker_strategies ノートブックを参照してください。ノートブックは、単一フィックス・ディレクトリーと複数修正ディレクトリーの両方で再び使用可能です。 パーサーまたはチャンカーが異なれば、 <param x> 特定のチャンカーに必要な可能性のあるパラメーターを表す、異なる構成パラメーターが必要になる場合があります。 パーサーには、同じ形式を使用して構成値を渡すこともできます。

    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

カスタムパーサー/チャンカーの実装

このプロジェクトは、データ準備パイプラインへのカスタムパーサーまたはチャンカーの追加を容易にするように構成されています。

新しいパーサーを追加する

PyMuPDF ライブラリを使用して新しいパーサーを組み込み、解析されたテキストを Markdown 形式に変換するとします。以下の手順に従います。

  1. 必要な依存関係をインストールするには、single_fix ディレクトリまたは multiple_fix ディレクトリの parser_library ノートブックに次のコードを追加します。

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. single_fix ディレクトリまたは multiple_fix ディレクトリの parser_library ノートブックで、PyMuPdfMarkdown パーサーの新しいセクションを追加し、解析関数を実装します。関数の出力が、ノートブックの先頭で定義された ParserReturnValue クラスに準拠していることを確認します。 これにより、Spark UDFs との互換性が確保されます。 try ブロックまたは except ブロックは、single_fix または multiple_fix ディレクトリのノートブックでパーサーを UDF として適用するときに、個々のドキュメントのエラーが原因で Spark が解析ジョブ全体が失敗するのを防ぎます02_parse_docs。このノートブックは、ドキュメントの解析が失敗したかどうかを確認し、対応する行を検疫して警告を発生させます。

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. 新しい解析関数を single_fix ディレクトリまたは multiple_fix ディレクトリの parser_library ノートブックのparser_factoryに追加して、00_config ノートブックのpipeline_configで構成できるようにします。

  4. 02_parse_docs ノートブックでは、パーサー関数は Spark Python UDF (Databricks Runtime 14.0 以降用に矢印で最適化) に変換され、新しいバイナリ PDF ファイルを含むデータフレームに適用されます。テストと開発の場合は、 test-document.pdf ファイルを読み込み、正常な解析をアサートする簡単なテスト関数を parser_library ノートブック に追加します。

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

新しいチャンカーを追加する

新しいチャンカーを追加するプロセスは、新しいパーサーについて上記で説明したのと同様の手順をたどります。

  1. 必要な依存関係を chunker_library ノートブックに追加します。

  2. チャンカーの新しいセクションを追加し、関数 ( chunk_parsed_content_newchunkername. 新しい chunker 関数の出力は、chunker_library ノートブックの先頭で定義された ChunkerReturnValue クラスに準拠した Python ディクショナリである必要があります。この関数は、チャンクする解析されたテキストの少なくとも文字列を受け入れる必要があります。 チャンカーに追加のパラメーターが必要な場合は、それらを関数パラメーターとして追加できます。

  3. 新しいチャンカーを chunker_library ノートブックで定義されている chunker_factory 関数に追加します。関数が追加のパラメーターを受け入れる場合は、 functools の partial を使用してそれらを事前に構成します。 これは、UDF が 1 つの入力パラメーター (この場合は解析されたテキスト) のみを受け入れるために必要です。 このchunker_factoryを使用すると、 パイプライン でさまざまなチャンカーメソッドを設定でき、 Spark Python UDF を返します( Databricks Runtime 14.0以降に最適化されています)。

  4. 新しいチャンク関数の簡単なテストセクションを追加します。 このセクションでは、文字列として提供される定義済みのテキストをチャンクする必要があります。

パフォーマンスのチューニング

Spark はパーティションを使用して処理を並列化します。 データは行のチャンクに分割され、各パーティションはデフォルトで 1 つのコアによって処理されます。 ただし、Apache Spark によってデータが最初に読み取られるとき、特に解析およびチャンクタスクを実行する UDF に対して、目的の計算に最適化されたパーティションが作成されない場合があります。 効率的な並列化に十分な小ささのパーティションを作成することと、パーティションの管理オーバーヘッドがメリットを上回るほど小さくないパーティションを作成することのバランスを取ることが重要です。

パーティションの数は、 df.repartitions(<number of partitions>)を使用して調整できます。 UDF を適用するときは、ワーカー ノードで使用可能なコア数の倍数を目指します。 たとえば、 02_parse_docs ノートブックでは、使用可能なワーカー コアの数の 2 倍のパーティションを作成する df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) を含めることができます。 通常、1 から 3 の倍数で十分なパフォーマンスが得られます。

パイプラインの手動実行

または、個々のノートブックを段階的に実行することもできます。

  1. 01_load_filesノートブックを使用してRAWファイルをロードします。これにより、各ドキュメント バイナリは、destination_tables_configで定義されたブロンズ テーブル (raw_files_table_name) に 1 つのレコードとして保存されます。ファイルは増分的にロードされ、前回の実行以降の新しいドキュメントのみが処理されます。

  2. 02_parse_docsノートブックでドキュメントを解析します。このノートブックは、 parser_library ノートブックを実行し (Python を再起動する最初のセルとして実行してください)、さまざまなパーサーと関連ユーティリティを利用できるようにします。 次に、 pipeline_config で指定されたパーサーを使用して、各ドキュメントをプレーンテキストに解析します。 例として、解析されたテキストと一緒に元のPDFのページ数などの関連するメタデータがキャプチャされます。 正常に解析されたドキュメントはシルバーテーブル(parsed_docs_table_name)に保存され、解析されなかったドキュメントは対応するテーブルに隔離されます。

  3. 解析されたドキュメントを 03_chunk_docs ノートブックを使用してチャンクします。 解析と同様に、このノートブックは chunker_library ノートブックを実行します (ここでも、最初のセルとして実行されます)。 解析された各ドキュメントは、 pipeline_configから指定されたチャンカーを使用して小さなチャンクに分割されます。 各チャンクには、ベクトル検索インデックスとの同期に必要な MD5 ハッシュを使用して一意の ID が割り当てられます。 最終的なチャンクはゴールドテーブル(chunked_docs_table_name)にロードされます。

  4. ベクトル検索インデックスを作成/同期します 04_vector_index. このノートブックは、 vectorsearch_configで指定されたベクトル検索エンドポイントの準備状況を確認します。 設定されたインデックスがすでに存在する場合は、ゴールドテーブルとの同期が開始されます。それ以外の場合は、インデックスが作成され、同期がトリガーされます。 ベクトル検索のエンドポイントとインデックスがまだ作成されていない場合は、時間がかかることが予想されます。