Document intelligence with AI Functions
This tutorial demonstrates an end-to-end Intelligent Document Processing (IDP) pipeline built on three Databricks AI Functions: ai_parse_document, ai_classify, and ai_extract.
BRONZE            SILVER                                        GOLD

                                                                                     ┌── gold_consulting_agreements (scope, compensation, ...)
                                                                                     ├── gold_marketing_agreements (territory, campaign, ...)
raw_contracts ──▶ parsed_contracts ──▶ classified_contracts ──▶ extracted fields ──▶ ├── gold_hosting_agreements (SLA, uptime, fees, ...)
(Auto Loader)     (ai_parse_document)  (ai_classify)            (ai_extract)         └── gold_affiliate_agreements (commission, terms, ...)
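The pipeline processes legal contracts filed with the SEC, classifies each one into one of five categories (affiliate, marketing, consulting, hosting, or escrow), and extracts relevant fields such as party names, dates, and compensation details.
Prerequisites
- Serverless compute, or a SQL warehouse with AI Functions enabled
- Access to the sample data volume
/Volumes/samples/sec/contracts/
The samples.sec.contracts dataset is available by default in every workspace. To process your own PDFs, change SOURCE_PATH in the next cell to a Unity Catalog volume that contains your files. To ingest PDFs from external sources such as SharePoint or Google Drive, Databricks recommends Lakeflow Connect.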
from pyspark.sql import functions as F
import json
import uuid
# Source path — point this at a Unity Catalog volume containing your PDF files.
# The sample path below contains SEC-filed legal agreements.
SOURCE_PATH = "/Volumes/samples/sec/contracts/"
# Serverless compute does not support .cache(), so intermediate results are
# materialized to temp tables instead. A random suffix avoids collisions
# if multiple users run the tutorial concurrently.
_TMP_SUFFIX = uuid.uuid4().hex[:8]
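Configuration
The classification labels tell ai_classify which categories to choose from. Each extraction schema defines the short, typed fields that ai_extract pulls for that contract type.
Modify these to adapt the pipeline to your own document types.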
# One-line descriptions used by ai_classify to pick the best label per document.
CLASSIFICATION_LABELS = json.dumps(
    {
        "affiliate_agreement": "One party refers customers or resells products for commissions or revenue share.",
        "marketing_agreement": "One party provides marketing, promotion, distribution, or advertising services.",
        "consulting_agreement": "An individual or firm provides advisory or professional services as an independent contractor.",
        "hosting_agreement": "One party provides web hosting, server hosting, application hosting, or managed infrastructure.",
        "escrow_agreement": "A third-party agent holds materials (source code, documentation) with defined release triggers.",
    }
)
# Instructions passed to ai_classify. Filenames carry strong signal for these
# SEC filings, so the classifier is told to trust them unless content disagrees.
CLASSIFICATION_INSTRUCTIONS = """
You are classifying SEC-filed legal agreements into exactly one of five labels.
Read the contract and assign exactly one category:
affiliate_agreement, marketing_agreement, consulting_agreement, hosting_agreement, escrow_agreement.
Decision rules:
* Give strong weight to the contract title and filename when they contain explicit type keywords such as Affiliate Agreement, Marketing Agreement, Consulting Agreement, Hosting Agreement, or Escrow Agreement.
* Only override the filename when the document content clearly and unambiguously describes a different category.
Return only the single best label.
""".strip().replace("\n", " ")
CLASSIFICATION_INSTRUCTIONS_SQL = CLASSIFICATION_INSTRUCTIONS.replace("'", "\\'")
# Shared extraction instructions appended to every per-type prompt.
EXTRACTION_BASE_INSTRUCTIONS = (
    "The input is an SEC-filed legal agreement. "
    "Use all available context in the input, including any document metadata. "
    "Do not extract full sentences, clauses, or paragraph-length descriptions. "
    "If a dollar amount is redacted (e.g., [***]), extract the surrounding structure (e.g., [***]% of revenue above [***] threshold). "
)
# Per-type extraction schemas. Each schema lists short fields that ai_extract
# will populate for every document classified into that type.
EXTRACTION_CONFIGS = {
    "affiliate_agreement": {
        "schema": {
            "party_1_name": {"type": "string", "description": "Legal name of the first party \u2014 must be an actual company or legal entity name, not a role or generic label (e.g., not 'Affiliate', 'Company', or 'Licensor'). Extract from the preamble, recitals, signature block, or document title. Every affiliate agreement involves exactly two parties \u2014 if not found in the body text, infer from the SEC filing entity or any other available context."},
            "party_2_name": {"type": "string", "description": "Legal name of the second party \u2014 must be an actual company or legal entity name, not a role or generic label (e.g., not 'Affiliate', 'Company', or 'Licensor'). Extract from the preamble, recitals, signature block, or document title. Every affiliate agreement involves exactly two parties \u2014 if not found in the body text, infer from the SEC filing entity or any other available context."},
            "commission_rate": {"type": "string", "description": "Primary rate or structure in a short phrase (e.g., 50/50 revenue share, 15-25% tiered discount, $55/referral)."},
            "payment_frequency": {"type": "string", "description": "How often payments are made (e.g., Monthly, Net 30, Quarterly)."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is an affiliate agreement.",
    },
    "marketing_agreement": {
        "schema": {
            "party_1_name": {"type": "string", "description": "Legal name of the first party."},
            "party_2_name": {"type": "string", "description": "Legal name of the second party."},
            "effective_date": {"type": "string", "description": "Contract start date (e.g., January 30, 2000)."},
            "territory": {"type": "string", "description": "Geographic scope as a place name only (e.g., United States, Texas, New York). Must be an actual geographic location. If the territory references an exhibit or schedule, or no specific place is named, return null."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a marketing agreement.",
    },
    "consulting_agreement": {
        "schema": {
            "company_name": {"type": "string", "description": "Legal name of the company engaging the consultant."},
            "consultant_name": {"type": "string", "description": "Legal name of the consultant or consulting firm."},
            "compensation_amount": {"type": "string", "description": "Rate or total with currency and period (e.g., EUR 500/hour, $18,000/month, $250,000 lump sum)."},
            "effective_date": {"type": "string", "description": "Contract start date (e.g., May 1, 2019)."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a consulting agreement.",
    },
    "hosting_agreement": {
        "schema": {
            "provider_name": {"type": "string", "description": "Legal name of the hosting provider."},
            "customer_name": {"type": "string", "description": "Legal name of the customer."},
            "effective_date": {"type": "string", "description": "Contract start date (e.g., March 1, 2005)."},
            "term_length": {"type": "string", "description": "Duration or term condition as a short phrase. May be a fixed period (e.g., 12 months, 2 years) or an event-dependent term (e.g., coterminous with License Agreement, until termination of Service Agreement). Always use digits for numbers, never words."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a hosting agreement.",
    },
    "escrow_agreement": {
        "schema": {
            "owner_name": {"type": "string", "description": "Legal name of the depositor or software developer."},
            "licensee_name": {"type": "string", "description": "Legal name of the beneficiary or licensee."},
            "escrow_agent_name": {"type": "string", "description": "Legal name of the escrow agent."},
            "software_name": {"type": "string", "description": "Name of the escrowed software or materials."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is an escrow agreement.",
    },
}
print(f"Configured {len(json.loads(CLASSIFICATION_LABELS))} classification labels")
print(f"Configured {len(EXTRACTION_CONFIGS)} extraction schemas: {', '.join(EXTRACTION_CONFIGS.keys())}")
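When adapting the configuration to your own document types, it is easy to add a label without a matching extraction schema, or the reverse. The sketch below is one way to check that the two stay in sync before any AI Function is called. It is not part of the original notebook: validate_configs is a hypothetical helper, and the small dictionaries are stand-ins for CLASSIFICATION_LABELS and EXTRACTION_CONFIGS.

```python
import json


def validate_configs(labels_json: str, extraction_configs: dict) -> list:
    """Return a list of human-readable problems; an empty list means the configs agree."""
    problems = []
    labels = json.loads(labels_json)

    # Every label the classifier can return needs an extraction config, and vice versa.
    for name in sorted(set(labels) - set(extraction_configs)):
        problems.append(f"label '{name}' has no extraction config")
    for name in sorted(set(extraction_configs) - set(labels)):
        problems.append(f"extraction config '{name}' has no classification label")

    # Each schema field should carry a type and a non-empty description.
    for name, config in extraction_configs.items():
        for field, spec in config.get("schema", {}).items():
            if not spec.get("type"):
                problems.append(f"{name}.{field} is missing 'type'")
            if not spec.get("description", "").strip():
                problems.append(f"{name}.{field} is missing 'description'")
    return problems


# Hypothetical stand-ins for the tutorial's configuration.
sample_labels = json.dumps(
    {
        "hosting_agreement": "One party provides hosting.",
        "escrow_agreement": "A third-party agent holds materials.",
    }
)
sample_configs = {
    "hosting_agreement": {
        "schema": {
            "provider_name": {"type": "string", "description": "Legal name of the hosting provider."},
        },
        "instructions": "This is a hosting agreement.",
    },
}

for problem in validate_configs(sample_labels, sample_configs):
    print(problem)
```

In the notebook itself, the call would be validate_configs(CLASSIFICATION_LABELS, EXTRACTION_CONFIGS), which should return an empty list for the configuration above.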
def _flatten_extraction(contract_type: str):
    """Return a transform that filters to `contract_type`, calls ai_extract on
    each matching row, and flattens the JSON response into STRING columns."""
    config = EXTRACTION_CONFIGS[contract_type]
    schema_json = json.dumps(config["schema"]).replace("'", "\\'")
    instructions = config["instructions"].replace("'", "\\'")

    def transform(df):
        # ai_extract is evaluated per row, so each document gets its own
        # extraction; Spark sends the rows of the batch to the AI function in parallel.
        extracted = (
            df.filter(F.col("contract_type") == contract_type)
            .select(
                F.col("path"),
                F.col("contract_type"),
                F.col("parsed_content"),
                F.expr(
                    f"""
                    ai_extract(
                        parsed_content,
                        '{schema_json}',
                        MAP('instructions', '{instructions}')
                    )
                    """
                ).alias("extracted"),
            )
        )
        # Flatten the nested JSON response into top-level STRING columns.
        select_cols = [F.col("path"), F.col("contract_type")]
        for field_name in config["schema"]:
            select_cols.append(F.expr(f"extracted:response.{field_name}::STRING").alias(field_name))
        return extracted.select(*select_cols)

    return transform
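The helper above embeds JSON and free text inside single-quoted SQL literals and escapes only the single quotes. That is enough for the schemas in this tutorial, but a description containing a double quote or a backslash could be altered when Spark parses the literal. The following is a minimal sketch of a stricter escape, assuming Spark's default string-literal parsing, in which a backslash starts an escape sequence. sql_string_literal is a hypothetical helper, not part of the notebook or of any Databricks API.

```python
import json


def sql_string_literal(text: str) -> str:
    """Wrap text in single quotes for use inside a Spark SQL expression string.

    Assumption: default Spark SQL literal parsing, where a backslash starts an
    escape sequence. Backslashes are doubled first so they survive parsing,
    then single quotes are escaped.
    """
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


# Hypothetical schema whose description contains both quote styles.
schema = {"party_name": {"type": "string", "description": "The party's \"legal\" name."}}
literal = sql_string_literal(json.dumps(schema))
print(f"ai_extract(parsed_content, {literal})")
```

Doubling the backslashes means the JSON escapes produced by json.dumps reach the function intact instead of being consumed by the SQL parser.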
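Bronze layer: ingest raw PDFs
Read the PDF files as binary using Spark's binaryFile format. Each row contains the file path, the raw content bytes, the length, and the modification timestamp.
Production tip: For incremental ingestion, replace spark.read with Auto Loader (the cloudFiles format) so that each run processes only new files.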
raw_contracts_df = spark.read.format("binaryFile").load(SOURCE_PATH)
print(f"Loaded {raw_contracts_df.count()} documents from {SOURCE_PATH}")
display(raw_contracts_df.select("path", "length", "modificationTime"))
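The production tip about Auto Loader can be sketched as follows. This is a hedged illustration rather than the notebook's own code: read_new_pdfs is a hypothetical helper, it takes the notebook's spark session as an argument, and the checkpoint path and table name in the usage comment are placeholders.

```python
def read_new_pdfs(spark, source_path: str):
    """Return a streaming DataFrame over PDFs in source_path using Auto Loader.

    `spark` is the active SparkSession in a Databricks notebook. With the
    binaryFile format the columns match the batch read in this tutorial
    (path, modificationTime, length, content).
    """
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "binaryFile")
        .load(source_path)
    )


# Usage in a notebook (checkpoint path and table name are placeholders):
# (
#     read_new_pdfs(spark, SOURCE_PATH)
#     .writeStream
#     .option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/_checkpoints/raw_contracts")
#     .trigger(availableNow=True)
#     .toTable("<catalog>.<schema>.raw_contracts")
# )
```

The checkpoint is what lets Auto Loader remember which files it has already seen, and the availableNow trigger processes the backlog and then stops, which suits a scheduled job.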
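Silver layer: parse and classify
Parse: ai_parse_document converts the raw PDF bytes into a structured VARIANT that contains the document elements, layout metadata, and file information.
Classify: ai_classify accepts the VARIANT output of ai_parse_document directly, so no cast to string is needed. Documents with parse errors are filtered out before classification. The classifier gives strong weight to filename keywords unless the document content clearly contradicts them.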
parsed_contracts_df = raw_contracts_df.select(
    F.col("path"),
    F.expr("ai_parse_document(content, MAP('version', '2.0'))").alias("parsed_content"),
)
# Materialize parsed results to a temp table so downstream steps
# read from the table rather than re-invoking ai_parse_document.
_parsed_table = f"_tmp_idp_parsed_{_TMP_SUFFIX}"
parsed_contracts_df.write.mode("overwrite").saveAsTable(_parsed_table)
parsed_contracts_df = spark.table(_parsed_table)
num_parsed = parsed_contracts_df.count()
print(f"Parsed {num_parsed} documents")
display(parsed_contracts_df.limit(5))
classified_contracts_df = (
    parsed_contracts_df
    .filter("TRY_CAST(parsed_content:error_status AS STRING) IS NULL")
    .select(
        F.col("path"),
        F.col("parsed_content"),
        F.expr(
            f"""
            ai_classify(
                parsed_content,
                '{CLASSIFICATION_LABELS}',
                MAP('instructions', '{CLASSIFICATION_INSTRUCTIONS_SQL}')
            )
            """
        ).alias("classification"),
    )
    .select(
        F.col("path"),
        F.col("parsed_content"),
        F.col("classification"),
        F.expr("classification:response[0]::STRING").alias("contract_type"),
    )
)
# Materialize classified results to a temp table so each gold-layer
# extraction reads from the table rather than re-invoking ai_classify.
_classified_table = f"_tmp_idp_classified_{_TMP_SUFFIX}"
classified_contracts_df.write.mode("overwrite").saveAsTable(_classified_table)
classified_contracts_df = spark.table(_classified_table)
num_classified = classified_contracts_df.count()
print(f"Classified {num_classified} documents")
display(classified_contracts_df.select("path", "contract_type"))
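Because each gold-layer extraction filters on an exact contract_type value, a document whose label is null or unexpected would not appear in any gold table. The sketch below is one way to surface such documents after classification. summarize_labels is a hypothetical helper, and the sample rows, including the file names, are invented for illustration.

```python
from collections import Counter


def summarize_labels(rows, known_labels):
    """Count documents per label and list paths whose label is missing or unknown.

    `rows` is an iterable of (path, contract_type) pairs, for example the result
    of classified_contracts_df.select("path", "contract_type").collect().
    """
    counts = Counter()
    unrouted = []
    for path, label in rows:
        if label in known_labels:
            counts[label] += 1
        else:
            # None or an unexpected label matches none of the gold-layer filters.
            unrouted.append(path)
    return dict(counts), unrouted


# Hypothetical rows standing in for collected results.
sample_rows = [
    ("/Volumes/samples/sec/contracts/a.pdf", "hosting_agreement"),
    ("/Volumes/samples/sec/contracts/b.pdf", "hosting_agreement"),
    ("/Volumes/samples/sec/contracts/c.pdf", None),
]
counts, unrouted = summarize_labels(sample_rows, {"hosting_agreement", "escrow_agreement"})
print(counts)
print(unrouted)
```

In the notebook, known_labels could be set(EXTRACTION_CONFIGS), so the check follows whatever configuration is in use.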
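Gold layer: extract structured fields
Each contract type has its own extraction schema with four short fields. ai_extract pulls party names, dates, amounts, and short phrases from each classified document. The loop below processes all five types and displays the results inline.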
gold_dfs = {}
for contract_type in EXTRACTION_CONFIGS:
    transform = _flatten_extraction(contract_type)
    gold_df = transform(classified_contracts_df)
    gold_dfs[contract_type] = gold_df
    print(f"\n{'=' * 60}")
    print(f" {contract_type.replace('_', ' ').title()}")
    print(f"{'=' * 60}")
    display(gold_df)
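After looking at the extracted rows, a quick way to judge a schema is to measure how often each field comes back populated. The sketch below computes a per-field fill rate over rows held as plain dictionaries. field_fill_rates is a hypothetical helper, and the sample rows, including the company names, are invented for illustration.

```python
def field_fill_rates(rows, fields):
    """Return the fraction of rows with a non-empty value for each field.

    `rows` is an iterable of dictionaries, for example
    [r.asDict() for r in gold_df.collect()] for one contract type.
    """
    rows = list(rows)
    rates = {}
    for field in fields:
        # Treat None and whitespace-only strings as unfilled.
        filled = sum(1 for row in rows if str(row.get(field) or "").strip())
        rates[field] = filled / len(rows) if rows else 0.0
    return rates


# Hypothetical extraction results for two hosting agreements.
sample_gold_rows = [
    {"provider_name": "Example Hosting Inc.", "term_length": "12 months"},
    {"provider_name": "Sample Networks LLC", "term_length": None},
]
print(field_fill_rates(sample_gold_rows, ["provider_name", "term_length"]))
```

A field with a persistently low fill rate is a candidate for a clearer description in its schema entry.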
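(Optional) Persist to Delta tables
To save the results for downstream workflows, analytics, or dashboards, uncomment the cell below and set the target catalog and schema. Persisting the parsed documents is recommended, because later runs can then skip the parsing step and read directly from the table.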
# Uncomment and configure to persist tables
# TARGET_CATALOG = "your_catalog"
# TARGET_SCHEMA = "your_schema"
#
# # Parsed documents: persist to avoid re-running ai_parse_document
# parsed_contracts_df.write.mode("overwrite").saveAsTable(
#     f"{TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts"
# )
# print(f"Wrote parsed contracts to {TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts")
#
# # Classified documents
# classified_contracts_df.select("path", "contract_type").write.mode("overwrite").saveAsTable(
#     f"{TARGET_CATALOG}.{TARGET_SCHEMA}.classified_contracts"
# )
# print(f"Wrote classifications to {TARGET_CATALOG}.{TARGET_SCHEMA}.classified_contracts")
#
# # Gold tables: one per agreement type
# for contract_type, gold_df in gold_dfs.items():
#     table_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.gold_{contract_type}s"
#     gold_df.write.mode("overwrite").saveAsTable(table_name)
#     print(f"Wrote to {table_name}")
#
# print("Done. All tables persisted.")
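The tutorial materializes its intermediate results to tables named with the random suffix, and nothing above removes them. The sketch below builds the matching DROP statements as a possible cleanup step. drop_statements is a hypothetical helper, and the suffix shown is a made-up example value; in the notebook the real value is _TMP_SUFFIX.

```python
def drop_statements(suffix: str, stages=("parsed", "classified")) -> list:
    """Build DROP TABLE statements for the tutorial's intermediate tables.

    The name pattern mirrors the _tmp_idp_<stage>_<suffix> tables created in
    the silver layer.
    """
    return [f"DROP TABLE IF EXISTS _tmp_idp_{stage}_{suffix}" for stage in stages]


# "1a2b3c4d" is an example suffix; use _TMP_SUFFIX in the notebook.
for statement in drop_statements("1a2b3c4d"):
    print(statement)
    # In the notebook, run it with: spark.sql(statement)
```

Run the cleanup only after the gold DataFrames have been displayed or persisted, since they read lazily from the classified table.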