
TWS Scala Top K



Create synthetic data using dbldatagen in Python


Collecting dbldatagen
  Using cached dbldatagen-0.4.0.post1-py3-none-any.whl.metadata (9.9 kB)
  Using cached dbldatagen-0.4.0.post1-py3-none-any.whl (122 kB)
Installing collected packages: dbldatagen
Successfully installed dbldatagen-0.4.0.post1
[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: pip install --upgrade pip
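The Python cell that generated the synthetic input is not included in this export; only the pip output above survives. A minimal sketch of what it might look like with dbldatagen follows, where the table name, row count, and user pool are assumptions rather than the original values.

import dbldatagen as dg
from pyspark.sql.types import IntegerType, StringType

# Illustrative spec: random (user, value) pairs written to a table the Scala cells can read.
spec = (dg.DataGenerator(spark, name="tws_topk_input", rows=100000, partitions=4)
        .withColumn("user", StringType(), values=[f"user_{i}" for i in range(10)], random=True)
        .withColumn("value", IntegerType(), minValue=0, maxValue=1000, random=True))

spec.build().write.mode("overwrite").saveAsTable("tws_topk_input")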

Set StateStoreProvider and input table name

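The cell's code is not shown here. transformWithState requires the RocksDB state store provider, so this cell most likely sets the provider class and names the input table; a sketch, with the table name being an assumption:

import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  classOf[RocksDBStateStoreProvider].getName)

val inputTable = "tws_topk_input"  // hypothetical name for the synthetic-data table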

Define stateful structs that our processor will use
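The struct definitions themselves are not part of this export. A purely illustrative shape for the per-user state a top-K processor might keep (input rows arrive as plain (user, value) tuples, so only the state needs its own type here):

case class TopKState(values: List[Int])  // the K largest values seen so far for one user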

Import our structs and the other classes we need


import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import java.util.UUID
import org.apache.spark.sql.streaming.StatefulProcessor
import org.apache.spark.sql.streaming._
import java.sql.Timestamp
import org.apache.spark.sql.Encoders

Define our StatefulProcessor


import scala.collection.mutable.PriorityQueue
defined class TopKStatefulProcessor
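The processor's source is not included in this export; only the "defined class TopKStatefulProcessor" line above survives. The sketch below is one way such a processor could be written against the Spark 4.0 transformWithState API (method signatures changed across preview releases, so treat it as illustrative). It relies on the imports above and on the hypothetical TopKState struct; the state name and the constructor parameter k are also assumptions.

class TopKStatefulProcessor(k: Int)
    extends StatefulProcessor[String, (String, Int), (Int, String, Int)] {

  @transient private var topKState: ValueState[TopKState] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // One state variable per user key; TTL disabled in this sketch.
    topKState = getHandle.getValueState[TopKState](
      "topKState", Encoders.product[TopKState], TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Int)],
      timerValues: TimerValues): Iterator[(Int, String, Int)] = {
    // Min-heap of at most k elements: the smallest of the current top K sits on top.
    val heap = PriorityQueue.empty[Int](Ordering[Int].reverse)
    topKState.getOption().map(_.values).getOrElse(Nil).foreach(v => heap.enqueue(v))
    inputRows.foreach { case (_, value) =>
      heap.enqueue(value)
      if (heap.size > k) heap.dequeue()  // evict the smallest
    }
    val topK = heap.toList.sorted(Ordering[Int].reverse)
    topKState.update(TopKState(topK))
    // Emit (rank, user, value), matching the (Int, String, Int) output type seen below.
    topK.iterator.zipWithIndex.map { case (value, rank) => (rank + 1, key, value) }
  }
}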

Define our input stream


inputStream: org.apache.spark.sql.Dataset[(String, Int)] = [user: string, value: int]
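The defining cell is not shown; given the Dataset[(String, Int)] with columns user and value above, a plausible definition reads the synthetic-data table as a stream (the table name comes from the hypothetical inputTable above):

import spark.implicits._

val inputStream = spark.readStream
  .table(inputTable)
  .select($"user", $"value")
  .as[(String, Int)]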

Define output table and checkpoint location


baseLocation: String = /Workspace/Users/bo.gao@databricks.com/tws/3626d05d-7581-4b9b-ac7c-967b0e585142
checkpointLocation: String = /Workspace/Users/bo.gao@databricks.com/tws/3626d05d-7581-4b9b-ac7c-967b0e585142/checkpoint
outputTable: String = /Workspace/Users/bo.gao@databricks.com/tws/3626d05d-7581-4b9b-ac7c-967b0e585142/output
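A sketch of how these values could be produced; the original run used a /Workspace/Users/... directory keyed by a random UUID, but any writable base path works (the /tmp prefix below is illustrative):

val baseLocation = s"/tmp/tws/${UUID.randomUUID()}"  // UUID is already imported above
val checkpointLocation = s"$baseLocation/checkpoint"
val outputTable = s"$baseLocation/output"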

Define our stateful transformation and start our query


import spark.implicits._
import org.apache.spark.sql.streaming.Trigger
result: org.apache.spark.sql.Dataset[(Int, String, Int)] = [_1: int, _2: string ... 1 more field]
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@233f7401
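Only the cell's output survives here. Given the result and query values above, the cell presumably grouped the stream by user, applied the processor with transformWithState, and started a streaming write. The sketch below follows the Spark 4.0 API; the choice of k, the trigger, and the Delta sink options are assumptions:

import spark.implicits._
import org.apache.spark.sql.streaming.{OutputMode, TimeMode, Trigger}

val result = inputStream
  .groupByKey { case (user, _) => user }
  .transformWithState(
    new TopKStatefulProcessor(k = 3),  // k = 3 is illustrative
    TimeMode.None(),
    OutputMode.Append())

val query = result.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpointLocation)
  .trigger(Trigger.AvailableNow())
  .start(outputTable)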
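The final cell's code is not shown either; the Boolean result below is consistent with a bounded wait on the query, for example (purely an assumption):

query.awaitTermination(5 * 60 * 1000)  // true if the query stopped within the timeout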

res10: Boolean = true