Pular para o conteúdo principal

Inteligência de Documentos com tecnologia de AI Functions

Este tutorial demonstra um pipeline completo de **Processamento Inteligente de Documentos (IDP)** usando três AI Functions do Databricks.

Text
┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ BRONZE SILVER GOLD │
│ │
│ ┌── gold_consulting_agreements (scope, compensation, ...) │
│ ├── gold_marketing_agreements (territory, campaign, ...) │
│ raw_contracts ──▶ parsed_contracts ──▶ classified_contracts ──▶ extracted fields ──▶ ├── gold_hosting_agreements (SLA, uptime, fees, ...) │
│ (Auto Loader) (ai_parse_document) (ai_classify) (ai_extract) └── gold_affiliate_agreements (commission, terms, ...) │
│ │
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

O pipeline processa acordos legais protocolados na SEC, classificando cada um em uma das cinco categorias (afiliado, marketing, consultoria, hospedagem, garantia) e extraindo termos relevantes como nomes de partes, datas e detalhes de compensação.

Pré-requisitos

  • Compute serverless ou um SQL warehouse com AI Functions habilitadas
  • Acesso ao volume de dados de exemplo em /Volumes/samples/sec/contracts/
nota

O conjunto de dados samples.sec.contracts está disponível em todos os workspaces por default. Para processar seus próprios PDFs, altere SOURCE_PATH na próxima célula para um volume do Unity Catalog que contém seus arquivos. Para ingestão de PDFs de fontes externas como SharePoint ou Google Drive, o Databricks recomenda o Lakeflow Connect.

Python
from pyspark.sql import functions as F
import json
import uuid

# Source path — point this at a Unity Catalog volume containing your PDF files.
# The sample path below contains SEC-filed legal agreements.
SOURCE_PATH = "/Volumes/samples/sec/contracts/"

# Serverless compute does not support .cache(), so intermediate results are
# materialized to temp tables instead. A random suffix avoids collisions
# if multiple users run the tutorial concurrently.
_TMP_SUFFIX = uuid.uuid4().hex[:8]

Configuração

Os rótulos de classificação informam ai_classify quais categorias escolher. Cada esquema de extração define os campos curtos e tipados que ai_extract extrairá para esse tipo de acordo.

Modifique-os para adaptar o pipeline aos seus próprios tipos de documento.

Python
# One-line descriptions used by ai_classify to pick the best label per document.
CLASSIFICATION_LABELS = json.dumps(
{
"affiliate_agreement": "One party refers customers or resells products for commissions or revenue share.",
"marketing_agreement": "One party provides marketing, promotion, distribution, or advertising services.",
"consulting_agreement": "An individual or firm provides advisory or professional services as an independent contractor.",
"hosting_agreement": "One party provides web hosting, server hosting, application hosting, or managed infrastructure.",
"escrow_agreement": "A third-party agent holds materials (source code, documentation) with defined release triggers.",
}
)

# Instructions passed to ai_classify. Filenames carry strong signal for these
# SEC filings, so the classifier is told to trust them unless content disagrees.
CLASSIFICATION_INSTRUCTIONS = """
You are classifying SEC-filed legal agreements into exactly one of five labels.
Read the contract and assign exactly one category:
affiliate_agreement, marketing_agreement, consulting_agreement, hosting_agreement, escrow_agreement.

Decision rules:
* Give strong weight to the contract title and filename when they contain explicit type keywords such as Affiliate Agreement, Marketing Agreement, Consulting Agreement, Hosting Agreement, or Escrow Agreement.
* Only override the filename when the document content clearly and unambiguously describes a different category.

Return only the single best label.
""".strip().replace("\n", " ")
CLASSIFICATION_INSTRUCTIONS_SQL = CLASSIFICATION_INSTRUCTIONS.replace("'", "\\'")

# Shared extraction instructions appended to every per-type prompt.
EXTRACTION_BASE_INSTRUCTIONS = (
"The input is an SEC-filed legal agreement. "
"Use all available context in the input, including any document metadata. "
"Do not extract full sentences, clauses, or paragraph-length descriptions. "
"If a dollar amount is redacted (e.g., [***]), extract the surrounding structure (e.g., [***]% of revenue above [***] threshold). "
)

