Coluna de metadados do arquivo

Você pode obter informações de metadados para arquivos de entrada com a coluna _metadata . A coluna _metadata é uma coluna oculta e está disponível para todos os formatos de arquivo de entrada. Para incluir a coluna _metadata no DataFrame retornado, você deve referenciá-la explicitamente em sua query.

Se a fonte de dados contiver uma coluna denominada _metadata, query retornará a coluna da fonte de dados e não os metadados do arquivo.

Aviso

Novos campos podem ser adicionados à coluna _metadata em versões futuras. Para evitar erros de evolução do esquema se a coluna _metadata for atualizada, Databricks recomenda selecionar campos específicos da coluna em sua query. Veja exemplos.

Metadados compatíveis

A coluna _metadata é um STRUCT contendo os seguintes campos:

Nome

Tipo

Descrição

Exemplo

Versão mínima do Databricks Runtime

caminho de arquivo

STRING

Caminho de arquivo do arquivo de entrada.

file:/tmp/f0.csv

10.5

nome do arquivo

STRING

Nome do arquivo de entrada junto com sua extensão.

f0.csv

10.5

tamanho do arquivo

LONG

Comprimento do arquivo de entrada, em bytes.

628

10.5

file_modification_time

TIMESTAMP

Carimbo de data/hora da última modificação do arquivo de entrada.

2021-12-20 20:05:21

10.5

file_block_start

LONG

começar offset do bloco que está sendo lido, em bytes.

0

13,0

file_block_length

LONG

Comprimento do bloco que está sendo lido, em bytes.

628

13,0

Exemplos

Use em um leitor de fonte de dados baseado em arquivo básico

df = spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("*", "_metadata")

display(df)

'''
Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f0.csv",                |
| Debbie  | 18  |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f1.csv",                |
| Frank   | 24  |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
'''
val df = spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("*", "_metadata")

display(df_population)

/* Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f0.csv",                |
| Debbie  | 18  |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f1.csv",                |
| Frank   | 24  |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 10,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
*/

Selecione campos específicos

spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("_metadata.file_name", "_metadata.file_size")
spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("_metadata.file_name", "_metadata.file_size")

Usar em filtros

spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("*") \
  .filter(col("_metadata.file_name") == lit("test.csv"))
spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("*")
  .filter(col("_metadata.file_name") === lit("test.csv"))

Use em COPY INTO

COPY INTO my_delta_table
FROM (
  SELECT *, _metadata FROM 's3://my-bucket/csvData'
)
FILEFORMAT = CSV

Usar no Auto Loader

Observação

Ao escrever a coluna _metadata , nós a renomeamos como source_metadata. Escrevê-lo como _metadata impossibilitaria o acesso à coluna de metadados na tabela de destino, porque se a fonte de dados contiver uma coluna chamada _metadata, query retornará a coluna da fonte de dados e não os metadados do arquivo.

spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .schema(schema) \
  .load("s3://my-bucket/csvData") \
  .selectExpr("*", "_metadata as source_metadata") \
  .writeStream \
  .format("delta") \
  .option("checkpointLocation", checkpointLocation) \
  .start(targetTable)
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .schema(schema)
  .load("s3://my-bucket/csvData")
  .selectExpr("*", "_metadata as source_metadata")
  .writeStream
  .format("delta")
  .option("checkpointLocation", checkpointLocation)
  .start(targetTable)