Configure Snowflake connection options.

```python
# Use secrets DBUtil to get Snowflake credentials.
user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")

options = {
  "sfUrl": "<snowflake-url>",
  "sfUser": user,
  "sfPassword": password,
  "sfDatabase": "<snowflake-database>",
  "sfSchema": "<snowflake-schema>",
  "sfWarehouse": "<snowflake-cluster>",
}
```
```scala
%scala
val user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
val password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")

val options = Map(
  "sfUrl" -> "<snowflake-url>",
  "sfUser" -> user,
  "sfPassword" -> password,
  "sfDatabase" -> "<snowflake-database>",
  "sfSchema" -> "<snowflake-schema>",
  "sfWarehouse" -> "<snowflake-cluster>"
)
```
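If the secret scope or keys are misconfigured, the first read against Snowflake fails with an authentication error. A quick way to validate the options before going further is to run a trivial query through the connector. This is an illustrative sketch, not part of the original notebook:

```python
# Hypothetical sanity check: run a trivial query through the Snowflake connector
# to confirm the credentials and URL in `options` resolve to a reachable warehouse.
version_df = (spark.read
  .format("snowflake")
  .options(**options)
  .option("query", "SELECT CURRENT_VERSION()")
  .load())
version_df.show()
```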
```scala
%scala
import net.snowflake.spark.snowflake.Utils

// Create the target schema if it does not exist, then (re)create the adult table.
Utils.runQuery(options, """CREATE SCHEMA IF NOT EXISTS <snowflake-schema>""")
Utils.runQuery(options, """DROP TABLE IF EXISTS adult""")
Utils.runQuery(options, """CREATE TABLE adult (
  age DOUBLE,
  workclass STRING,
  fnlwgt DOUBLE,
  education STRING,
  education_num DOUBLE,
  marital_status STRING,
  occupation STRING,
  relationship STRING,
  race STRING,
  sex STRING,
  capital_gain DOUBLE,
  capital_loss DOUBLE,
  hours_per_week DOUBLE,
  native_country STRING,
  income STRING)""")
```
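With the table in place, it needs data before the ML steps below can read anything back. A hedged sketch of one way to populate it from the Databricks sample datasets; the `/databricks-datasets/adult/adult.data` path and the column ordering are assumptions, not part of this section:

```python
# Load the UCI Adult dataset from the Databricks sample datasets (path assumed)
# and append it into the Snowflake table created above.
adult_df = (spark.read
  .format("csv")
  .option("inferSchema", "true")
  .load("/databricks-datasets/adult/adult.data")
  .toDF("age", "workclass", "fnlwgt", "education", "education_num",
        "marital_status", "occupation", "relationship", "race", "sex",
        "capital_gain", "capital_loss", "hours_per_week", "native_country", "income"))

(adult_df.write
  .format("snowflake")
  .options(**options)
  .option("dbtable", "adult")
  .mode("append")
  .save())
```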
```python
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from distutils.version import LooseVersion

categoricalColumns = ["workclass", "education", "marital_status", "occupation",
                      "relationship", "race", "sex", "native_country"]
stages = []  # stages in our Pipeline

for categoricalCol in categoricalColumns:
  # Category indexing with StringIndexer
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
  # Use OneHotEncoder to convert categorical variables into binary SparseVectors.
  # The estimator was renamed from OneHotEncoderEstimator to OneHotEncoder in Spark 3.0,
  # so pick the class that matches the running PySpark version.
  if LooseVersion(pyspark.__version__) < LooseVersion("3.0"):
    from pyspark.ml.feature import OneHotEncoderEstimator
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
  else:
    from pyspark.ml.feature import OneHotEncoder
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                            outputCols=[categoricalCol + "classVec"])
  # Add stages. These are not run here; the Pipeline runs them all at once later.
  stages += [stringIndexer, encoder]
```
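The pipeline also needs a numeric label column, since Spark ML classifiers expect a `label` of doubles rather than the raw `income` strings. A minimal sketch of that step, appended to the same `stages` list; the later cells assume something equivalent has run:

```python
# Index the binary label column ("<=50K" / ">50K") into a numeric "label" column.
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages += [label_stringIdx]
```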
```python
# Transform all features into a vector using VectorAssembler
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
```
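The write cell below references a `finalPredictions` DataFrame that is produced elsewhere in the full notebook. A hedged sketch of how it might be built from the stages above, assuming `dataset` holds the adult rows read back from Snowflake; the model choice and parameters here are illustrative, not the notebook's definitive ones:

```python
from pyspark.ml.classification import LogisticRegression

# Fit the feature pipeline, then train a classifier on the prepared data.
# `dataset` is assumed to be the adult table read back from Snowflake.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(dataset)
preppedData = pipelineModel.transform(dataset)

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
lrModel = lr.fit(preppedData)

# Score the data; this DataFrame is what the write cell below expects.
finalPredictions = lrModel.transform(preppedData)
```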
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# Register a SQL-callable UDF that converts an ML Vector into an array of doubles;
# without it, VectorToArray is not resolvable from selectExpr, and probability
# vectors cannot be written to Snowflake directly.
spark.udf.register("VectorToArray", lambda v: v.toArray().tolist(), ArrayType(DoubleType()))

# Split the probability vector into plain columns and write the scored rows to Snowflake.
(finalPredictions
  .drop("features")
  .drop("rawPrediction")
  .selectExpr("*",
              "VectorToArray(probability)[0] as prob_0",
              "VectorToArray(probability)[1] as prob_1")
  .drop("probability")
  .write.format("snowflake")
  .options(**options)
  .option("dbtable", "adult_results")
  .mode("overwrite")
  .save())
```
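To confirm the round trip, the results table can be read back through the same connector. An illustrative check, not part of the original cell; the column names match the sketches above:

```python
# Read the scored table back from Snowflake and spot-check a few rows.
results = (spark.read
  .format("snowflake")
  .options(**options)
  .option("dbtable", "adult_results")
  .load())
results.select("prediction", "prob_0", "prob_1").show(5)
```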