# Per-type extraction schemas. Each schema lists short fields that ai_extract
# will populate for every document classified into that type.
EXTRACTION_CONFIGS = {
"affiliate_agreement": {
"schema": {
"party_1_name": {"type": "string", "description": "Legal name of the first party \u2014 must be an actual company or legal entity name, not a role or generic label (e.g., not 'Affiliate', 'Company', or 'Licensor'). Extract from the preamble, recitals, signature block, or document title. Every affiliate agreement involves exactly two parties \u2014 if not found in the body text, infer from the SEC filing entity or any other available context."},
"party_2_name": {"type": "string", "description": "Legal name of the second party \u2014 must be an actual company or legal entity name, not a role or generic label (e.g., not 'Affiliate', 'Company', or 'Licensor'). Extract from the preamble, recitals, signature block, or document title. Every affiliate agreement involves exactly two parties \u2014 if not found in the body text, infer from the SEC filing entity or any other available context."},
"commission_rate": {"type": "string", "description": "Primary rate or structure in a short phrase (e.g., 50/50 revenue share, 15-25% tiered discount, $55/referral)."},
"payment_frequency": {"type": "string", "description": "How often payments are made (e.g., Monthly, Net 30, Quarterly)."},
},
"instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is an affiliate agreement.",
},
"marketing_agreement": {
"schema": {
"party_1_name": {"type": "string", "description": "Legal name of the first party."},
"party_2_name": {"type": "string", "description": "Legal name of the second party."},
"effective_date": {"type": "string", "description": "Contract start date (e.g., January 30, 2000)."},
"territory": {"type": "string", "description": "Geographic scope as a place name only (e.g., United States, Texas, New York). Must be an actual geographic location. If the territory references an exhibit or schedule, or no specific place is named, return null."},
},
"instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a marketing agreement.",
},
"consulting_agreement": {
"schema": {
"company_name": {"type": "string", "description": "Legal name of the company engaging the consultant."},
"consultant_name": {"type": "string", "description": "Legal name of the consultant or consulting firm."},
"compensation_amount": {"type": "string", "description": "Rate or total with currency and period (e.g., EUR 500/hour, $18,000/month, $250,000 lump sum)."},
"effective_date": {"type": "string", "description": "Contract start date (e.g., May 1, 2019)."},
},
"instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a consulting agreement.",
},
"hosting_agreement": {
"schema": {
"provider_name": {"type": "string", "description": "Legal name of the hosting provider."},
"customer_name": {"type": "string", "description": "Legal name of the customer."},
"effective_date": {"type": "string", "description": "Contract start date (e.g., March 1, 2005)."},
"term_length": {"type": "string", "description": "Duration or term condition as a short phrase. May be a fixed period (e.g., 12 months, 2 years) or an event-dependent term (e.g., coterminous with License Agreement, until termination of Service Agreement). Always use digits for numbers, never words."},
},
"instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is a hosting agreement.",
},
"escrow_agreement": {
"schema": {
"owner_name": {"type": "string", "description": "Legal name of the depositor or software developer."},
"licensee_name": {"type": "string", "description": "Legal name of the beneficiary or licensee."},
"escrow_agent_name": {"type": "string", "description": "Legal name of the escrow agent."},
"software_name": {"type": "string", "description": "Name of the escrowed software or materials."},
},
"instructions": f"{EXTRACTION_BASE_INSTRUCTIONS} This is an escrow agreement.",
},
}

print(f"Configured {len(json.loads(CLASSIFICATION_LABELS))} classification labels")
print(f"Configured {len(EXTRACTION_CONFIGS)} extraction schemas: {', '.join(EXTRACTION_CONFIGS.keys())}")
Python
def _flatten_extraction(contract_type: str):
"""Return a transform that filters to `contract_type`, calls ai_extract on
the batch, and flattens the JSON response into typed columns."""
config = EXTRACTION_CONFIGS[contract_type]
schema_json = json.dumps(config["schema"]).replace("'", "\\'")
instructions = config["instructions"].replace("'", "\\'")

def transform(df):
# ai_extract runs once per batch — each row gets its own extraction,
# but Spark pushes the whole batch to the AI function in parallel.
extracted = (
df.filter(F.col("contract_type") == contract_type)
.select(
F.col("path"),
F.col("contract_type"),
F.col("parsed_content"),
F.expr(
f"""
ai_extract(
parsed_content,
'{schema_json}',
MAP('instructions', '{instructions}')
)
"""
).alias("extracted"),
)
)

# Flatten the nested JSON response into top-level STRING columns.
select_cols = [F.col("path"), F.col("contract_type")]
for field_name in config["schema"]:
select_cols.append(F.expr(f"extracted:response.{field_name}::STRING").alias(field_name))

return extracted.select(*select_cols)

return transform

Camada Bronze — Ingerir PDFs Brutos

Leia arquivos PDF como binário usando o formato binaryFile do Spark. Cada linha contém o caminho do arquivo, bytes de conteúdo brutos, comprimento e carimbo de data/hora de modificação.

Dica de produção: para ingestão incremental, substitua spark.read por Auto Loader (formato cloudFiles) para que apenas novos arquivos sejam processados a cada execução.

