mongodb (Python)


MongoDB Atlas via Spark

This notebook provides a high-level introduction to using Spark with MongoDB, enabling developers and data engineers to bring sophisticated real-time analytics and machine learning to live, operational data.

The following illustrates how to use MongoDB and Spark with an example application that leverages MongoDB's aggregation pipeline to pre-process data within MongoDB so it is ready for use in Databricks. It also shows how to query MongoDB and write results back for use in applications. This notebook covers:

  1. How to read data from MongoDB into Spark.
  2. How to run the MongoDB Connector for Spark as a library in Databricks.
  3. How to leverage MongoDB's Aggregation Pipeline from within Spark.
  4. How to use the machine learning ALS library in Spark to generate a set of personalized movie recommendations for a given user.
  5. How to write the results back to MongoDB so they are accessible to applications.

Create a Databricks Cluster and Add the Connector as a Library

  1. Create a Databricks cluster.
  2. Navigate to the cluster detail page and select the Libraries tab.
  3. Click the Install New button.
  4. Select Maven as the Library Source.
  5. Use the Search Packages feature to find 'mongo-spark'. This should resolve to org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 or newer.
  6. Click Install.
    For more information on the MongoDB Spark Connector (which now supports Structured Streaming), see the MongoDB documentation.

Create a MongoDB Atlas Instance

Atlas is a fully managed, cloud-based MongoDB service. We'll use Atlas to test the integration between MongoDB and Spark.

  1. Sign up for MongoDB Atlas.
  2. Create an Atlas free tier cluster.
  3. Enable Databricks clusters to connect to the Atlas cluster by adding the external IP addresses of the Databricks cluster nodes to the IP access list (whitelist) in Atlas. For convenience you could (temporarily!) 'allow access from anywhere', though we recommend enabling network peering for production.

Prep MongoDB with a sample data-set

MongoDB Atlas comes with a sample data-set that lets you get started quickly. We will use it throughout this notebook.

  1. In MongoDB Atlas, load the sample data-set once the cluster is up and running.
  2. You can confirm the presence of the data-set via the Browse Collections button in the Atlas UI.

Update Spark Configuration with the Atlas Connection String

  1. Note the connection string in the Connect dialog in MongoDB Atlas. It has the form "mongodb+srv://<username>:<password>@<clustername>.xxxxx.mongodb.net/".
  2. Back in Databricks, in your cluster configuration under Advanced Options (bottom of the page), paste the connection string for both the spark.mongodb.output.uri and spark.mongodb.input.uri variables, populating the username and password fields appropriately. This way, all notebooks running on the cluster will use this configuration.
  3. Alternatively, you can set the option explicitly when calling the API, for example spark.read.format("mongodb").option("spark.mongodb.input.uri", connectionString).load(), as in the sketch below. If you configured the variables on the cluster, you do not need to set this option.
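
For instance, a per-read configuration could look like the following sketch. The connection string is a placeholder, and the sample_supplies database and sales collection names are assumptions based on the sample data-set loaded earlier.

# Sketch: explicit per-read configuration (placeholders -- substitute your own values).
connection_string = "mongodb+srv://<username>:<password>@<clustername>.xxxxx.mongodb.net/"

df = (
    spark.read.format("mongodb")
    .option("spark.mongodb.input.uri", connection_string)  # not needed if set on the cluster
    .option("database", "sample_supplies")                  # sample database loaded in Atlas
    .option("collection", "sales")
    .load()
)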

Read data from the MongoDB 'sales' collection by running an Aggregation Pipeline

MongoDB's Aggregation Pipeline is a powerful capability that allows you to pre-process and transform data within MongoDB. It is a great match for real-time analytics, dashboards, and report generation with roll-ups, sums, and averages computed 'server-side'. (Note: there is a whole book written about it.)
MongoDB even supports rich secondary/compound indexes to extract, filter, and process only the data it needs – for example, analyzing all customers located in a specific geography right within the database without first having to load the full data-set, minimizing data-movement and reducing latency.
The aggregation pipeline in our example has four stages (a sketch of the corresponding read follows the list):

  1. Match stage: filters all documents that have "printer paper" in the items array.
  2. Unwind stage: unwinds the items array.
  3. Add fields stage: adds a new field called "totalSale", which is the quantity of items sold multiplied by the item price.
  4. Project stage: projects only "saleDate" and "totalSale" in the output.
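
The original code cell is not reproduced here; below is a minimal sketch of how such a pipeline can be passed to the connector. The 'pipeline' option name and the sample_supplies/sales names are assumptions based on the connector documentation and the sample data-set, so check them against your connector version.

# Sketch: push the four-stage aggregation pipeline down into MongoDB.
pipeline = """[
  { "$match":     { "items.name": "printer paper" } },
  { "$unwind":    { "path": "$items" } },
  { "$addFields": { "totalSale": { "$multiply": ["$items.price", "$items.quantity"] } } },
  { "$project":   { "_id": 0, "saleDate": 1, "totalSale": 1 } }
]"""

sales_df = (
    spark.read.format("mongodb")
    .option("database", "sample_supplies")
    .option("collection", "sales")
    .option("pipeline", pipeline)   # executed inside MongoDB before data reaches Spark
    .load()
)
display(sales_df)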

saleDate                        totalSale
2015-03-23T21:06:49.506+0000        80.02
2015-03-23T21:06:49.506+0000        70.58
2015-03-23T21:06:49.506+0000       280.60
2015-03-23T21:06:49.506+0000       155.42
2015-03-23T21:06:49.506+0000        36.94
2015-03-23T21:06:49.506+0000       159.60
2015-03-23T21:06:49.506+0000        24.24
2015-03-23T21:06:49.506+0000        42.48
2015-08-25T10:01:02.918+0000        80.50
2015-08-25T10:01:02.918+0000       254.79
2015-08-25T10:01:02.918+0000        62.85
2015-08-25T10:01:02.918+0000      3466.00
2015-08-25T10:01:02.918+0000       132.36
2015-08-25T10:01:02.918+0000        37.55
2015-08-25T10:01:02.918+0000       166.56
2015-08-25T10:01:02.918+0000       171.60
(showing 16 of 1,000 rows; truncated data)

Read data from the MongoDB 'sales' collection as-is

For comparison, we can read the data from the collection as-is, without applying any aggregation pipeline transformation, and show the schema.

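A minimal sketch of the as-is read, assuming the connection URI is already set in the cluster configuration:

# Sketch: read the raw collection without a pipeline and inspect the inferred schema.
sales_raw_df = (
    spark.read.format("mongodb")
    .option("database", "sample_supplies")
    .option("collection", "sales")
    .load()
)
sales_raw_df.printSchema()
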
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- couponUsed: boolean (nullable = true)
 |-- customer: struct (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- age: integer (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- satisfaction: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- price: decimal(6,2) (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- purchaseMethod: string (nullable = true)
 |-- saleDate: timestamp (nullable = true)
 |-- storeLocation: string (nullable = true)

Create a temp view

Let's register the dataframe created in the step above as a temporary view and run a simple Spark SQL query against it.

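The original query cell is not preserved; the sketch below registers the raw dataframe as a temporary view and selects the customer field. The view name and the exact query are illustrative.

# Sketch: register a temp view and query it with Spark SQL.
sales_raw_df.createOrReplaceTempView("sales")

customers_df = spark.sql("SELECT customer FROM sales")
display(customers_df)
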
customer
{"gender": "M", "age": 26, "email": "rapifoozi@viupoen.bb", "satisfaction": 5}
{"gender": "F", "age": 53, "email": "se@nacwev.an", "satisfaction": 4}
{"gender": "F", "age": 39, "email": "beecho@wic.be", "satisfaction": 3}
{"gender": "F", "age": 57, "email": "cuwoik@luvgu.tc", "satisfaction": 5}
{"gender": "M", "age": 31, "email": "do@neokliw.sz", "satisfaction": 5}
{"gender": "F", "age": 52, "email": "citga@fo.pg", "satisfaction": 5}
{"gender": "M", "age": 30, "email": "remcihce@iwjamwit.kp", "satisfaction": 1}
{"gender": "F", "age": 59, "email": "kave@we.com", "satisfaction": 5}
{"gender": "M", "age": 34, "email": "li@efva.gm", "satisfaction": 5}
{"gender": "F", "age": 34, "email": "fase@has.sx", "satisfaction": 2}
{"gender": "F", "age": 60, "email": "dogzab@me.mr", "satisfaction": 3}
{"gender": "M", "age": 34, "email": "bubbecgu@odidecned.tf", "satisfaction": 3}
{"gender": "F", "age": 48, "email": "id@pahu.rs", "satisfaction": 3}
{"gender": "F", "age": 42, "email": "jecosab@copfatma.af", "satisfaction": 3}
(showing 14 of 501 rows)

Writing Data to a New Collection in MongoDB

Now you can enrich the data with data from other sources, or use Spark MLlib to train ML models on data in MongoDB.
For this demonstration, we simply write the 'filtered_df' dataframe directly to a new MongoDB collection in the sample_supplies database, as sketched below.
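
A sketch of such a write, assuming 'filtered_df' is the dataframe produced above; the target collection name 'filtered_sales' is hypothetical.

# Sketch: write filtered_df to a new collection (the name 'filtered_sales' is hypothetical).
(
    filtered_df.write.format("mongodb")
    .option("database", "sample_supplies")
    .option("collection", "filtered_sales")
    .mode("append")
    .save()
)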

Write data to Delta

You can also unify all your data in the Lakehouse by writing the data to a Delta table, as sketched below.
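
A sketch of the Delta write, using the same dataframe and an illustrative table name:

# Sketch: persist the same dataframe as a managed Delta table (name is illustrative).
(
    filtered_df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("sales_filtered_delta")
)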

View Data in MongoDB

You should be able to see the new collection in the Collections tab in MongoDB Atlas. Alternatively, you can use MongoDB Compass, the desktop app, to view it.

More Info

Relevant technologies that simplify real-time data flow and processing: Data Federation, Workload Isolation, Aggregation Pipelines, Materialized Views.
For batch processing with Parquet on an object store (S3), see the documentation on $out capabilities.