Trabalhar com dados não estruturados em grandes volumes.
Esta página mostra como armazenar, consultar e processar arquivos de dados não estruturados usando volumes do Unity Catalog. Você aprenderá como upload arquivos, consultar metadados, processar arquivos com funções AI , aplicar controle de acesso e compartilhar volumes com outras organizações. Sempre que possível, foram incluídas instruções para realizar este tutorial utilizando a interface do usuário do Catalog Explorer. Caso a opção Catalog Explorer não seja exibida, utilize o comando Python ou SQL fornecido.
Para uma visão geral completa dos recursos e casos de uso de volumes, consulte O que são volumes Unity Catalog ?.
Requisitos
- Um workspace Databricks com Unity Catalog ativado.
CREATE CATALOGprivilégio no metastore. Consulte Criar catálogos. Se você não conseguir criar um catálogo, peça acesso ao seu administrador ou use um catálogo existente onde você tenha o privilégioCREATE SCHEMA.- Databricks Runtime 14.3 LTS e versões superiores.
- Para funções AI : Um workspace em uma região compatível.
- Para Delta Sharing: privilégios
CREATE SHAREeCREATE RECIPIENTno metastore. Consulte Compartilhe dados e AI ativa com segurança.
o passo 1: Criar um volume
Crie um catálogo, um esquema e um volume para armazenar seus arquivos. Para obter instruções detalhadas sobre o gerenciamento de volumes, consulte Criar e gerenciar volumes Unity Catalog.
o passo 1.1: Criar um catálogo e um esquema
- SQL
- Python
- Catalog Explorer
-- Create a catalog
CREATE CATALOG IF NOT EXISTS unstructured_data_lab;
USE CATALOG unstructured_data_lab;
-- Create a schema
CREATE SCHEMA IF NOT EXISTS raw;
USE SCHEMA raw;
spark.sql("CREATE CATALOG IF NOT EXISTS unstructured_data_lab")
spark.sql("USE CATALOG unstructured_data_lab")
spark.sql("CREATE SCHEMA IF NOT EXISTS raw")
spark.sql("USE SCHEMA raw")
- Clique
Catálogo na barra lateral.
- Clique em Criar > Criar um catálogo .
- Insira unstructured_data_lab como o nome do catálogo .
- Clique em Criar .
- Clique em " Ver catálogo" .
Na página do catálogo:
- Clique em Criar esquema .
- Insira "raw" como nome do esquema .
- Clique em Criar .
o passo 1.2: Crie um volume de gerenciamento
- SQL
- Python
- Catalog Explorer
CREATE VOLUME IF NOT EXISTS files_volume
COMMENT 'Volume for storing unstructured data files';
spark.sql("""
CREATE VOLUME IF NOT EXISTS files_volume
COMMENT 'Volume for storing unstructured data files'
""")
Na página de esquema:
- Clique em Criar > Volume .
- Insira files_volume como o nome do volume .
- Verifique se a opção "Gerenciar volume" está selecionada.
- Clique em Criar .
o passo 2: carregar arquivos
Faça upload de arquivos para o seu volume. Para exemplos abrangentes de gerenciamento de arquivos, consulte Trabalhar com arquivos em volumes Unity Catalog.
o passo 2.1: carregar arquivos
Você pode usar exemplos de databricks-datasets para este tutorial ou upload seus próprios arquivos usando a interface do Explorador de Catálogo.
Você pode usar o comando Python para copiar arquivos de databricks-datasets para o seu volume mesmo que você não esteja familiarizado com Python. Consulte o Gerenciador de Notebook para obter instruções sobre como executar comandos no Notebook.
- Python
- Catalog Explorer
# Upload a single image file
dbutils.fs.cp(
"dbfs:/databricks-datasets/flower_photos/roses/10090824183_d02c613f10_m.jpg",
"/Volumes/unstructured_data_lab/raw/files_volume/rose.jpg"
)
# Upload a single PDF file
dbutils.fs.cp(
"dbfs:/databricks-datasets/COVID/CORD-19/2020-03-13/COVID.DATA.LIC.AGMT.pdf",
"/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf"
)
# Upload a directory
local_dir = "dbfs:/databricks-datasets/samples/data/mllib"
volume_path = "/Volumes/unstructured_data_lab/raw/files_volume/sample_files"
for file_info in dbutils.fs.ls(local_dir):
source = file_info.path
dest = f"{volume_path}/{file_info.name}"
dbutils.fs.cp(source, dest, recurse=True)
print(f"Uploaded: {file_info.name}")
O código Python na tab Python carrega dois arquivos (um JPG e um PDF) e um diretório que inclui os arquivos .txt e .csv . Para upload arquivos usando o Explorador de Catálogo:
- Na página do volume, clique em "Enviar para este volume" .
- Na caixa de diálogo de carregamento de arquivos , em Arquivos , clique em Procurar ou arraste e solte os arquivos na área de destino.
- Em Volume de destino , verifique se o volume que você criou na etapa anterior está selecionado.
o passo 2.2: Verificar o upload
- SQL
- Python
- Catalog Explorer
LIST '/Volumes/unstructured_data_lab/raw/files_volume/';
files = dbutils.fs.ls("/Volumes/unstructured_data_lab/raw/files_volume/")
for f in files:
print(f"{f.name}\t{f.size} bytes")
Quando os arquivos são carregados, eles aparecem na página de volumes. Clique no nome de um arquivo para ver uma pré-visualização ou clique em um diretório para view arquivos individuais.
Alternativa: Use o comando mágico %fs
Use o comando mágico %fs :
%fs ls /Volumes/unstructured_data_lab/raw/files_volume/
o passo 3: Consultar metadados do arquivo
Consulte as informações do arquivo para entender o que há em seu volume. Para obter mais padrões de consulta, consulte Listar e consultar arquivos em volumes com SQL.
o passo 3.1: Mostrar metadados do arquivo
- SQL
- Python
- Catalog Explorer
SELECT
path,
_metadata.file_name,
_metadata.file_size,
_metadata.file_modification_time
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile'
);
df = (
spark.read
.format("binaryFile")
.option("recursiveFileLookup", "true")
.load("/Volumes/unstructured_data_lab/raw/files_volume/")
)
df.select("path", "modificationTime", "length").show(truncate=False)
A página de volumes no Explorador de Catálogos mostra o nome de cada arquivo (incluindo a extensão), o tamanho e a data da última modificação .
o passo 4: Consultar e processar arquivos
Utilize as funções AI Databricks para extrair conteúdo de documentos e analisar imagens. Para uma visão geral completa das funcionalidades de AI , consulte Aplicar AI no uso de dados Databricks AI Functions.
As funções AI requerem um workspace em uma região compatível. Consulte Aplicar AI no uso de dados Databricks AI Functions.
Se você não tiver acesso às funções AI , use a biblioteca padrão Python . Expanda as seções Alternativas abaixo para ver exemplos.
o passo 4.1: Analisar documentos
- SQL
- Python
SELECT
path AS file_path,
ai_parse_document(content, map('version', '2.0')) AS parsed_content
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.pdf'
);
result_df = spark.sql("""
SELECT
path AS file_path,
ai_parse_document(content, map('version', '2.0')) AS parsed_content
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.pdf'
)
""")
display(result_df)
Alternativa: Analisar PDFs sem funções AI
Se as funções AI não estiverem disponíveis na sua região, use a biblioteca Python :
%pip install PyPDF2
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from PyPDF2 import PdfReader
import io
@udf(returnType=StringType())
def extract_pdf_text(content):
if content is None:
return None
try:
reader = PdfReader(io.BytesIO(content))
return "\n".join(page.extract_text() or "" for page in reader.pages)
except Exception as e:
return f"Error: {str(e)}"
df = spark.read.format("binaryFile") \
.option("pathGlobFilter", "*.pdf") \
.load("/Volumes/unstructured_data_lab/raw/files_volume/")
result_df = df.withColumn("text_content", extract_pdf_text("content"))
display(result_df.select("path", "text_content"))
o passo 4.2: Analisar imagens
- SQL
- Python
SELECT
path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in one sentence:',
files => content
) AS description
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_size < 5000000;
result_df = spark.sql("""
SELECT
path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in one sentence:',
files => content
) AS description
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_size < 5000000
""")
display(result_df)
Alternativa: Extrair metadados de imagens sem funções AI
Para extrair metadados de imagens sem o uso AI :
%pip install pillow
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from PIL import Image
import io
image_schema = StructType([
StructField("width", IntegerType()),
StructField("height", IntegerType()),
StructField("format", StringType())
])
@udf(returnType=image_schema)
def get_image_info(content):
if content is None:
return None
try:
img = Image.open(io.BytesIO(content))
return {"width": img.width, "height": img.height, "format": img.format}
except:
return None
df = spark.read.format("binaryFile") \
.option("pathGlobFilter", "*.{jpg,jpeg,png}") \
.load("/Volumes/unstructured_data_lab/raw/files_volume/")
result_df = df.withColumn("image_info", get_image_info("content"))
display(result_df.select("path", "image_info.*"))
o passo 4.3: Filtrar e analisar por nome de arquivo
Este exemplo filtra arquivos de imagem que contenham a substring "rose" em seus nomes.
- SQL
- Python
SELECT
path AS file_path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in one sentence:',
files => content
) AS description
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_name ILIKE '%rose%';
result_df = spark.sql("""
SELECT
path AS file_path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in one sentence:',
files => content
) AS description
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png}'
)
WHERE _metadata.file_name ILIKE '%rose%'
""")
display(result_df)
o passo 4.4: unir arquivos com tabelas estruturadas
Este exemplo utiliza números de linha para associar arquivos a viagens de táxi para fins de demonstração. Na produção, join com base em fatores-chave relevantes para o negócio.
- SQL
- Python
-- This example demonstrates joining file metadata with structured data
-- by pairing files with taxi trips using row numbers
WITH files_with_row AS (
SELECT
path,
SPLIT(path, '/')[SIZE(SPLIT(path, '/')) - 1] AS file_name,
length,
ROW_NUMBER() OVER (ORDER BY path) AS file_row
FROM read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/',
format => 'binaryFile'
)
),
trips_with_row AS (
SELECT
tpep_pickup_datetime,
pickup_zip,
dropoff_zip,
fare_amount,
ROW_NUMBER() OVER (ORDER BY tpep_pickup_datetime) AS trip_row
FROM samples.nyctaxi.trips
WHERE pickup_zip IS NOT NULL
LIMIT 5
)
SELECT
f.path,
f.file_name,
f.length,
t.pickup_zip,
t.dropoff_zip,
t.fare_amount,
t.tpep_pickup_datetime
FROM files_with_row f
INNER JOIN trips_with_row t ON f.file_row = t.trip_row;
from pyspark.sql.functions import col, row_number, element_at, split
from pyspark.sql.window import Window
# Read files and add row numbers
files_df = spark.read.format("binaryFile") \
.load("/Volumes/unstructured_data_lab/raw/files_volume/") \
.withColumn("file_name", element_at(split(col("path"), "/"), -1))
files_with_row = files_df.alias("files") \
.withColumn("file_row", row_number().over(Window.orderBy("path")))
# Get trips and add row numbers
trips_df = spark.table("samples.nyctaxi.trips") \
.filter(col("pickup_zip").isNotNull()) \
.limit(5)
trips_with_row = trips_df.alias("trips") \
.withColumn("trip_row", row_number().over(Window.orderBy("tpep_pickup_datetime")))
# Join on row numbers
result_df = files_with_row \
.join(trips_with_row, col("file_row") == col("trip_row"), "inner") \
.select(
"files.path",
"files.file_name",
"files.length",
"trips.pickup_zip",
"trips.dropoff_zip",
"trips.fare_amount",
"trips.tpep_pickup_datetime"
)
display(result_df)
o passo 5: Aplicar controle de acesso
Controle quem pode ler e gravar arquivos em seus volumes. Para saber mais sobre como gerenciar privilégios no Unity Catalog, consulte Gerenciar privilégios no Unity Catalog.
o passo 5.1: Conceder acesso
- SQL
- Python
- Catalog Explorer
-- Replace <user-or-group-name> with your workspace group or user name
-- Grant read access
GRANT READ VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;
-- Grant read and write access
GRANT READ VOLUME, WRITE VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;
-- Grant all privileges
GRANT ALL PRIVILEGES ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`;
# Replace <user-or-group-name> with your workspace group or user name
spark.sql("""
GRANT READ VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`
""")
spark.sql("""
GRANT READ VOLUME, WRITE VOLUME ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`
""")
spark.sql("""
GRANT ALL PRIVILEGES ON VOLUME unstructured_data_lab.raw.files_volume
TO `<user-or-group-name>`
""")
- Acesse a tab Permissões na página do volume.
- Clique em Conceder .
- Insira o endereço email de um usuário ou o nome de um grupo.
- Selecione as permissões que deseja conceder.
- Clique em Confirmar .
o passo 5.2: visualizar privilégios atuais
- SQL
- Python
- Catalog Explorer
SHOW GRANTS ON VOLUME unstructured_data_lab.raw.files_volume;
display(spark.sql("SHOW GRANTS ON VOLUME unstructured_data_lab.raw.files_volume"))
A tab Permissões na página do volume mostra quais usuários e grupos têm acesso ao volume.
o passo 6: Configurar a ingestão incremental
Utilize o Auto Loader para processar automaticamente novos arquivos à medida que chegam ao seu volume. Este padrão é útil para ingestão contínua de fluxo de dados de trabalho. Para obter mais informações sobre padrões de ingestão, consulte Padrões comuns de carregamento de dados.
o passo 6.1: Criar uma tabela de transmissão
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE document_ingestion
SCHEDULE EVERY 1 HOUR
AS SELECT
path,
modificationTime,
length,
content,
_metadata,
current_timestamp() AS ingestion_time
FROM STREAM(read_files(
'/Volumes/unstructured_data_lab/raw/files_volume/incoming/',
format => 'binaryFile'
));
from pyspark.sql.functions import current_timestamp, col
dbutils.fs.mkdirs("/Volumes/unstructured_data_lab/raw/files_volume/incoming/")
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobFilter", "*.pdf") \
.load("/Volumes/unstructured_data_lab/raw/files_volume/incoming/")
df_enriched = df \
.withColumn("ingestion_time", current_timestamp()) \
.withColumn("source_file", col("_metadata.file_path"))
query = df_enriched.writeStream \
.option("checkpointLocation",
"/Volumes/unstructured_data_lab/raw/files_volume/_checkpoints/docs") \
.trigger(availableNow=True) \
.toTable("document_ingestion")
query.awaitTermination()
o passo 7: Compartilhar arquivos com Delta Sharing
Compartilhe volumes com segurança com usuários de outras organizações usando Delta Sharing. Você precisa criar um destinatário antes de compartilhar. Um destinatário representa uma organização ou usuário externo que pode acessar seus dados compartilhados. Consulte Criar e gerenciar destinatários de dados para Delta Sharing (Compartilhamento Databricks-to-Databricks ) para obter informações sobre a configuração de destinatários.
o passo 7.1: Criar e configurar um compartilhamento
- SQL
- Python
-- Create a share
CREATE SHARE IF NOT EXISTS unstructured_data_share
COMMENT 'Document files for partners';
-- Add the volume
ALTER SHARE unstructured_data_share
ADD VOLUME unstructured_data_lab.raw.files_volume;
-- Create a recipient
CREATE RECIPIENT IF NOT EXISTS <partner_org>
USING ID '<recipient-sharing-identifier>';
-- Grant access
GRANT SELECT ON SHARE unstructured_data_share
TO RECIPIENT <partner_org>;
spark.sql("""
CREATE SHARE IF NOT EXISTS unstructured_data_share
COMMENT 'Document files for partners'
""")
spark.sql("""
ALTER SHARE unstructured_data_share
ADD VOLUME unstructured_data_lab.raw.files_volume
""")
spark.sql("""
CREATE RECIPIENT IF NOT EXISTS <partner_org>
USING ID '<recipient-sharing-identifier>'
""")
spark.sql("""
GRANT SELECT ON SHARE unstructured_data_share
TO RECIPIENT <partner_org>
""")
o passo 7.2: Acessar dados compartilhados (como destinatário)
- SQL
- Python
-- View available shares
SHOW SHARES IN PROVIDER <provider_name>;
-- Create a catalog from the share
CREATE CATALOG IF NOT EXISTS shared_documents
FROM SHARE <provider_name>.unstructured_data_share;
-- Query shared files
SELECT * EXCEPT (content), _metadata
FROM read_files(
'/Volumes/shared_documents/raw/files_volume/',
format => 'binaryFile'
)
LIMIT 10;
spark.sql("SHOW SHARES IN PROVIDER <provider_name>").show()
spark.sql("""
CREATE CATALOG IF NOT EXISTS shared_documents
FROM SHARE <provider_name>.unstructured_data_share
""")
df = spark.read.format("binaryFile") \
.load("/Volumes/shared_documents/raw/files_volume/")
df.select("path", "modificationTime", "length").show(10)
o passo 8: Limpar arquivos
Remova os arquivos quando não forem mais necessários.
- Python
- CLI
# Delete a single file
dbutils.fs.rm("/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf")
# Delete a directory recursively
dbutils.fs.rm("/Volumes/unstructured_data_lab/raw/files_volume/sample_files/", recurse=True)
# Delete a single file
databricks fs rm dbfs:/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf
# Delete a directory recursively
databricks fs rm -r dbfs:/Volumes/unstructured_data_lab/raw/files_volume/sample_files/
Alternativa: Use o Python padrão.
import os
os.remove("/Volumes/unstructured_data_lab/raw/files_volume/covid.pdf")
import shutil
shutil.rmtree("/Volumes/unstructured_data_lab/raw/files_volume/sample_files/")
Próximos passos
Continue aprendendo sobre volumes
- O que são volumes Unity Catalog ?
- Trabalhar com arquivos em volumes Unity Catalog
- Criar e gerenciar volumes Unity Catalog
Explore as funcionalidades relacionadas.
- Aplicar AI no uso de dados Databricks AI Functions
- Padrões comuns de carregamento de dados
- Compartilhe dados e AI ativo com segurança