Python
raw_contracts_df = spark.read.format("binaryFile").load(SOURCE_PATH)

print(f"Loaded {raw_contracts_df.count()} documents from {SOURCE_PATH}")
display(raw_contracts_df.select("path", "length", "modificationTime"))

Camada Silver — Análise e Classificação

Analisarai_parse_document converte bytes brutos de PDF em um VARIANT estruturado contendo elementos do documento, metadados de disposição e informações do arquivo.

Classifyai_classify aceita a saída VARIANT diretamente de ai_parse_document — sem a necessidade de converter para strings. Documentos com erros de análise são filtrados antes da classificação. O classificador dá um peso forte às palavras-chave de nome de arquivo, a menos que o conteúdo do documento as contradiga claramente.

Python
parsed_contracts_df = raw_contracts_df.select(
F.col("path"),
F.expr("ai_parse_document(content, MAP('version', '2.0'))").alias("parsed_content"),
)

# Materialize parsed results to a temp table so downstream steps
# read from the table rather than re-invoking ai_parse_document.
_parsed_table = f"_tmp_idp_parsed_{_TMP_SUFFIX}"
parsed_contracts_df.write.mode("overwrite").saveAsTable(_parsed_table)
parsed_contracts_df = spark.table(_parsed_table)

num_parsed = parsed_contracts_df.count()
print(f"Parsed {num_parsed} documents")
display(parsed_contracts_df.limit(5))
Python
classified_contracts_df = (
parsed_contracts_df
.filter("TRY_CAST(parsed_content:error_status AS STRING) IS NULL")
.select(
F.col("path"),
F.col("parsed_content"),
F.expr(
f"""
ai_classify(
parsed_content,
'{CLASSIFICATION_LABELS}',
MAP('instructions', '{CLASSIFICATION_INSTRUCTIONS_SQL}')
)
"""
).alias("classification"),
)
.select(
F.col("path"),
F.col("parsed_content"),
F.col("classification"),
F.expr("classification:response[0]::STRING").alias("contract_type"),
)
)

# Materialize classified results to a temp table so each gold-layer
# extraction reads from the table rather than re-invoking ai_classify.
_classified_table = f"_tmp_idp_classified_{_TMP_SUFFIX}"
classified_contracts_df.write.mode("overwrite").saveAsTable(_classified_table)
classified_contracts_df = spark.table(_classified_table)

num_classified = classified_contracts_df.count()
print(f"Classified {num_classified} documents")
display(classified_contracts_df.select("path", "contract_type"))

Camada Ouro — Extrair Campos Estruturados

Cada tipo de acordo tem um esquema de extração dedicado com três a quatro campos curtos. ai_extract extrai nomes de partes, datas, valores em dólar e frases curtas de cada documento classificado. O loop abaixo processa todos os cinco tipos e exibe os resultados diretamente.

Python
gold_dfs = {}

for contract_type in EXTRACTION_CONFIGS:
transform = _flatten_extraction(contract_type)
gold_df = transform(classified_contracts_df)
gold_dfs[contract_type] = gold_df

print(f"\n{'=' * 60}")
print(f" {contract_type.replace('_', ' ').title()}")
print(f"{'=' * 60}")
display(gold_df)

(Opcional) Persistir em tabelas Delta

Para salvar os resultados para fluxos de trabalho, analítica ou dashboards downstream, descomente a célula abaixo e defina seu catálogo e esquema de destino. É recomendado persistir os documentos analisados — isso permite que execuções futuras pulem o passo de análise e leiam diretamente da tabela.

Python
# Uncomment and configure to persist tables
# TARGET_CATALOG = "your_catalog"
# TARGET_SCHEMA = "your_schema"
#
# # Parsed documents — persist to avoid re-running ai_parse_document
# parsed_contracts_df.write.mode("overwrite").saveAsTable(
# f"{TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts"
# )
# print(f"Wrote parsed contracts to {TARGET_CATALOG}.{TARGET_SCHEMA}.parsed_contracts")
#
# # Classified documents
# classified_contracts_df.select("path", "contract_type").write.mode("overwrite").saveAsTable(
# f"{TARGET_CATALOG}.{TARGET_SCHEMA}.classified_contracts"
# )
# print(f"Wrote classifications to {TARGET_CATALOG}.{TARGET_SCHEMA}.classified_contracts")
#
# # Gold tables — one per agreement type
# for contract_type, gold_df in gold_dfs.items():
# table_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.gold_{contract_type}s"
# gold_df.write.mode("overwrite").saveAsTable(table_name)
# print(f"Wrote to {table_name}")
#
# print("Done — all tables persisted.")

Notebook de exemplo

Inteligência de Documentos com tecnologia de AI Functions

Abrir notebook em uma nova aba