TWS Scala SCD Type 2

(Scala)

Create synthetic data using dbldatagen in Python

Collecting dbldatagen
  Using cached dbldatagen-0.4.0.post1-py3-none-any.whl.metadata (9.9 kB)
Using cached dbldatagen-0.4.0.post1-py3-none-any.whl (122 kB)
Installing collected packages: dbldatagen
Successfully installed dbldatagen-0.4.0.post1
[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: pip install --upgrade pip

Set StateStoreProvider and input table name
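The cell for this step is hidden in the export, so the following is only a sketch of what such a configuration could look like. It assumes the standard Spark setting `spark.sql.streaming.stateStore.providerClass` and the `RocksDBStateStoreProvider` class that appears in the notebook's imports; `session` and `inputTableName` are hypothetical names, and the table name is a placeholder.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

// In a notebook a session already exists, so getOrCreate() returns it.
val session = SparkSession.builder().getOrCreate()

// transformWithState keeps its state in RocksDB, so select that provider.
session.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  classOf[RocksDBStateStoreProvider].getName)

// Hypothetical placeholder: the notebook's actual input table name is not shown.
val inputTableName = "tws_scd2_input"
```

The setting has to be in place before the streaming query starts, because the provider is chosen when the query's state store is created.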

Define stateful structs that our processor will use

Warning: classes defined within packages cannot be redefined without a cluster restart.
Compilation successful.

Import our structs and other required classes

import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import java.util.UUID
import org.apache.spark.sql.streaming.StatefulProcessor
import org.apache.spark.sql.streaming._
import java.sql.Timestamp
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming.MS._

Define our StatefulProcessor

defined class SCDType2StatefulProcessor
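
The body of `SCDType2StatefulProcessor` is not visible in this export. As a rough illustration of the SCD Type 2 idea such a processor would apply per key, here is a plain-Scala sketch with no Spark dependency. Every name in it (`LocationEvent`, `LocationVersion`, `Scd2Sketch`, and the `location` field) is an assumption made for illustration, not the notebook's code. The "open" version stands in for what a processor would keep in per-key state.

```scala
import java.sql.Timestamp

// Hypothetical input event; the `location` field name is an assumption.
final case class LocationEvent(user: String, time: Timestamp, location: String)

// Hypothetical SCD Type 2 row; endTime is None while the version is current.
final case class LocationVersion(
    user: String,
    location: String,
    startTime: Timestamp,
    endTime: Option[Timestamp])

object Scd2Sketch {
  // Folds one key's events into the open version held in state.
  // Returns the new open version plus the versions closed along the way.
  def applyEvents(
      current: Option[LocationVersion],
      events: Seq[LocationEvent]): (Option[LocationVersion], Vector[LocationVersion]) = {
    // Process events in event-time order so versions close in sequence.
    val ordered = events.sortBy(_.time.getTime)
    ordered.foldLeft((current, Vector.empty[LocationVersion])) {
      case ((open, closed), ev) =>
        open match {
          // Same attribute value: keep the open version untouched.
          case Some(v) if v.location == ev.location => (open, closed)
          // Event not newer than the open version: ignored in this sketch.
          case Some(v) if !ev.time.after(v.startTime) => (open, closed)
          // Changed value: close the old version and open a new one.
          case Some(v) =>
            val next = LocationVersion(ev.user, ev.location, ev.time, None)
            (Some(next), closed :+ v.copy(endTime = Some(ev.time)))
          // First event for this key: open the first version.
          case None =>
            (Some(LocationVersion(ev.user, ev.location, ev.time, None)), closed)
        }
    }
  }
}
```

In a stateful processor, the first element of the result would be written back to the key's state and the closed versions would be emitted downstream. Dropping late events is a simplification chosen here; a production design might instead reorder or split existing versions.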

Define our input stream

inputStream: org.apache.spark.sql.Dataset[org.apache.spark.sql.streaming.MS.UserLocation] = [user: string, time: timestamp ... 1 more field]

Define output table and checkpoint location

baseLocation: String = /Workspace/Users/bo.gao@databricks.com/tws/fd8d30f8-d135-4349-927d-ea6b4b56f843
checkpointLocation: String = /Workspace/Users/bo.gao@databricks.com/tws/fd8d30f8-d135-4349-927d-ea6b4b56f843/checkpoint
outputTable: String = /Workspace/Users/bo.gao@databricks.com/tws/fd8d30f8-d135-4349-927d-ea6b4b56f843/output
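
The paths in this output share a UUID-named base directory with `checkpoint` and `output` beneath it. One way to derive such a layout is sketched below; `RunPaths` and `RunPathsSketch` are hypothetical helpers, not code from the notebook. A fresh directory per run matters because a streaming query that reuses a checkpoint location resumes from the state recorded there.

```scala
import java.util.UUID

// Hypothetical container mirroring the three values printed by the notebook.
final case class RunPaths(
    baseLocation: String,
    checkpointLocation: String,
    outputTable: String)

object RunPathsSketch {
  // Builds a per-run directory layout under `root`, keyed by `runId`.
  def forRun(root: String, runId: UUID): RunPaths = {
    // Drop one trailing slash so the joined path has no double separator.
    val base = s"${root.stripSuffix("/")}/$runId"
    RunPaths(base, s"$base/checkpoint", s"$base/output")
  }
}
```

Calling `RunPathsSketch.forRun(root, UUID.randomUUID())` yields a new, empty location on every run, so each run starts without inherited state.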

Define our stateful transformation and start our query

Cancelled

res17: Boolean = true