Python – PySpark Connect to MongoDB


Today in this article, we will see how to connect PySpark to MongoDB, with Python code examples.

We will make use of the MongoDB Spark Connector, which lets PySpark read data from and write data to MongoDB.

Here’s a basic example demonstrating how to read data from MongoDB, perform a transformation, and then write the results back to MongoDB.

We will use the MongoDB collection TheCodeBuzz.Orders for the read and write examples below.

Install the MongoDB Spark Connector

First, install PySpark using the below command. Note that pip installs PySpark itself; the MongoDB Spark Connector is a JVM package that you supply to Spark separately, as shown next.

Command

pip install pyspark

For more details, please visit the official MongoDB website:

https://www.mongodb.com/products/spark-connector
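
To make the connector available to Spark, you can pass its Maven coordinates when launching PySpark. This is a minimal sketch, assuming Spark 3.x with Scala 2.12 and connector release 10.3.0; adjust the coordinates to match your environment.

Command

pyspark --packages org.mongodb.spark:mongo-spark-connector_2.12:10.3.0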

Create a SparkSession Object

Once you have PySpark installed, you can create a SparkSession object configured with your MongoDB connection URIs, as below:

from pyspark.sql import SparkSession


my_spark = SparkSession \
    .builder \
    .appName("TheCodeBuzz") \
    .config("spark.mongodb.read.connection.uri", "mongodb://host/TheCodeBuzz.Orders") \
    .config("spark.mongodb.write.connection.uri", "mongodb://host/TheCodeBuzz.Orders") \
    .getOrCreate()

We create a Spark session with the MongoDB connector configurations. The settings above are for connector version 10.x; if you are on the older 3.x connector, the equivalent settings are spark.mongodb.input.uri and spark.mongodb.output.uri.

We will use the above SparkSession object to read data from and write data to MongoDB.
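
Alternatively, the connector package can be declared directly in the builder instead of on the command line. A minimal sketch, again assuming connector 10.3.0 built for Scala 2.12 (an assumption; match it to your Spark version):

from pyspark.sql import SparkSession

# The package coordinates below are an assumption -- pick the release that
# matches your Spark/Scala versions. Spark resolves and downloads the
# connector JAR at startup.
my_spark = SparkSession \
    .builder \
    .appName("TheCodeBuzz") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0") \
    .config("spark.mongodb.read.connection.uri", "mongodb://host/TheCodeBuzz.Orders") \
    .config("spark.mongodb.write.connection.uri", "mongodb://host/TheCodeBuzz.Orders") \
    .getOrCreate()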

Read Data from MongoDB using PySpark

You must specify the following configuration settings to read from MongoDB:

dataFrame = my_spark.read \
    .format("mongodb") \
    .option("database", "TheCodeBuzz") \
    .option("collection", "Orders") \
    .load()

The read property above returns a DataFrameReader object, which you can use to specify the format and other configuration settings for the batch read operation.
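
As mentioned in the intro, you can apply a transformation before writing the data back. Below is a minimal sketch that filters and trims the Orders data; the column names (Order, qty, Link) are taken from the sample documents in the next section.

from pyspark.sql.functions import col

# Keep only orders with qty of at least 25 and select a few columns.
large_orders = dataFrame \
    .filter(col("qty") >= 25) \
    .select("Order", "qty", "Link")

large_orders.show(truncate=False)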

Write data to MongoDB using PySpark

To write data to MongoDB, call the write property on your DataFrame object.

You must specify the following configuration settings to write the data to MongoDB:

data = [
  {
    "Order": "journal2",
    "qty": 25,
    "books": ["white"],
    "domain": [14],
    "Link": "https://www.thecodebuzz.com/order/23456"
  },
  {
    "Order": "journal1",
    "qty": 25,
    "books": ["black"],
    "domain": [15],
    "Link": "https://www.thecodebuzz.com/order/2324"
  }
]

dataFrame = my_spark.createDataFrame(data)


dataFrame.write.format("mongodb") \
    .mode("append") \
    .option("database", "TheCodeBuzz") \
    .option("collection", "Orders") \
    .option("upsertDocument", "true") \
    .save()

In the above example, the createDataFrame() function builds a DataFrame from the JSON-like input, and the write operation appends it to the Orders collection in MongoDB.
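
Putting the steps together, below is a minimal end-to-end sketch that reads the collection, applies a transformation, and writes the result back. The target collection name LargeOrders is a hypothetical example used for illustration.

from pyspark.sql.functions import col

# Read the Orders collection from MongoDB.
orders = my_spark.read \
    .format("mongodb") \
    .option("database", "TheCodeBuzz") \
    .option("collection", "Orders") \
    .load()

# Filter and write the result to a separate (hypothetical) collection.
orders.filter(col("qty") >= 25) \
    .write.format("mongodb") \
    .mode("append") \
    .option("database", "TheCodeBuzz") \
    .option("collection", "LargeOrders") \
    .save()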

That’s all! Happy coding!

Does this help you fix your issue?

Do you have any better solutions or suggestions? Please sound off your comments below.



Please bookmark this page and share it with your friends. Please Subscribe to the blog to receive notifications on freshly published(2024) best practices and guidelines for software design and development.


