
Document Intelligence powered by AI Functions

This tutorial demonstrates an end-to-end Intelligent Document Processing (IDP) pipeline using three Databricks AI Functions.

Text
┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ BRONZE SILVER GOLD │
│ │
│ ┌── gold_consulting_agreements (scope, compensation, ...) │
│ ├── gold_marketing_agreements (territory, campaign, ...) │
│ raw_contracts ──▶ parsed_contracts ──▶ classified_contracts ──▶ extracted fields ──▶ ├── gold_hosting_agreements (SLA, uptime, fees, ...) │
│ (Auto Loader) (ai_parse_document) (ai_classify) (ai_extract) └── gold_affiliate_agreements (commission, terms, ...) │
│ │
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

The pipeline processes SEC-filed legal agreements, classifying each into one of five categories (affiliate, marketing, consulting, hosting, escrow) and extracting relevant terms like party names, dates, and compensation details.

Prerequisites

  • Serverless compute or a SQL warehouse with AI Functions enabled
  • Access to the sample data volume at /Volumes/samples/sec/contracts/
note

The samples.sec.contracts data set is available in all workspaces by default. To process your own PDFs, change SOURCE_PATH in the next cell to a Unity Catalog volume that contains your files. For ingesting PDFs from external sources such as SharePoint or Google Drive, Databricks recommends Lakeflow Connect.

Python
from pyspark.sql import functions as F
import json
import uuid

# Source path — point this at a Unity Catalog volume containing your PDF files.
# The sample path below contains SEC-filed legal agreements.
SOURCE_PATH = "/Volumes/samples/sec/contracts/"

# Serverless compute does not support .cache(), so intermediate results are
# materialized to temp tables instead. A random suffix avoids collisions
# if multiple users run the tutorial concurrently.
_TMP_SUFFIX = uuid.uuid4().hex[:8]
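As a quick illustration of the naming scheme, the suffix is eight random hex characters appended to a fixed prefix (the prefix below mirrors the temp tables created later in this tutorial):

```python
import uuid

# Illustration of the temp-table naming scheme: a short random hex suffix
# keeps concurrent runs from writing to the same table name.
suffix_a = uuid.uuid4().hex[:8]
suffix_b = uuid.uuid4().hex[:8]

parsed_table = f"_tmp_idp_parsed_{suffix_a}"

# Eight hex characters give 16**8 (about 4.3 billion) possible suffixes,
# so collisions between concurrent users are vanishingly unlikely.
print(parsed_table)
```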

Configuration

Classification labels tell ai_classify which categories to choose from. Each extraction schema defines the short, typed fields that ai_extract will pull for that agreement type.

Modify these to adapt the pipeline to your own document types.
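As a sketch of that adaptation, here is how a hypothetical new agreement type could be registered alongside the existing five. The `license_agreement` label, its schema fields, and the stand-in dictionaries are all invented for illustration; in the notebook you would merge them into `CLASSIFICATION_LABELS` and `EXTRACTION_CONFIGS` in the cell that follows.

```python
import json

# Hypothetical new document type, shown only to illustrate the shape of the
# configuration. The label, field names, and descriptions are invented.
new_label = {
    "license_agreement": "One party grants the other rights to use software or intellectual property."
}
new_config = {
    "schema": {
        "licensor_name": {"type": "string", "description": "Legal name of the licensor."},
        "licensee_name": {"type": "string", "description": "Legal name of the licensee."},
        "license_fee": {"type": "string", "description": "Fee with currency and period (e.g., $10,000/year)."},
    },
    "instructions": "This is a license agreement.",
}

# Stand-ins for CLASSIFICATION_LABELS (a JSON string) and EXTRACTION_CONFIGS.
labels = json.loads('{"affiliate_agreement": "..."}')
labels.update(new_label)
configs = {}
configs["license_agreement"] = new_config
```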

Python
# One-line descriptions used by ai_classify to pick the best label per document.
CLASSIFICATION_LABELS = json.dumps(
    {
        "affiliate_agreement": "One party refers customers or resells products for commissions or revenue share.",
        "marketing_agreement": "One party provides marketing, promotion, distribution, or advertising services.",
        "consulting_agreement": "An individual or firm provides advisory or professional services as an independent contractor.",
        "hosting_agreement": "One party provides web hosting, server hosting, application hosting, or managed infrastructure.",
        "escrow_agreement": "A third-party agent holds materials (source code, documentation) with defined release triggers.",
    }
)

# Instructions passed to ai_classify. Filenames carry strong signal for these
# SEC filings, so the classifier is told to trust them unless content disagrees.
CLASSIFICATION_INSTRUCTIONS = """
You are classifying SEC-filed legal agreements into exactly one of five labels.
Read the contract and assign exactly one category:
affiliate_agreement, marketing_agreement, consulting_agreement, hosting_agreement, escrow_agreement.

Decision rules:
* Give strong weight to the contract title and filename when they contain explicit type keywords such as Affiliate Agreement, Marketing Agreement, Consulting Agreement, Hosting Agreement, or Escrow Agreement.
* Only override the filename when the document content clearly and unambiguously describes a different category.

Return only the single best label.
""".strip().replace("\n", " ")
CLASSIFICATION_INSTRUCTIONS_SQL = CLASSIFICATION_INSTRUCTIONS.replace("'", "\\'")

# Shared extraction instructions appended to every per-type prompt.
EXTRACTION_BASE_INSTRUCTIONS = (
    "The input is an SEC-filed legal agreement. "
    "Use all available context in the input, including any document metadata. "
    "Do not extract full sentences, clauses, or paragraph-length descriptions. "
    "If a dollar amount is redacted (e.g., [***]), extract the surrounding structure (e.g., [***]% of revenue above [***] threshold). "
)

# Per-type extraction schemas. Each schema lists short fields that ai_extract
# will populate for every document classified into that type.
EXTRACTION_CONFIGS = {
    "affiliate_agreement": {
        "schema": {
            "party_1_name": {"type": "string", "description": "Legal name of the first party \u2014 must be an actual company or legal entity name, not a role or generic label (e.g., not 'Affiliate', 'Company', or 'Licensor'). Extract from the preamble, recitals, signature block, or document title. Every affiliate agreement involves exactly two parties \u2014 if not found in the body text, infer from the SEC filing entity or any other available context."},
            "party_2_name": {"type": "string", "description": "Legal name of the second party \u2014 must be an actual company or legal entity name, not a role or generic label (e.g., not 'Affiliate', 'Company', or 'Licensor'). Extract from the preamble, recitals, signature block, or document title. Every affiliate agreement involves exactly two parties \u2014 if not found in the body text, infer from the SEC filing entity or any other available context."},
            "commission_rate": {"type": "string", "description": "Primary rate or structure in a short phrase (e.g., 50/50 revenue share, 15-25% tiered discount, $55/referral)."},
            "payment_frequency": {"type": "string", "description": "How often payments are made (e.g., Monthly, Net 30, Quarterly)."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is an affiliate agreement.",
    },
    "marketing_agreement": {
        "schema": {
            "party_1_name": {"type": "string", "description": "Legal name of the first party."},
            "party_2_name": {"type": "string", "description": "Legal name of the second party."},
            "effective_date": {"type": "string", "description": "Contract start date (e.g., January 30, 2000)."},
            "territory": {"type": "string", "description": "Geographic scope as a place name only (e.g., United States, Texas, New York). Must be an actual geographic location. If the territory references an exhibit or schedule, or no specific place is named, return null."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a marketing agreement.",
    },
    "consulting_agreement": {
        "schema": {
            "company_name": {"type": "string", "description": "Legal name of the company engaging the consultant."},
            "consultant_name": {"type": "string", "description": "Legal name of the consultant or consulting firm."},
            "compensation_amount": {"type": "string", "description": "Rate or total with currency and period (e.g., EUR 500/hour, $18,000/month, $250,000 lump sum)."},
            "effective_date": {"type": "string", "description": "Contract start date (e.g., May 1, 2019)."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a consulting agreement.",
    },
    "hosting_agreement": {
        "schema": {
            "provider_name": {"type": "string", "description": "Legal name of the hosting provider."},
            "customer_name": {"type": "string", "description": "Legal name of the customer."},
            "effective_date": {"type": "string", "description": "Contract start date (e.g., March 1, 2005)."},
            "term_length": {"type": "string", "description": "Duration or term condition as a short phrase. May be a fixed period (e.g., 12 months, 2 years) or an event-dependent term (e.g., coterminous with License Agreement, until termination of Service Agreement). Always use digits for numbers, never words."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a hosting agreement.",
    },
    "escrow_agreement": {
        "schema": {
            "owner_name": {"type": "string", "description": "Legal name of the depositor or software developer."},
            "licensee_name": {"type": "string", "description": "Legal name of the beneficiary or licensee."},
            "escrow_agent_name": {"type": "string", "description": "Legal name of the escrow agent."},
            "software_name": {"type": "string", "description": "Name of the escrowed software or materials."},
        },
        "instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is an escrow agreement.",
    },
}

print(f"Configured {len(json.loads(CLASSIFICATION_LABELS))} classification labels")
print(f"Configured {len(EXTRACTION_CONFIGS)} extraction schemas: {', '.join(EXTRACTION_CONFIGS.keys())}")
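The quote escaping applied to `CLASSIFICATION_INSTRUCTIONS_SQL` above matters because the instructions are later interpolated into a single-quoted SQL literal inside `F.expr`. A plain-Python sketch of the failure mode it prevents (the sample sentence is invented):

```python
# Sketch of why single quotes must be escaped before embedding text in a
# single-quoted SQL string literal: an unescaped apostrophe ("Don't") would
# otherwise terminate the literal early and break the ai_classify expression.
instructions = "Classify the contract. Don't guess."
escaped = instructions.replace("'", "\\'")

# The apostrophe is now preceded by a backslash, so a SQL parser treats it
# as part of the string instead of a closing quote.
sql_fragment = f"MAP('instructions', '{escaped}')"
```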
Python
def _flatten_extraction(contract_type: str):
    """Return a transform that filters to `contract_type`, calls ai_extract on
    the batch, and flattens the JSON response into typed columns."""
    config = EXTRACTION_CONFIGS[contract_type]
    schema_json = json.dumps(config["schema"]).replace("'", "\\'")
    instructions = config["instructions"].replace("'", "\\'")

    def transform(df):
        # ai_extract runs once per batch — each row gets its own extraction,
        # but Spark pushes the whole batch to the AI function in parallel.
        extracted = (
            df.filter(F.col("contract_type") == contract_type)
            .select(
                F.col("path"),
                F.col("contract_type"),
                F.col("parsed_content"),
                F.expr(
                    f"""
                    ai_extract(
                        parsed_content,
                        '{schema_json}',
                        MAP('instructions', '{instructions}')
                    )
                    """
                ).alias("extracted"),
            )
        )

        # Flatten the nested JSON response into top-level STRING columns.
        select_cols = [F.col("path"), F.col("contract_type")]
        for field_name in config["schema"]:
            select_cols.append(
                F.expr(f"extracted:response.{field_name}::STRING").alias(field_name)
            )

        return extracted.select(*select_cols)

    return transform
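For intuition, the flattening step inside `transform` can be mimicked with plain Python. The payload below is an invented sample that assumes `ai_extract` returns its fields under a `response` key; in the pipeline itself, the equivalent lookup is the `extracted:response.<field>::STRING` path expression:

```python
import json

# Invented sample payload, assuming ai_extract wraps the extracted fields in
# a "response" object. The real pipeline queries the VARIANT column with the
# `extracted:response.<field>::STRING` path syntax instead.
sample = json.loads("""
{
  "response": {
    "party_1_name": "Example Holdings, Inc.",
    "party_2_name": "Acme Affiliates LLC",
    "commission_rate": "15% of net revenue",
    "payment_frequency": "Monthly"
  }
}
""")

schema_fields = ["party_1_name", "party_2_name", "commission_rate", "payment_frequency"]

# Equivalent of the select_cols loop: one flat string value per schema field,
# with None standing in for fields the model did not return.
flat_row = {field: sample["response"].get(field) for field in schema_fields}
```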

Bronze Layer — Ingest Raw PDFs

Read PDF files as binary using Spark's binaryFile format. Each row contains the file path, raw content bytes, length, and modification timestamp.

Production tip: For incremental ingestion, replace spark.read with Auto Loader (cloudFiles format) so only new files are processed on each run.

Python
raw_contracts_df = spark.read.format("binaryFile").load(SOURCE_PATH)

print(f"Loaded {raw_contracts_df.count()} documents from {SOURCE_PATH}")
display(raw_contracts_df.select("path", "length", "modificationTime"))

Silver Layer — Parse & Classify

Parse: ai_parse_document converts raw PDF bytes into a structured VARIANT containing document elements, layout metadata, and file information.

Classify: ai_classify accepts the VARIANT output directly from ai_parse_document, so there is no need to cast it to a string. Documents with parsing errors are filtered out before classification. The classifier gives strong weight to filename keywords unless the document content clearly contradicts them.
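The contract_type column in the classification cell is read with `classification:response[0]::STRING`, which assumes the classifier's JSON result carries its label in a `response` array. A plain-Python sketch of that lookup (the payload is an invented sample):

```python
import json

# Invented sample of a classification result, assuming the chosen label sits
# first in a "response" array; the pipeline reads it with
# classification:response[0]::STRING on the VARIANT column.
classification = json.loads('{"response": ["consulting_agreement"]}')

contract_type = classification["response"][0]
```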

Python
parsed_contracts_df = raw_contracts_df.select(
    F.col("path"),
    F.expr("ai_parse_document(content, MAP('version', '2.0'))").alias("parsed_content"),
)

# Materialize parsed results to a temp table so downstream steps
# read from the table rather than re-invoking ai_parse_document.
_parsed_table = f"_tmp_idp_parsed_{_TMP_SUFFIX}"
parsed_contracts_df.write.mode("overwrite").saveAsTable(_parsed_table)
parsed_contracts_df = spark.table(_parsed_table)

num_parsed = parsed_contracts_df.count()
print(f"Parsed {num_parsed} documents")
display(parsed_contracts_df.limit(5))
Python
classified_contracts_df = (
    parsed_contracts_df
    .filter("TRY_CAST(parsed_content:error_status AS STRING) IS NULL")
    .select(
        F.col("path"),
        F.col("parsed_content"),
        F.expr(
            f"""
            ai_classify(
                parsed_content,
                '{CLASSIFICATION_LABELS}',
                MAP('instructions', '{CLASSIFICATION_INSTRUCTIONS_SQL}')
            )
            """
        ).alias("classification"),
    )
    .select(
        F.col("path"),
        F.col("parsed_content"),
        F.col("classification"),
        F.expr("classification:response[0]::STRING").alias("contract_type"),
    )
)

# Materialize classified results to a temp table so each gold-layer
# extraction reads from the table rather than re-invoking ai_classify.
_classified_table = f"_tmp_idp_classified_{_TMP_SUFFIX}"
classified_contracts_df.write.mode("overwrite").saveAsTable(_classified_table)
classified_contracts_df = spark.table(_classified_table)

num_classified = classified_contracts_df.count()
print(f"Classified {num_classified} documents")
display(classified_contracts_df.select("path", "contract_type"))

Gold Layer — Extract Structured Fields

Each agreement type has a dedicated extraction schema with three to four short fields. ai_extract pulls party names, dates, dollar amounts, and brief phrases from each classified document. The loop below processes all five types and displays results inline.

Python
gold_dfs = {}

for contract_type in EXTRACTION_CONFIGS:
    transform = _flatten_extraction(contract_type)
    gold_df = transform(classified_contracts_df)
    gold_dfs[contract_type] = gold_df

    print(f"\n{'=' * 60}")
    print(f" {contract_type.replace('_', ' ').title()}")
    print(f"{'=' * 60}")
    display(gold_df)

(Optional) Persist to Delta Tables

To save results for downstream workflows, analytics, or dashboards, uncomment the cell below and set your target catalog and schema. Persisting the parsed documents is recommended: future runs can then skip the parsing step and read directly from the table.

Python
# Uncomment and configure to persist tables
# TARGET_CATALOG = "your_catalog"
# TARGET_SCHEMA = "your_schema"
#
# # Parsed documents — persist to avoid re-running ai_parse_document
# parsed_contracts_df.write.mode("overwrite").saveAsTable(
#     f"{TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts"
# )
# print(f"Wrote parsed contracts to {TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts")
#
# # Classified documents
# classified_contracts_df.select("path", "contract_type").write.mode("overwrite").saveAsTable(
#     f"{TARGET_CATALOG}.{TARGET_SCHEMA}.classified_contracts"
# )
# print(f"Wrote classifications to {TARGET_CATALOG}.{TARGET_SCHEMA}.classified_contracts")
#
# # Gold tables — one per agreement type
# for contract_type, gold_df in gold_dfs.items():
#     table_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.gold_{contract_type}s"
#     gold_df.write.mode("overwrite").saveAsTable(table_name)
#     print(f"Wrote to {table_name}")
#
# print("Done — all tables persisted.")

Example notebook

Document Intelligence powered by AI Functions
