
Scala TWS Initial State


transformWithState (TWS): initializing the state store from Delta in Scala

Create synthetic data using dbldatagen in Python


Collecting dbldatagen
  Downloading dbldatagen-0.4.0.post1-py3-none-any.whl.metadata (9.9 kB)
Downloading dbldatagen-0.4.0.post1-py3-none-any.whl (122 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 122.8/122.8 kB 3.8 MB/s eta 0:00:00
Installing collected packages: dbldatagen
Successfully installed dbldatagen-0.4.0.post1

[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: pip install --upgrade pip

Set StateStoreProvider and input table name


tableName: String = initial_state_input_table
df: org.apache.spark.sql.DataFrame = [ip_address: string, username: string ... 7 more fields]
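The configuration cell itself is not preserved in this export, only the output above. A minimal sketch of what it could look like: transformWithState requires the RocksDB state store provider, and the table name is taken from the output.

```scala
// Minimal sketch; the original cell is not shown in the export.
// transformWithState requires the RocksDB state store provider.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

// Table name matches the cell output above.
val tableName = "initial_state_input_table"
val df = spark.read.table(tableName)
```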

Define stateful structs that our processor will use


Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful.
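The struct definitions are missing from the export; only the package-cell warning above remains. Below is a minimal sketch of the package cell, placing an IPState object under org.apache.spark.sql.streaming to match the import in the next step. Only the field names visible in the cell outputs (ipAddress, username, lastTimestamp, loginAttempts, and the SecurityMetrics columns) come from the export; every other field is an assumption, and the real SecurityEvent and IPInitialState classes have more fields than shown here.

```scala
package org.apache.spark.sql.streaming

import java.sql.Timestamp

object IPState {

  // Incoming security event. The export confirms ipAddress and username
  // plus 7 more fields; the remaining field names are assumptions.
  case class SecurityEvent(
      ipAddress: String,
      username: String,
      eventTimestamp: Timestamp, // assumed
      action: String,            // assumed
      success: Boolean,          // assumed
      userAgent: String,         // assumed
      country: String,           // assumed
      isAdminAction: Boolean,    // assumed
      requestPath: String)       // assumed

  // Per-IP initial state loaded from Delta. The export confirms lastTimestamp
  // and loginAttempts plus 12 more fields; this is a reduced, assumed subset
  // (ipAddress is included here so the later groupByKey has a key to use).
  case class IPInitialState(
      lastTimestamp: Long,
      loginAttempts: Int,
      failedLogins: Int,              // assumed
      totalRequests: Int,             // assumed
      adminAttempts: Int,             // assumed
      suspiciousUserAgents: Int,      // assumed
      lastAction: String,             // assumed
      distinctUsernames: Seq[String], // assumed
      distinctCountries: Seq[String], // assumed
      ipAddress: String)              // assumed

  // Emitted metrics; all field names appear in the query output below.
  case class SecurityMetrics(
      ipAddress: String,
      threatLevel: String,
      loginAttempts: Int,
      failedLogins: Int,
      distinctUsernameCount: Int,
      totalRequests: Int,
      lastAction: String,
      adminAttempts: Int,
      suspiciousUserAgents: Int,
      distinctCountryCount: Int)
}
```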

Import our structs and the other classes we need


import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import java.util.UUID
import org.apache.spark.sql.streaming.StatefulProcessor
import org.apache.spark.sql.streaming._
import java.sql.Timestamp
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming.IPState._

Define our StatefulProcessor


defined class SuspiciousIPTrackingProcessor
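The processor code is likewise not included in the export. A minimal sketch follows, assuming the StatefulProcessorWithInitialState API from the Spark 4.0-era transformWithState; exact method signatures can vary between runtime versions, and the counter updates and threat-level thresholds are purely illustrative rather than the logic that produced the output shown further below.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.IPState._

// Sketch only: key = IP address, input = SecurityEvent, output = SecurityMetrics,
// initial state = IPInitialState loaded from Delta.
class SuspiciousIPTrackingProcessor
    extends StatefulProcessorWithInitialState[String, SecurityEvent, SecurityMetrics, IPInitialState] {

  @transient private var ipState: ValueState[IPInitialState] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // One ValueState entry per grouping key (the IP address); no TTL.
    ipState = getHandle.getValueState[IPInitialState](
      "ipState", Encoders.product[IPInitialState], TTLConfig.NONE)
  }

  // Seed the state store from the Delta-backed initial state for keys that
  // have an initial-state row, before any streaming rows are processed.
  override def handleInitialState(
      key: String,
      initialState: IPInitialState,
      timerValues: TimerValues): Unit = {
    ipState.update(initialState)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[SecurityEvent],
      timerValues: TimerValues): Iterator[SecurityMetrics] = {
    var state =
      if (ipState.exists()) ipState.get()
      else IPInitialState(0L, 0, 0, 0, 0, 0, "", Seq.empty[String], Seq.empty[String], key)

    // Illustrative update logic; the original counters and thresholds are not
    // shown in the exported notebook.
    val emitted = inputRows.map { event =>
      state = state.copy(
        lastTimestamp = event.eventTimestamp.getTime,
        loginAttempts = state.loginAttempts + (if (event.action == "login") 1 else 0),
        failedLogins = state.failedLogins + (if (!event.success) 1 else 0),
        totalRequests = state.totalRequests + 1,
        adminAttempts = state.adminAttempts + (if (event.isAdminAction) 1 else 0),
        suspiciousUserAgents =
          state.suspiciousUserAgents + (if (event.userAgent.contains("bot")) 1 else 0),
        lastAction = event.action,
        distinctUsernames = (state.distinctUsernames :+ event.username).distinct,
        distinctCountries = (state.distinctCountries :+ event.country).distinct)

      // Assumed threat scoring, not the original rule set.
      val threatLevel =
        if (state.distinctCountries.size >= 3 || state.adminAttempts >= 1) "HIGH"
        else if (state.failedLogins >= 2) "MEDIUM"
        else "LOW"

      SecurityMetrics(
        key, threatLevel, state.loginAttempts, state.failedLogins,
        state.distinctUsernames.size, state.totalRequests, state.lastAction,
        state.adminAttempts, state.suspiciousUserAgents, state.distinctCountries.size)
    }.toList // materialize before persisting the final state

    ipState.update(state)
    emitted.iterator
  }
}
```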

Define our input stream


import spark.implicits._
inputStream: org.apache.spark.sql.Dataset[org.apache.spark.sql.streaming.IPState.SecurityEvent] = [ipAddress: string, username: string ... 7 more fields]
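The input-stream cell is not shown. A minimal sketch, assuming the stream reads the same Delta table and renames the snake_case column of df (ip_address) to the camelCase field of SecurityEvent; any other renames would follow the same pattern.

```scala
import spark.implicits._
import org.apache.spark.sql.streaming.IPState._

// Read the Delta input table as a stream and convert to the typed event Dataset.
val inputStream = spark.readStream
  .table(tableName)
  .withColumnRenamed("ip_address", "ipAddress") // assumed rename
  .as[SecurityEvent]
```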

Define output table and checkpoint location


checkpointLocation: String = /Workspace/Users/eric.marnadi@databricks.com/streaming_query/checkpoint_ebd91116-aff2-441f-8bc3-0762342bf1ce
outputTable: String = /Workspace/Users/eric.marnadi@databricks.com/streaming_query/output_table_e79fc2c6-1c83-473f-98a2-20a7f66de599
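The cell body is missing; below is a minimal sketch consistent with the paths in the output above, with the user-specific base path replaced by a placeholder. Random UUID suffixes keep each run's checkpoint and output location unique.

```scala
import java.util.UUID

val basePath = "/Workspace/Users/<your-user>/streaming_query" // placeholder base path

val checkpointLocation = s"$basePath/checkpoint_${UUID.randomUUID()}"
val outputTable = s"$basePath/output_table_${UUID.randomUUID()}"
```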

Define our stateful transformation and start our query


initialStates: org.apache.spark.sql.KeyValueGroupedDataset[String,org.apache.spark.sql.streaming.IPState.IPInitialState] = KeyValueGroupedDataset: [key: [value: string], value: [lastTimestamp: bigint, loginAttempts: int ... 12 more field(s)]]
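The export does not show where the initial state is read from. The sketch below assumes a Delta table whose columns line up with the IPInitialState sketch above (the name ip_initial_state is hypothetical), grouped by IP address to form the KeyValueGroupedDataset shown in the output.

```scala
import spark.implicits._
import org.apache.spark.sql.streaming.IPState._

// Batch-read the initial per-IP state from Delta and key it by IP address.
val initialStates = spark.read
  .table("ip_initial_state") // hypothetical source table name
  .as[IPInitialState]
  .groupByKey(_.ipAddress)
```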

suspiciousIPStream: org.apache.spark.sql.Dataset[org.apache.spark.sql.streaming.IPState.SecurityMetrics] = [ipAddress: string, threatLevel: string ... 8 more fields]
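A minimal sketch of the stateful transformation, assuming the transformWithState overload that accepts an initial-state KeyValueGroupedDataset; the TimeMode and OutputMode choices are assumptions.

```scala
import spark.implicits._
import org.apache.spark.sql.streaming.{OutputMode, TimeMode}

// Key the stream by IP address and apply the processor, seeding it with the
// initial state read from Delta above.
val suspiciousIPStream = inputStream
  .groupByKey(_.ipAddress)
  .transformWithState(
    new SuspiciousIPTrackingProcessor(),
    TimeMode.None(),
    OutputMode.Append(),
    initialStates)
```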

query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@45506b33
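A minimal sketch of starting the query, writing Delta files to the outputTable path defined earlier; the AvailableNow trigger is an assumption.

```scala
import org.apache.spark.sql.streaming.Trigger

val query = suspiciousIPStream.writeStream
  .format("delta")
  .option("checkpointLocation", checkpointLocation)
  .outputMode("append")
  .trigger(Trigger.AvailableNow()) // assumed trigger
  .start(outputTable)
```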

+-----------+-----------+-------------+------------+---------------------+-------------+-----------+-------------+--------------------+--------------------+
|ipAddress  |threatLevel|loginAttempts|failedLogins|distinctUsernameCount|totalRequests|lastAction |adminAttempts|suspiciousUserAgents|distinctCountryCount|
+-----------+-----------+-------------+------------+---------------------+-------------+-----------+-------------+--------------------+--------------------+
|192.168.1.1|HIGH       |2            |1           |3                    |6            |api_call   |1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |api_call   |1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |file_access|1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |data_export|1            |1                   |3                   |
|192.168.1.1|HIGH       |3            |1           |3                    |6            |login      |1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |data_export|1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |data_export|1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |api_call   |1            |1                   |3                   |
|192.168.1.1|HIGH       |3            |1           |3                    |6            |login      |1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |api_call   |1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |api_call   |1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |data_export|1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |file_access|1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |data_export|1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |api_call   |1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |api_call   |1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |file_access|1            |1                   |3                   |
|192.168.1.1|HIGH       |3            |1           |3                    |6            |login      |1            |1                   |3                   |
|192.168.1.1|HIGH       |2            |1           |3                    |6            |data_export|1            |1                   |3                   |
|192.168.1.1|HIGH       |3            |1           |3                    |6            |login      |1            |1                   |3                   |
+-----------+-----------+-------------+------------+---------------------+-------------+-----------+-------------+--------------------+--------------------+
only showing top 20 rows

import org.apache.spark.sql.functions._
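A minimal sketch of the verification cell that produced the table above, reading the Delta output path back as a batch DataFrame (the trailing import in the output suggests the cell also imported org.apache.spark.sql.functions).

```scala
import org.apache.spark.sql.functions._

// Read the streaming query's Delta output and display it without truncation.
spark.read.format("delta").load(outputTable).show(truncate = false)
```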