Padrões comuns de carregamento de dados usando COPY INTO

Aprenda padrões comuns para usar COPY INTO para carregar dados de fontes de arquivos no Delta Lake.

Existem muitas opções para usar COPY INTO. Você também pode usar credenciais temporárias com COPY INTO em combinação com esses padrões.

Consulte COPY INTO para obter uma referência completa de todas as opções.

Criar tabelas de destino para COPY INTO

COPY INTO deve ter como alvo uma tabela Delta existente. No Databricks Runtime 11.0 e acima, definir o esquema para essas tabelas é opcional para formatos que dão suporte à evolução do esquema:

CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

Observe que para inferir o esquema com COPY INTO, você deve passar opções adicionais:

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

O exemplo a seguir cria uma tabela Delta sem esquema chamada my_pipe_data e carrega um CSV delimitado por barra vertical com um cabeçalho:

CREATE TABLE IF NOT EXISTS my_pipe_data;

COPY INTO my_pipe_data
  FROM 's3://my-bucket/pipeData'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('mergeSchema' = 'true',
                  'delimiter' = '|',
                  'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

Carregar dados JSON com COPY INTO

O exemplo a seguir carrega dados JSON de cinco arquivos no Amazon S3 (S3) na tabela Delta chamada my_json_data. Esta tabela deve ser criada antes que COPY INTO possa ser executado. Se algum dado já tiver sido carregado de um dos arquivos, os dados não serão recarregados para esse arquivo.

COPY INTO my_json_data
  FROM 's3://my-bucket/jsonData'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

-- The second execution will not copy any data since the first command already loaded the data
COPY INTO my_json_data
  FROM 's3://my-bucket/jsonData'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

Carregar dados Avro com COPY INTO

O exemplo a seguir carrega dados Avro no S3 usando expressões SQL adicionais como parte da instrução SELECT .

COPY INTO my_delta_table
  FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
          FROM 's3://my-bucket/avroData')
  FILEFORMAT = AVRO

Carregar arquivos CSV com COPY INTO

O exemplo a seguir carrega arquivos CSV do S3 em s3://bucket/base/path/folder1 em uma tabela Delta em s3://bucket/deltaTables/target.

COPY INTO delta.`s3://bucket/deltaTables/target`
  FROM (SELECT key, index, textData, 'constant_value'
          FROM 's3://bucket/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'
  FORMAT_OPTIONS('header' = 'true')

-- The example below loads CSV files without headers in S3 using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO delta.`s3://bucket/deltaTables/target`
  FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
        FROM 's3://bucket/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'

Ignore arquivos corrompidos ao carregar dados

Se os dados que você está carregando não puderem ser lidos devido a algum problema de corrupção, esses arquivos podem ser ignorados definindo ignoreCorruptFiles como true no FORMAT_OPTIONS.

O resultado do comando COPY INTO retorna quantos arquivos foram ignorados devido à corrupção na coluna num_skipped_corrupt_files . Essa avaliação também aparece na coluna operationMetrics em numSkippedCorruptFiles após a execução DESCRIBE HISTORY na tabela Delta.

Arquivos corrompidos não são rastreados por COPY INTO, portanto, eles podem ser recarregados em uma execução subsequente se a corrupção for corrigida. Você pode ver quais arquivos estão corrompidos executando COPY INTO no modo VALIDATE .

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')

Observação

ignoreCorruptFiles está disponível no Databricks Runtime 11.0e acima.