Pular para o conteúdo principal

Ampliação automática de tipos com Auto Loader

info

Visualização

Este recurso está em Pré-visualização Pública no Databricks Runtime 16.4 e versões superiores.

Auto Loader processa de forma incremental e eficiente novos arquivos de dados à medida que chegam ao armazenamento cloud . Isso também reduz a manutenção do pipeline, lidando automaticamente com alterações complexas de esquema. Por exemplo, você pode configurar o Auto Loader para detectar automaticamente o esquema dos dados carregados, permitindo inicializar tabelas sem declarar explicitamente o esquema dos dados. Você também pode evoluir o esquema da tabela à medida que novas colunas são introduzidas, eliminando a necessidade de rastrear e aplicar manualmente as alterações de esquema ao longo do tempo. O Auto Loader pode até mesmo recuperar dados inesperados (por exemplo, devido a tipos de dados diferentes) em uma coluna de dados recuperados, ajudando você a evitar a perda de dados.

No entanto, a coluna de dados recuperada exige que você lide manualmente com quaisquer alterações no tipo de dados.

Para lidar automaticamente com algumas dessas alterações de tipo de dados, use a ampliação de tipo no Auto Loader. O Delta Lake agora suporta alterações de ampliação de diferentes tipos de dados sem exigir a reescrita de dados ou intervenção do usuário. Veja alargamento do tipo Delta Lake. O novo modo de evolução do esquema, addNewColumnsWithTypeWidening, evolui automaticamente o esquema em alterações de tipo de dados compatíveis.

Você pode expandir tipos primitivos como int para long, float para double e muito mais. O alargamento de tipos está disponível para todos os formatos de arquivo com suporte a evolução do esquema no Auto Loader. Isso inclui formatos de texto (como JSON, CSV ou XML) e formatos binários (como Avro ou Parquet). Não há mudança no comportamento de evolução do esquema para os modos de evolução do esquema existentes (como addNewColumns, rescue, failOnNewColumns ou none).

Alterações de tipo suportadas

As seguintes alterações de tipo são suportadas:

Tipo de origem

Tipos mais amplos suportados

byte

short, int, long, decimal, double

short

int, long, decimal, double

int

long, decimal, double

long

decimal

float

double

decimal

decimal com maior precisão e escala

date

timestampNTZ (somente para arquivos Parquet)

Ao expandir qualquer tipo numérico para decimal, o Auto Loader expande para decimal com precisão igual ou maior que a precisão inicial. Ao aumentar a escala, a precisão total aumenta na mesma proporção.

A precisão inicial dos integer types é a seguinte:

Tipo

Precisão inicial

byte

10

short

10

int

10

long

20

Por exemplo, se o tipo atual de uma coluna for int e um arquivo com o tipo dessa coluna como decimal(5, 2) for lido, o Auto Loader amplia o tipo dessa coluna para decimal(12, 2).

Pré-requisitos

Para usar a ampliação de tipos com o Auto Loader, você deve atender aos seguintes requisitos:

  • Use Databricks Runtime 16.4 ou acima.
  • Se o destino de gravação for uma tabela Delta Lake, habilite a ampliação de tipo para a tabela Delta Lake usando um dos seguintes métodos:
    • Se estiver usando uma tabela existente:

      SQL
      ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
    • Se estiver criando uma nova tabela com a ampliação de tipo ativada:

      SQL
      CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')

Para obter mais informações sobre a ampliação de tipos nas tabelas Delta Lake , consulte Ampliação de tipos.

Habilitar a ampliação de tipos com a evolução do esquema

Para usar o alargamento de tipo com Auto Loader, especifique addNewColumnsWithTypeWidening ao usar a evolução do esquema. O Auto Loader detecta a adição de novas colunas e alterações de tipo à medida que processa seus dados.

Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", <schemaPath>)
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
.load(<inputPath>)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", <checkpointPath>)
.trigger(availableNow=True)
.toTable("table_name")
)

