
TWS session tracking in Scala

Create synthetic data using dbldatagen in Python


Collecting dbldatagen
  Using cached dbldatagen-0.4.0.post1-py3-none-any.whl.metadata (9.9 kB)
  Using cached dbldatagen-0.4.0.post1-py3-none-any.whl (122 kB)
Installing collected packages: dbldatagen
Successfully installed dbldatagen-0.4.0.post1
[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: pip install --upgrade pip
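The generator cell itself is not preserved in this export, so here is a minimal sketch of what it might look like. The `user_id` and `action_type` columns come from the input schema shown below; the remaining two column names (`timestamp_ms`, `duration`) and all value ranges are assumptions inferred from the `(String, String, Long, Int)` input type used later.

```python
import dbldatagen as dg

# Hypothetical generator spec; column names beyond user_id/action_type and
# all value ranges are guesses, not the notebook's actual cell.
spec = (
    dg.DataGenerator(spark, name="session_events", rows=1000, partitions=4)
    .withColumn("user_id", "string", values=["user1", "user2", "user3"])
    .withColumn("action_type", "string", values=["login", "click", "logout"])
    .withColumn("timestamp_ms", "long", minValue=1, maxValue=100)
    .withColumn("duration", "int", minValue=0, maxValue=10)
)

df = spec.build()
df.write.mode("overwrite").saveAsTable("session_tracking_input")
```

This requires a running Spark session (`spark`) as on a Databricks cluster.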

Set StateStoreProvider and input table name


tableName: String = session_tracking_input
df: org.apache.spark.sql.DataFrame = [user_id: string, action_type: string ... 2 more fields]
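A sketch of the hidden config cell, reconstructed from the output above: switch the state store to RocksDB (the provider class imported in a later cell) and load the synthetic input table.

```scala
// Use RocksDB as the streaming state store provider for TWS.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

// Input table produced by the dbldatagen step.
val tableName = "session_tracking_input"
val df = spark.read.table(tableName)
```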

Define stateful structs that our processor will use


Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful.
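The struct definitions are not preserved, but the warning above and the later import of `org.apache.spark.sql.streaming.SessionState.SessionState` tell us a `SessionState` enumeration was defined inside that package. A plausible sketch, with the enum values (only `ACTIVE` appears in the output) and the `SessionInfo` fields being assumptions:

```scala
package org.apache.spark.sql.streaming

// Session lifecycle states; only ACTIVE is visible in the query output,
// the others are illustrative.
object SessionState extends Enumeration {
  type SessionState = Value
  val ACTIVE, IDLE, EXPIRED = Value
}

// Per-user session record kept in the state store. Field names and types
// are assumptions; primitives keep it compatible with a product encoder.
case class SessionInfo(
    sessionId: String,
    startTimestampMs: Long,
    lastTimestampMs: Long,
    eventCount: Int)
```

Defining classes in a package is what triggers the "cannot be redefined without a cluster restart" warning.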

Import our structs and other necessary classes


import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import java.util.UUID
import org.apache.spark.sql.streaming.StatefulProcessor
import org.apache.spark.sql.streaming._
import java.sql.Timestamp
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming.SessionState.SessionState

Define our StatefulProcessor


defined class SessionTrackingProcessor
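The processor body is lost in this export, so the following is a sketch only. It is keyed by `user_id`, takes the `(String, String, Long, Int)` input tuples and emits the six-field `(String, String, Long, Int, Int, Int)` tuples seen later. The last three output columns look like one-hot action-type indicators in the final table, but that reading, the action-type string values, and the state-update logic are all assumptions.

```scala
import java.util.UUID
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

class SessionTrackingProcessor
    extends StatefulProcessor[
      String,                                   // key: user_id
      (String, String, Long, Int),              // input: user, action, ts, duration
      (String, String, Long, Int, Int, Int)] {  // output row

  @transient private var sessionState: ValueState[SessionInfo] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Register a per-key ValueState holding the SessionInfo struct.
    sessionState = getHandle.getValueState[SessionInfo](
      "sessionState", Encoders.product[SessionInfo], TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String, Long, Int)],
      timerValues: TimerValues): Iterator[(String, String, Long, Int, Int, Int)] = {
    inputRows.map { case (user, action, ts, _) =>
      // Fetch this user's session or start a new one, then record the event.
      val current = Option(sessionState.get())
        .getOrElse(SessionInfo(UUID.randomUUID().toString, ts, ts, 0))
      sessionState.update(
        current.copy(lastTimestampMs = ts, eventCount = current.eventCount + 1))

      // Illustrative one-hot encoding of the action type; the real mapping
      // and action names are not recoverable from the output.
      val (login, click, other) = action match {
        case "login" => (1, 0, 0)
        case "click" => (0, 1, 0)
        case _       => (0, 0, 1)
      }
      (user, SessionState.ACTIVE.toString, ts, login, click, other)
    }
  }
}
```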

Define our input stream


inputStream: org.apache.spark.sql.Dataset[(String, String, Long, Int)] = [_1: string, _2: string ... 2 more fields]
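A sketch of the input-stream cell: read the table as a stream and convert it to the typed tuple `Dataset` shown above. The source column names are assumptions carried over from the generator sketch.

```scala
import spark.implicits._

// Rename columns to tuple-field names so .as[...] can resolve them.
val inputStream = spark.readStream
  .table(tableName)
  .selectExpr(
    "user_id as _1",
    "action_type as _2",
    "timestamp_ms as _3",
    "duration as _4")
  .as[(String, String, Long, Int)]
```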

Define output table and checkpoint location


checkpointLocation: String = /Workspace/Users/eric.marnadi@databricks.com/streaming_query/checkpoint_7e59eeb1-f9e2-49e7-9f2b-d97bbca13268
outputTable: String = /Workspace/Users/eric.marnadi@databricks.com/streaming_query/output_table_8a30fda6-cb3a-4650-9c28-8d76352b18aa
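The randomly suffixed paths above suggest a cell along these lines; the base workspace path is taken from the output, the use of `UUID.randomUUID()` is an assumption consistent with the earlier `java.util.UUID` import.

```scala
import java.util.UUID

// Fresh checkpoint and output locations per run, so reruns don't collide.
val basePath = "/Workspace/Users/eric.marnadi@databricks.com/streaming_query"
val checkpointLocation = s"$basePath/checkpoint_${UUID.randomUUID()}"
val outputTable = s"$basePath/output_table_${UUID.randomUUID()}"
```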

Define our stateful transformation and start our query


import spark.implicits._
sessionStream: org.apache.spark.sql.Dataset[(String, String, Long, Int, Int, Int)] = [_1: string, _2: string ... 4 more fields]
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1d9aefb1
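A sketch of the transformation-and-start cell reflected in the outputs above: group the input by `user_id`, apply the processor via `transformWithState`, and start the query writing to the output location. The Delta format and append output mode are assumptions consistent with typical Databricks streaming notebooks.

```scala
import spark.implicits._
import org.apache.spark.sql.streaming.{OutputMode, TimeMode}

// Apply the stateful processor per user_id key.
val sessionStream = inputStream
  .groupByKey(_._1)
  .transformWithState(
    new SessionTrackingProcessor(),
    TimeMode.ProcessingTime(),
    OutputMode.Append())

// Start the streaming query writing to the output path.
val query = sessionStream.writeStream
  .format("delta")
  .option("checkpointLocation", checkpointLocation)
  .outputMode("append")
  .start(outputTable)

// After a few batches commit, read the results back (this produces the
// table shown in the final cell):
// display(spark.read.format("delta").load(outputTable))
```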

+-----+------+---+---+---+---+
|   _1|    _2| _3| _4| _5| _6|
+-----+------+---+---+---+---+
|user1|ACTIVE|  1|  0|  0|  1|
|user1|ACTIVE|  6|  1|  0|  0|
|user1|ACTIVE| 11|  0|  1|  0|
|user1|ACTIVE| 16|  1|  0|  0|
|user1|ACTIVE| 21|  1|  0|  0|
|user1|ACTIVE| 26|  0|  1|  0|
|user1|ACTIVE| 31|  0|  1|  0|
|user1|ACTIVE| 36|  1|  0|  0|
|user1|ACTIVE| 41|  0|  1|  0|
|user1|ACTIVE| 46|  0|  0|  1|
|user1|ACTIVE| 51|  0|  1|  0|
|user1|ACTIVE| 56|  0|  1|  0|
|user1|ACTIVE| 61|  0|  1|  0|
|user1|ACTIVE| 66|  1|  0|  0|
|user1|ACTIVE| 71|  0|  1|  0|
|user1|ACTIVE| 76|  0|  0|  1|
|user1|ACTIVE| 81|  0|  1|  0|
|user1|ACTIVE| 86|  0|  1|  0|
|user1|ACTIVE| 91|  0|  1|  0|
|user1|ACTIVE| 96|  0|  1|  0|
+-----+------+---+---+---+---+
only showing top 20 rows