Document intelligence powered by AI Functions
This tutorial demonstrates an end-to-end Intelligent Document Processing (IDP) pipeline built on three Databricks AI Functions: ai_parse_document, ai_classify, and ai_extract.
BRONZE            SILVER                                                             GOLD

                                                                                     ┌── gold_consulting_agreements (scope, compensation, ...)
                                                                                     ├── gold_marketing_agreements  (territory, campaign, ...)
raw_contracts ──▶ parsed_contracts ──▶ classified_contracts ──▶ extracted fields ──▶ ├── gold_hosting_agreements    (SLA, uptime, fees, ...)
(Auto Loader)     (ai_parse_document)  (ai_classify)            (ai_extract)         ├── gold_affiliate_agreements  (commission, terms, ...)
                                                                                     └── gold_escrow_agreements     (agent, software, ...)
The pipeline processes SEC-filed legal contracts, classifying each into one of five categories (affiliate, marketing, consulting, hosting, escrow) and extracting relevant terms such as party names, dates, and compensation details.
Prerequisites
- Serverless compute or a SQL warehouse with AI Functions enabled
- Access to the sample data volume at
/Volumes/samples/sec/contracts/
The samples.sec.contracts dataset is available in all workspaces by default. To process your own PDFs, change SOURCE_PATH in the next cell to a Unity Catalog volume containing your files. To import PDFs from external sources such as SharePoint or Google Drive, Databricks recommends LakeFlow Connect.
from pyspark.sql import functions as F
import json
import uuid
# Source path — point this at a Unity Catalog volume containing your PDF files.
# The sample path below contains SEC-filed legal agreements.
SOURCE_PATH = "/Volumes/samples/sec/contracts/"
# Serverless compute does not support .cache(), so intermediate results are
# materialized to temp tables instead. A random suffix avoids collisions
# if multiple users run the tutorial concurrently.
_TMP_SUFFIX = uuid.uuid4().hex[:8]
Configuration
The classification labels tell ai_classify which categories to choose from. Each extraction schema defines the short, typed fields that ai_extract will pull for that agreement type.
Modify these to adapt the pipeline to your own document types.
# One-line descriptions used by ai_classify to pick the best label per document.
CLASSIFICATION_LABELS = json.dumps(
{
"affiliate_agreement": "One party refers customers or resells products for commissions or revenue share.",
"marketing_agreement": "One party provides marketing, promotion, distribution, or advertising services.",
"consulting_agreement": "An individual or firm provides advisory or professional services as an independent contractor.",
"hosting_agreement": "One party provides web hosting, server hosting, application hosting, or managed infrastructure.",
"escrow_agreement": "A third-party agent holds materials (source code, documentation) with defined release triggers.",
}
)
# Instructions passed to ai_classify. Filenames carry strong signal for these
# SEC filings, so the classifier is told to trust them unless content disagrees.
CLASSIFICATION_INSTRUCTIONS = """
You are classifying SEC-filed legal agreements into exactly one of five labels.
Read the contract and assign exactly one category:
affiliate_agreement, marketing_agreement, consulting_agreement, hosting_agreement, escrow_agreement.
Decision rules:
* Give strong weight to the contract title and filename when they contain explicit type keywords such as Affiliate Agreement, Marketing Agreement, Consulting Agreement, Hosting Agreement, or Escrow Agreement.
* Only override the filename when the document content clearly and unambiguously describes a different category.
Return only the single best label.
""".strip().replace("\n", " ")
CLASSIFICATION_INSTRUCTIONS_SQL = CLASSIFICATION_INSTRUCTIONS.replace("'", "\\'")
# Shared extraction instructions appended to every per-type prompt.
EXTRACTION_BASE_INSTRUCTIONS = (
"The input is an SEC-filed legal agreement. "
"Use all available context in the input, including any document metadata. "
"Do not extract full sentences, clauses, or paragraph-length descriptions. "
"If a dollar amount is redacted (e.g., [***]), extract the surrounding structure (e.g., [***]% of revenue above [***] threshold). "
)
# Per-type extraction schemas. Each schema lists short fields that ai_extract
# will populate for every document classified into that type.
EXTRACTION_CONFIGS = {
"affiliate_agreement": {
"schema": {
"party_1_name": {"type": "string", "description": "Legal name of the first party \u2014 must be an actual company or legal entity name, not a role or generic label (e.g., not 'Affiliate', 'Company', or 'Licensor'). Extract from the preamble, recitals, signature block, or document title. Every affiliate agreement involves exactly two parties \u2014 if not found in the body text, infer from the SEC filing entity or any other available context."},
"party_2_name": {"type": "string", "description": "Legal name of the second party \u2014 must be an actual company or legal entity name, not a role or generic label (e.g., not 'Affiliate', 'Company', or 'Licensor'). Extract from the preamble, recitals, signature block, or document title. Every affiliate agreement involves exactly two parties \u2014 if not found in the body text, infer from the SEC filing entity or any other available context."},
"commission_rate": {"type": "string", "description": "Primary rate or structure in a short phrase (e.g., 50/50 revenue share, 15-25% tiered discount, $55/referral)."},
"payment_frequency": {"type": "string", "description": "How often payments are made (e.g., Monthly, Net 30, Quarterly)."},
},
"instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is an affiliate agreement.",
},
"marketing_agreement": {
"schema": {
"party_1_name": {"type": "string", "description": "Legal name of the first party."},
"party_2_name": {"type": "string", "description": "Legal name of the second party."},
"effective_date": {"type": "string", "description": "Contract start date (e.g., January 30, 2000)."},
"territory": {"type": "string", "description": "Geographic scope as a place name only (e.g., United States, Texas, New York). Must be an actual geographic location. If the territory references an exhibit or schedule, or no specific place is named, return null."},
},
"instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a marketing agreement.",
},
"consulting_agreement": {
"schema": {
"company_name": {"type": "string", "description": "Legal name of the company engaging the consultant."},
"consultant_name": {"type": "string", "description": "Legal name of the consultant or consulting firm."},
"compensation_amount": {"type": "string", "description": "Rate or total with currency and period (e.g., EUR 500/hour, $18,000/month, $250,000 lump sum)."},
"effective_date": {"type": "string", "description": "Contract start date (e.g., May 1, 2019)."},
},
"instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a consulting agreement.",
},
"hosting_agreement": {
"schema": {
"provider_name": {"type": "string", "description": "Legal name of the hosting provider."},
"customer_name": {"type": "string", "description": "Legal name of the customer."},
"effective_date": {"type": "string", "description": "Contract start date (e.g., March 1, 2005)."},
"term_length": {"type": "string", "description": "Duration or term condition as a short phrase. May be a fixed period (e.g., 12 months, 2 years) or an event-dependent term (e.g., coterminous with License Agreement, until termination of Service Agreement). Always use digits for numbers, never words."},
},
"instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a hosting agreement.",
},
"escrow_agreement": {
"schema": {
"owner_name": {"type": "string", "description": "Legal name of the depositor or software developer."},
"licensee_name": {"type": "string", "description": "Legal name of the beneficiary or licensee."},
"escrow_agent_name": {"type": "string", "description": "Legal name of the escrow agent."},
"software_name": {"type": "string", "description": "Name of the escrowed software or materials."},
},
"instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is an escrow agreement.",
},
}
print(f"Configured {len(json.loads(CLASSIFICATION_LABELS))} classification labels")
print(f"Configured {len(EXTRACTION_CONFIGS)} extraction schemas: {', '.join(EXTRACTION_CONFIGS.keys())}")
def _flatten_extraction(contract_type: str):
"""Return a transform that filters to `contract_type`, calls ai_extract on
the batch, and flattens the JSON response into typed columns."""
config = EXTRACTION_CONFIGS[contract_type]
schema_json = json.dumps(config["schema"]).replace("'", "\\'")
instructions = config["instructions"].replace("'", "\\'")
def transform(df):
        # ai_extract is evaluated per row; Spark batches the requests to the
        # AI function so the filtered DataFrame is processed in parallel.
extracted = (
df.filter(F.col("contract_type") == contract_type)
.select(
F.col("path"),
F.col("contract_type"),
F.col("parsed_content"),
F.expr(
f"""
ai_extract(
parsed_content,
'{schema_json}',
MAP('instructions', '{instructions}')
)
"""
).alias("extracted"),
)
)
# Flatten the nested JSON response into top-level STRING columns.
select_cols = [F.col("path"), F.col("contract_type")]
for field_name in config["schema"]:
select_cols.append(F.expr(f"extracted:response.{field_name}::STRING").alias(field_name))
return extracted.select(*select_cols)
return transform
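The extracted:response.&lt;field&gt;::STRING path syntax in the flatten step can be tried standalone. A small illustration with a made-up JSON literal that mimics the {"response": {...}} shape the code above reads from ai_extract; PARSE_JSON turns it into a VARIANT, so the same path expression applies:
# Standalone illustration of the VARIANT path syntax used in the flatten step.
demo_df = spark.sql(
    """SELECT PARSE_JSON('{"response": {"company_name": "Acme Corp"}}') AS extracted"""
)
display(demo_df.select(F.expr("extracted:response.company_name::STRING").alias("company_name")))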
Bronze layer: Ingest raw PDFs
Read PDF files as binaries using Spark's binaryFile format. Each row contains the file path, raw byte content, size, and modification timestamp.
Production tip: For incremental ingestion, replace spark.read with Auto Loader (the cloudFiles format) so that only new files are processed on each run; a commented sketch follows the cell below.
raw_contracts_df = spark.read.format("binaryFile").load(SOURCE_PATH)
print(f"Loaded {raw_contracts_df.count()} documents from {SOURCE_PATH}")
display(raw_contracts_df.select("path", "length", "modificationTime"))
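A commented sketch of the Auto Loader variant mentioned in the production tip above; the checkpoint path and target table are placeholders to replace with your own:
# Sketch: incremental ingestion with Auto Loader. cloudFiles tracks which
# files have already been seen, so each run ingests only new PDFs.
# raw_stream = (
#     spark.readStream.format("cloudFiles")
#     .option("cloudFiles.format", "binaryFile")
#     .load(SOURCE_PATH)
# )
# (
#     raw_stream.writeStream
#     .option("checkpointLocation", "/Volumes/your_catalog/your_schema/checkpoints/raw_contracts")
#     .trigger(availableNow=True)
#     .toTable("your_catalog.your_schema.raw_contracts")
# )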
Silver layer: Parse and classify
Parse: ai_parse_document converts raw PDF bytes into a structured VARIANT containing document elements, parse metadata, and file information.
Classify: ai_classify accepts the VARIANT output directly from ai_parse_document; no string conversion is needed. Documents with parse errors are filtered out before classification. The classifier gives strong weight to filename keywords unless the document content clearly contradicts them.
parsed_contracts_df = raw_contracts_df.select(
F.col("path"),
F.expr("ai_parse_document(content, MAP('version', '2.0'))").alias("parsed_content"),
)
# Materialize parsed results to a temp table so downstream steps
# read from the table rather than re-invoking ai_parse_document.
_parsed_table = f"_tmp_idp_parsed_{_TMP_SUFFIX}"
parsed_contracts_df.write.mode("overwrite").saveAsTable(_parsed_table)
parsed_contracts_df = spark.table(_parsed_table)
num_parsed = parsed_contracts_df.count()
print(f"Parsed {num_parsed} documents")
display(parsed_contracts_df.limit(5))
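Before classifying, it can help to surface any documents that failed to parse; these are the rows the next cell excludes via error_status:
# Show documents that failed to parse. The classification cell below filters
# these rows out using the same error_status field.
failed_parses_df = parsed_contracts_df.filter(
    "TRY_CAST(parsed_content:error_status AS STRING) IS NOT NULL"
)
if failed_parses_df.count() > 0:
    display(
        failed_parses_df.select(
            "path",
            F.expr("parsed_content:error_status::STRING").alias("error_status"),
        )
    )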
classified_contracts_df = (
parsed_contracts_df
.filter("TRY_CAST(parsed_content:error_status AS STRING) IS NULL")
.select(
F.col("path"),
F.col("parsed_content"),
F.expr(
f"""
ai_classify(
parsed_content,
'{CLASSIFICATION_LABELS}',
MAP('instructions', '{CLASSIFICATION_INSTRUCTIONS_SQL}')
)
"""
).alias("classification"),
)
.select(
F.col("path"),
F.col("parsed_content"),
F.col("classification"),
F.expr("classification:response[0]::STRING").alias("contract_type"),
)
)
# Materialize classified results to a temp table so each gold-layer
# extraction reads from the table rather than re-invoking ai_classify.
_classified_table = f"_tmp_idp_classified_{_TMP_SUFFIX}"
classified_contracts_df.write.mode("overwrite").saveAsTable(_classified_table)
classified_contracts_df = spark.table(_classified_table)
num_classified = classified_contracts_df.count()
print(f"Classified {num_classified} documents")
display(classified_contracts_df.select("path", "contract_type"))
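A quick distribution check shows how many documents landed in each category:
# Count documents per predicted contract type.
display(
    classified_contracts_df.groupBy("contract_type")
    .count()
    .orderBy(F.desc("count"))
)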
Gold layer: Extract structured fields
Each agreement type has its own extraction schema of four short fields. ai_extract pulls party names, dates, dollar amounts, and short phrases from each classified document. The loop below processes all five types and displays the results inline.
gold_dfs = {}
for contract_type in EXTRACTION_CONFIGS:
transform = _flatten_extraction(contract_type)
gold_df = transform(classified_contracts_df)
gold_dfs[contract_type] = gold_df
print(f"\n{'=' * 60}")
print(f" {contract_type.replace('_', ' ').title()}")
print(f"{'=' * 60}")
display(gold_df)
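A lightweight fill-rate check helps spot extraction fields whose descriptions need tuning. Note that the gold DataFrames are lazy, so this triggers one extra ai_extract pass per type; for large corpora, persist the gold tables first (next section) and run the check against those instead:
# Per-field fill rate for each gold DataFrame. Low rates suggest the field's
# description in EXTRACTION_CONFIGS needs refinement.
for contract_type, gold_df in gold_dfs.items():
    fields = list(EXTRACTION_CONFIGS[contract_type]["schema"])
    row = gold_df.select(
        F.count("*").alias("_total"), *[F.count(F.col(c)).alias(c) for c in fields]
    ).first()
    if row["_total"] == 0:
        continue
    rates = ", ".join(f"{c}={row[c] / row['_total']:.0%}" for c in fields)
    print(f"{contract_type} ({row['_total']} docs): {rates}")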
(Optional) Persist to Delta tables
To save results for downstream workflows, analytics, or dashboards, uncomment the cell below and set your target catalog and schema. Persisting the parsed documents is recommended: it lets future runs skip the parsing step and read directly from the table.
# Uncomment and configure to persist tables
# TARGET_CATALOG = "your_catalog"
# TARGET_SCHEMA = "your_schema"
#
# # Parsed documents — persist to avoid re-running ai_parse_document
# parsed_contracts_df.write.mode("overwrite").saveAsTable(
# f"{TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts"
# )
# print(f"Wrote parsed contracts to {TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts")
#
# # Classified documents
# classified_contracts_df.select("path", "contract_type").write.mode("overwrite").saveAsTable(
# f"{TARGET_CATALOG}.{TARGET_SCHEMA}.classified_contracts"
# )
# print(f"Wrote classifications to {TARGET_CATALOG}.{TARGET_SCHEMA}.classified_contracts")
#
# # Gold tables — one per agreement type
# for contract_type, gold_df in gold_dfs.items():
# table_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.gold_{contract_type}s"
# gold_df.write.mode("overwrite").saveAsTable(table_name)
# print(f"Wrote to {table_name}")
#
# print("Done — all tables persisted.")