def flatten_table(service_name):
    """Split one serviceName out of the silver table into its own gold Delta table.

    First does a batch read of silver to discover the distinct JSON keys this
    service uses in the ``flattened`` column, builds a ``StructType`` of string
    fields from them, then streams the matching rows into
    ``{catalog}.{database}.{table_name}`` with ``flattened`` parsed into a
    ``requestParams`` struct. Waits for the availableNow stream to finish and
    then OPTIMIZEs the gold table.

    Args:
        service_name: value of the ``serviceName`` column to extract; dashes
            are replaced with underscores to form the gold table name.
    """
    flattened_stream = spark.readStream.table(f"{catalog}.{database}.silver")
    # Batch read of the same table, fully qualified for consistency with the
    # streaming read (the original relied on the session's current catalog).
    flattened = spark.table(f"{catalog}.{database}.silver")

    # Alias the UDF *column* (not the DataFrame, which is a no-op for column
    # names) so collected rows expose a stable field name instead of the
    # auto-generated 'justKeys(flattened)'.
    key_rows = (
        flattened
        .filter(col("serviceName") == service_name)
        .select(just_keys_udf(col("flattened")).alias("keys"))
        .distinct()
        .collect()
    )
    # Each row holds a stringified collection like "[a, b, c]": strip the
    # surrounding brackets, split on ", ", and union the non-empty key names.
    keys_per_row = [row["keys"][1:-1].split(", ") for row in key_rows]
    distinct_keys = {key for row_keys in keys_per_row for key in row_keys if key}

    schema = StructType()
    if not distinct_keys:
        # from_json needs at least one field; keep a placeholder column.
        schema.add(StructField('placeholder', StringType()))
    else:
        for key in distinct_keys:
            schema.add(StructField(key, StringType()))

    table_name = service_name.replace("-", "_")
    query = (
        flattened_stream
        .filter(col("serviceName") == service_name)
        .withColumn("requestParams", from_json(col("flattened"), schema))
        .drop("flattened")
        .writeStream
        .partitionBy("date")
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", f"{checkpoint}/gold/{service_name}")
        .option("mergeSchema", True)
        .trigger(availableNow=True)
        .table(f"{catalog}.{database}.{table_name}")
    )
    # .table() starts the stream and returns immediately; with availableNow we
    # must wait for it to drain, otherwise OPTIMIZE below can run before the
    # gold table has been (fully) written.
    query.awaitTermination()
    spark.sql(f"OPTIMIZE {catalog}.{database}.{table_name}")