
Document intelligence with AI Functions

This tutorial demonstrates an end-to-end intelligent document processing (IDP) pipeline built with three Databricks AI Functions.

Text
┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ BRONZE SILVER GOLD │
│ │
│ ┌── gold_consulting_agreements (scope, compensation, ...) │
│ ├── gold_marketing_agreements (territory, campaign, ...) │
│ raw_contracts ──▶ parsed_contracts ──▶ classified_contracts ──▶ extracted fields ──▶ ├── gold_hosting_agreements (SLA, uptime, fees, ...) │
│ (Auto Loader) (ai_parse_document) (ai_classify) (ai_extract) └── gold_affiliate_agreements (commission, terms, ...) │
│ │
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

The pipeline processes legal contracts filed with the SEC, classifies each into one of five categories (affiliate, marketing, consulting, hosting, or escrow), and extracts the relevant terms such as party names, dates, and compensation details.

Prerequisites

  • Serverless compute or a SQL warehouse with AI Functions enabled
  • Access to the sample data volume at /Volumes/samples/sec/contracts/
Note

The samples.sec.contracts dataset is available in all workspaces by default. To process your own PDFs, change SOURCE_PATH in the next cell to a Unity Catalog volume containing your files. To ingest PDFs from external sources such as SharePoint or Google Drive, we recommend Databricks LakeFlow Connect.

Python
from pyspark.sql import functions as F
import json
import uuid

# Source path — point this at a Unity Catalog volume containing your PDF files.
# The sample path below contains SEC-filed legal agreements.
SOURCE_PATH = "/Volumes/samples/sec/contracts/"

# Serverless compute does not support .cache(), so intermediate results are
# materialized to temp tables instead. A random suffix avoids collisions
# if multiple users run the tutorial concurrently.
_TMP_SUFFIX = uuid.uuid4().hex[:8]

Configuration

The classification labels tell ai_classify which categories to choose from. Each extraction schema defines the short, typed fields that ai_extract retrieves for that contract type.

Modify these to adapt the pipeline to your own document types.

Python
# One-line descriptions used by ai_classify to pick the best label per document.
CLASSIFICATION_LABELS = json.dumps(
    {
        "affiliate_agreement": "One party refers customers or resells products for commissions or revenue share.",
        "marketing_agreement": "One party provides marketing, promotion, distribution, or advertising services.",
        "consulting_agreement": "An individual or firm provides advisory or professional services as an independent contractor.",
        "hosting_agreement": "One party provides web hosting, server hosting, application hosting, or managed infrastructure.",
        "escrow_agreement": "A third-party agent holds materials (source code, documentation) with defined release triggers.",
    }
)

# Instructions passed to ai_classify. Filenames carry strong signal for these
# SEC filings, so the classifier is told to trust them unless content disagrees.
CLASSIFICATION_INSTRUCTIONS = """
You are classifying SEC-filed legal agreements into exactly one of five labels.
Read the contract and assign exactly one category:
affiliate_agreement, marketing_agreement, consulting_agreement, hosting_agreement, escrow_agreement.

Decision rules:
* Give strong weight to the contract title and filename when they contain explicit type keywords such as Affiliate Agreement, Marketing Agreement, Consulting Agreement, Hosting Agreement, or Escrow Agreement.
* Only override the filename when the document content clearly and unambiguously describes a different category.

Return only the single best label.
""".strip().replace("\n", " ")
CLASSIFICATION_INSTRUCTIONS_SQL = CLASSIFICATION_INSTRUCTIONS.replace("'", "\\'")

# Shared extraction instructions appended to every per-type prompt.
EXTRACTION_BASE_INSTRUCTIONS = (
    "The input is an SEC-filed legal agreement. "
    "Use all available context in the input, including any document metadata. "
    "Do not extract full sentences, clauses, or paragraph-length descriptions. "
    "If a dollar amount is redacted (e.g., [***]), extract the surrounding structure (e.g., [***]% of revenue above [***] threshold). "
)

# Per-type extraction schemas. Each schema lists short fields that ai_extract
# will populate for every document classified into that type.
EXTRACTION_CONFIGS = {
    "affiliate_agreement": {
        "schema": {
            "party_1_name": {"type": "string", "description": "Legal name of the first party \u2014 must be an actual company or legal entity name, not a role or generic label (e.g., not 'Affiliate', 'Company', or 'Licensor'). Extract from the preamble, recitals, signature block, or document title. Every affiliate agreement involves exactly two parties \u2014 if not found in the body text, infer from the SEC filing entity or any other available context."},
            "party_2_name": {"type": "string", "description": "Legal name of the second party \u2014 must be an actual company or legal entity name, not a role or generic label (e.g., not 'Affiliate', 'Company', or 'Licensor'). Extract from the preamble, recitals, signature block, or document title. Every affiliate agreement involves exactly two parties \u2014 if not found in the body text, infer from the SEC filing entity or any other available context."},
            "commission_rate": {"type": "string", "description": "Primary rate or structure in a short phrase (e.g., 50/50 revenue share, 15-25% tiered discount, $55/referral)."},
            "payment_frequency": {"type": "string", "description": "How often payments are made (e.g., Monthly, Net 30, Quarterly)."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is an affiliate agreement.",
    },
    "marketing_agreement": {
        "schema": {
            "party_1_name": {"type": "string", "description": "Legal name of the first party."},
            "party_2_name": {"type": "string", "description": "Legal name of the second party."},
            "effective_date": {"type": "string", "description": "Contract start date (e.g., January 30, 2000)."},
            "territory": {"type": "string", "description": "Geographic scope as a place name only (e.g., United States, Texas, New York). Must be an actual geographic location. If the territory references an exhibit or schedule, or no specific place is named, return null."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a marketing agreement.",
    },
    "consulting_agreement": {
        "schema": {
            "company_name": {"type": "string", "description": "Legal name of the company engaging the consultant."},
            "consultant_name": {"type": "string", "description": "Legal name of the consultant or consulting firm."},
            "compensation_amount": {"type": "string", "description": "Rate or total with currency and period (e.g., EUR 500/hour, $18,000/month, $250,000 lump sum)."},
            "effective_date": {"type": "string", "description": "Contract start date (e.g., May 1, 2019)."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a consulting agreement.",
    },
    "hosting_agreement": {
        "schema": {
            "provider_name": {"type": "string", "description": "Legal name of the hosting provider."},
            "customer_name": {"type": "string", "description": "Legal name of the customer."},
            "effective_date": {"type": "string", "description": "Contract start date (e.g., March 1, 2005)."},
            "term_length": {"type": "string", "description": "Duration or term condition as a short phrase. May be a fixed period (e.g., 12 months, 2 years) or an event-dependent term (e.g., coterminous with License Agreement, until termination of Service Agreement). Always use digits for numbers, never words."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a hosting agreement.",
    },
    "escrow_agreement": {
        "schema": {
            "owner_name": {"type": "string", "description": "Legal name of the depositor or software developer."},
            "licensee_name": {"type": "string", "description": "Legal name of the beneficiary or licensee."},
            "escrow_agent_name": {"type": "string", "description": "Legal name of the escrow agent."},
            "software_name": {"type": "string", "description": "Name of the escrowed software or materials."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is an escrow agreement.",
    },
}

print(f"Configured {len(json.loads(CLASSIFICATION_LABELS))} classification labels")
print(f"Configured {len(EXTRACTION_CONFIGS)} extraction schemas: {', '.join(EXTRACTION_CONFIGS.keys())}")
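As a quick illustration of why the single-quote escaping in CLASSIFICATION_INSTRUCTIONS_SQL matters: the instructions are spliced into a single-quoted SQL string literal, so any embedded quotes must be backslash-escaped first. The instruction text below is a made-up example, not part of the pipeline's configuration.

```python
# Hypothetical instruction text containing a single quote.
instructions = "Trust the document's title."

# Same transformation as CLASSIFICATION_INSTRUCTIONS_SQL above: backslash-escape
# single quotes so the text can sit inside a single-quoted SQL literal.
escaped = instructions.replace("'", "\\'")
sql = f"ai_classify(parsed_content, labels, MAP('instructions', '{escaped}'))"

print(escaped)  # Trust the document\'s title.
```

Without the escaping, the apostrophe in "document's" would terminate the SQL literal early and break the query.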
Python
def _flatten_extraction(contract_type: str):
    """Return a transform that filters to `contract_type`, calls ai_extract on
    the batch, and flattens the JSON response into typed columns."""
    config = EXTRACTION_CONFIGS[contract_type]
    schema_json = json.dumps(config["schema"]).replace("'", "\\'")
    instructions = config["instructions"].replace("'", "\\'")

    def transform(df):
        # ai_extract runs once per batch — each row gets its own extraction,
        # but Spark pushes the whole batch to the AI function in parallel.
        extracted = (
            df.filter(F.col("contract_type") == contract_type)
            .select(
                F.col("path"),
                F.col("contract_type"),
                F.col("parsed_content"),
                F.expr(
                    f"""
                    ai_extract(
                        parsed_content,
                        '{schema_json}',
                        MAP('instructions', '{instructions}')
                    )
                    """
                ).alias("extracted"),
            )
        )

        # Flatten the nested JSON response into top-level STRING columns.
        select_cols = [F.col("path"), F.col("contract_type")]
        for field_name in config["schema"]:
            select_cols.append(F.expr(f"extracted:response.{field_name}::STRING").alias(field_name))

        return extracted.select(*select_cols)

    return transform
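As a plain-Python illustration of the flattening step, independent of Spark: the payload below is a made-up example of the nested response shape that the `extracted:response.<field>` accesses assume, not real ai_extract output.

```python
import json

# Hypothetical ai_extract result for one affiliate agreement: the extracted
# fields live under a top-level "response" key.
sample_extracted = json.dumps({
    "response": {
        "party_1_name": "Acme Corp.",
        "party_2_name": "Widget LLC",
        "commission_rate": "15% of net revenue",
        "payment_frequency": "Monthly",
    }
})

schema_fields = ["party_1_name", "party_2_name", "commission_rate", "payment_frequency"]
response = json.loads(sample_extracted).get("response", {})

# Pull one top-level string value per schema field, mirroring the
# extracted:response.<field>::STRING projection in the transform above.
row = {field: response.get(field) for field in schema_fields}
print(row["commission_rate"])  # 15% of net revenue
```

Fields missing from the response simply come back as None, just as the SQL cast yields NULL.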

Bronze layer: ingest raw PDFs

Read the PDF files as binary using Spark's binaryFile format. Each row contains the file path, the raw content bytes, the length, and the modification time.

Production tip: For incremental ingestion, replace spark.read with Auto Loader (the cloudFiles format) so that each run processes only new files.
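A minimal sketch of that swap, assuming a Databricks runtime where `spark` is predefined; the checkpoint path and target table name are placeholders you must adapt, not part of the sample dataset.

```python
# Sketch only: Auto Loader tracks which files it has already seen via the
# checkpoint, so each run ingests only files added since the last run.
raw_contracts_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .load(SOURCE_PATH)
)

(
    raw_contracts_stream.writeStream
    .option("checkpointLocation", "/Volumes/your_catalog/your_schema/checkpoints/raw_contracts")
    .trigger(availableNow=True)  # process the backlog of new files, then stop
    .toTable("your_catalog.your_schema.raw_contracts")
)
```

With `trigger(availableNow=True)` the stream behaves like an incremental batch job, which fits a scheduled pipeline better than a continuously running stream.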

Python
raw_contracts_df = spark.read.format("binaryFile").load(SOURCE_PATH)

print(f"Loaded {raw_contracts_df.count()} documents from {SOURCE_PATH}")
display(raw_contracts_df.select("path", "length", "modificationTime"))

Silver layer: parse and classify

Parse: ai_parse_document converts the raw PDF bytes into a structured VARIANT containing document elements, layout metadata, and file information.

Classify: ai_classify accepts the VARIANT output from ai_parse_document directly; no cast to string is needed. Documents with parse errors are filtered out before classification. The classifier gives strong weight to filename keywords unless the document content clearly contradicts them.

Python
parsed_contracts_df = raw_contracts_df.select(
    F.col("path"),
    F.expr("ai_parse_document(content, MAP('version', '2.0'))").alias("parsed_content"),
)

# Materialize parsed results to a temp table so downstream steps
# read from the table rather than re-invoking ai_parse_document.
_parsed_table = f"_tmp_idp_parsed_{_TMP_SUFFIX}"
parsed_contracts_df.write.mode("overwrite").saveAsTable(_parsed_table)
parsed_contracts_df = spark.table(_parsed_table)

num_parsed = parsed_contracts_df.count()
print(f"Parsed {num_parsed} documents")
display(parsed_contracts_df.limit(5))
Python
classified_contracts_df = (
    parsed_contracts_df
    .filter("TRY_CAST(parsed_content:error_status AS STRING) IS NULL")
    .select(
        F.col("path"),
        F.col("parsed_content"),
        F.expr(
            f"""
            ai_classify(
                parsed_content,
                '{CLASSIFICATION_LABELS}',
                MAP('instructions', '{CLASSIFICATION_INSTRUCTIONS_SQL}')
            )
            """
        ).alias("classification"),
    )
    .select(
        F.col("path"),
        F.col("parsed_content"),
        F.col("classification"),
        F.expr("classification:response[0]::STRING").alias("contract_type"),
    )
)

# Materialize classified results to a temp table so each gold-layer
# extraction reads from the table rather than re-invoking ai_classify.
_classified_table = f"_tmp_idp_classified_{_TMP_SUFFIX}"
classified_contracts_df.write.mode("overwrite").saveAsTable(_classified_table)
classified_contracts_df = spark.table(_classified_table)

num_classified = classified_contracts_df.count()
print(f"Classified {num_classified} documents")
display(classified_contracts_df.select("path", "contract_type"))

Gold layer: extract structured fields

Each contract type has its own extraction schema of three to four short fields. ai_extract pulls party names, dates, amounts, and short phrases from each classified document. The loop below processes all five types and displays the results inline.

Python
gold_dfs = {}

for contract_type in EXTRACTION_CONFIGS:
    transform = _flatten_extraction(contract_type)
    gold_df = transform(classified_contracts_df)
    gold_dfs[contract_type] = gold_df

    print(f"\n{'=' * 60}")
    print(f" {contract_type.replace('_', ' ').title()}")
    print(f"{'=' * 60}")
    display(gold_df)

(Optional) Persist to Delta tables

To save the results for downstream workflows, analytics, or dashboards, uncomment the cell below and set a target catalog and schema. Persisting the parsed documents is recommended: subsequent runs can then skip the parse step and read directly from the table.

Python
# Uncomment and configure to persist tables
# TARGET_CATALOG = "your_catalog"
# TARGET_SCHEMA = "your_schema"
#
# # Parsed documents — persist to avoid re-running ai_parse_document
# parsed_contracts_df.write.mode("overwrite").saveAsTable(
#     f"{TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts"
# )
# print(f"Wrote parsed contracts to {TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts")
#
# # Classified documents
# classified_contracts_df.select("path", "contract_type").write.mode("overwrite").saveAsTable(
#     f"{TARGET_CATALOG}.{TARGET_SCHEMA}.classified_contracts"
# )
# print(f"Wrote classifications to {TARGET_CATALOG}.{TARGET_SCHEMA}.classified_contracts")
#
# # Gold tables — one per agreement type
# for contract_type, gold_df in gold_dfs.items():
#     table_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.gold_{contract_type}s"
#     gold_df.write.mode("overwrite").saveAsTable(table_name)
#     print(f"Wrote to {table_name}")
#
# print("Done — all tables persisted.")
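Once the parsed table exists, later runs can skip the parse step entirely. A commented sketch in the same style as the cell above; the catalog and schema names are the same placeholders, and this assumes a Databricks runtime.

```python
# # On subsequent runs: skip ai_parse_document by reading the persisted table
# parsed_contracts_df = spark.table(
#     f"{TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts"
# )
```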

Example notebook

Document intelligence with AI Functions