Quando Auto Loader detecta uma nova coluna ou uma mudança de tipo que é suportada pela ampliação de tipo, a transmissão para com um UnknownFieldException. Antes que sua transmissão apresente esse erro, Auto Loader realiza a inferência de esquema nos microlotes de dados mais recentes e atualiza a localização do esquema com o esquema mais recente, ampliando as colunas existentes ou mesclando novas colunas ao final do esquema.

Comportamento da evolução do esquema em relação às mudanças no tipo de dados

Se você importar um CSV com o seguinte conteúdo, o Auto Loader inferirá o esquema como STRUCT<id INT, name STRING, _rescued_data STRING>.

CSV
id, name
1, John
2, Mary

A tabela de destino tem a seguinte aparência:

| id | nome | _dados_resgatados | | -- | ---- | --------------- | | 1 | João | NULO | | 2 | Maria | NULO |

Agora, importe outro arquivo CSV onde os valores na coluna id sejam mais largos que o tipo INT :

CSV
id, name, age
2147483648, Bob, 25

A tabela a seguir explica o comportamento e a saída com diferentes modos de evolução do esquema no Auto Loader:

Mode

Comportamento com alteração de tipo de dados alargável suportada

addNewColumns (padrão)

O tipo de dados não evolui e a transmissão não falha devido à mudança do tipo de dados. As colunas com valores incompatíveis em relação ao tipo são definidas como NULL e os valores incompatíveis são adicionados à coluna de dados recuperados. a transmissão falha em novas colunas.

rescue

O esquema não evolui e as transmissões não falham devido a quaisquer alterações no esquema. As colunas com valores incompatíveis em relação ao tipo são definidas como NULL e os valores incompatíveis são adicionados à coluna de dados recuperados.

failOnNewColumns

O tipo de dados não evolui e a transmissão não falha devido à mudança do tipo de dados. As colunas com valores incompatíveis em relação ao tipo são definidas como NULL e os valores incompatíveis são adicionados à coluna de dados recuperados. A transmissão falha em novas colunas sem que o esquema seja atualizado.

none

Não evolui o esquema, novas colunas são ignoradas e os dados não são resgatados a menos que a opção rescuedDataColumn seja definida. A transmissão não falha devido a alterações de esquema.

addNewColumnsWithTypeWidening

A transmissão falhou. Novas colunas foram adicionadas ao esquema e as alterações de tipo de dados suportadas foram ampliadas. Alterações de tipo de dados não suportadas (por exemplo, int para string) são adicionadas à coluna de dados recuperados.

Resultados de exemplo

A tabela a seguir mostra o esquema inferido e os valores para cada modo de evolução do esquema após a ingestão do segundo arquivo CSV:

Mode

Esquema e valores inferidos

addNewColumns

Esquema : id: INT, name: STRING, age: INT, _rescued_data: STRING

Valores :

ID

name

idade

_dados_resgatados

1

John

null

null

2

Mary

null

null

null

Prumo

25

{"id": 2147483648}

rescue

Esquema : id: INT, name: STRING, _rescued_data: STRING

Valores :

ID

name

_dados_resgatados

1

John

null

2

Mary

null

null

Prumo

{"age": 25, "id": 2147483648}

failOnNewColumns

Esquema : id: INT, name: STRING, _rescued_data: STRING

Valores :

ID

name

_dados_resgatados

1

John

null

2

Mary

null

null

Prumo

{"id": 2147483648}

none

Esquema : id: INT, name: STRING

Valores :

ID

name

1

John

2

Mary

null

Prumo

addNewColumnsWithTypeWidening

Esquema : id: BIGINT, name: STRING, age: INT, _rescued_data: STRING

Valores :

ID

name

idade

_dados_resgatados

1

John

null

null

2

Mary

null

null

2147483648

Prumo

25

null

Limitações

  • A opção prefersDecimal não pode ser definida como false ao usar addNewColumnsWithTypeWidening. Quando addNewColumnsWithTypeWidening é especificado, o valor default de prefersDecimal é true.
  • date A ampliação para timestampNTZ só é suportada para arquivos Parquet.