Convert JSON to CSV using a Google Dataflow pipeline


In this article, we will see how to get started with Google Dataflow and the Apache Beam SDK and create a simple data pipeline to convert JSON to CSV.

We shall learn how to create a Dataflow job that reads a JSON file from a Google Cloud Storage bucket and converts it to CSV format.

This article builds on the previous article on CSV to JSON conversion using Dataflow. Here we shall cover only the basic logic of converting JSON to CSV.

For the rest of the Dataflow configuration, scheduling, and running of the job, please refer to that article.

Prerequisites

Set up the prerequisites and environment:

  • Create a Google Cloud project. Make sure that billing is enabled for this project.
  • Enable all the Google APIs required for the project.
  • Make sure to create a Google Cloud Storage bucket.
  • Set up the Environment
    • Install Apache Beam
pip install 'apache-beam[gcp]'

  • Set up the service account and grant it the required permissions/roles as per your needs (a quick verification sketch follows this list).
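
As a quick check that the service account and bucket access are working, here is a minimal sketch (assuming the service-account.json key file and the bucket name used later in this article),

import os
from google.cloud import storage

# Assumption: the service-account key file sits next to this script
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = 'service-account.json'

client = storage.Client()

# List a few objects to confirm the service account can read the bucket
bucket = client.get_bucket('test-bucket-13')   # bucket name used in this article
for blob in client.list_blobs(bucket, max_results=5):
    print(blob.name)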

Getting started

Let’s upload the sample JSON file below to the GCS bucket.

{
  "account": {
    "id": "1234567890",
    "name": "Checking Account",
    "amount": "1410"
  }
}

I have saved the sample shown above as input.json, which we shall upload to the GCP environment.

Within the bucket, I have created an “Input” folder where the file is located.
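
If you prefer to upload the file from code instead of the console, here is a minimal sketch (assuming a local copy of the sample saved as input.json and the bucket from the prerequisites),

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket('test-bucket-13')

# Upload the local sample JSON into the "Input" folder of the bucket
bucket.blob('Input/input.json').upload_from_filename('input.json')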


Please click on the file and copy its location.

Every file in the bucket is referenced by a gs:// path made up of the bucket and folder names, as in the example below,

gs://test-bucket-13/Input/input.json
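
For reference, such a gs:// path can be read directly with the storage client, as in this minimal sketch (assuming the bucket and file created above),

from google.cloud import storage

client = storage.Client()

# Blob.from_string() resolves a gs://bucket/object URI into a blob handle
blob = storage.Blob.from_string('gs://test-bucket-13/Input/input.json', client=client)
print(blob.download_as_text())   # prints the sample JSON uploaded above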

Configure the Dataflow Job

Configuring the Dataflow job is already explained in detail in the previous article,

Convert CSV to JSON

Below is the sample logic I am using to read the JSON file (together with the imports the pipeline needs),

import argparse
import json
import logging
import os

import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from google.cloud import storage

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = 'service-account.json'


class ReadJsonInputFile(beam.DoFn):
    """Reads the input JSON file from GCS and emits the account fields as a row."""

    def __init__(self, input_path):
        self.input_path = input_path

    def start_bundle(self):
        # Created per bundle because the client is not picklable
        self.client = storage.Client()

    def process(self, something):
        clear_data = []
        # Read the gs:// path through the storage client (plain open() cannot read GCS paths)
        blob = storage.Blob.from_string(self.input_path, client=self.client)
        data = json.loads(blob.download_as_text())
        account = data.get('account')

        if account and account.get('id'):
            account_id = str(account.get('id'))
            name = account.get('name')
            amount = account.get('amount')
            clear_data.append([account_id, name, amount])

        yield clear_data
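
To sanity-check this DoFn outside of a pipeline, you can drive it by hand, as in this minimal sketch (assuming the sample input path from above and valid credentials),

# Instantiate the DoFn and drive its lifecycle manually for a quick local test
reader = ReadJsonInputFile('gs://test-bucket-13/Input/input.json')
reader.start_bundle()                  # creates the storage client
for rows in reader.process(None):      # the dummy element mirrors beam.Create([None])
    print(rows)                        # e.g. [['1234567890', 'Checking Account', '1410']]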

JSON to CSV conversion

Below is the logic, using the pandas library, that converts the JSON data to a CSV file and uploads it to the bucket.

class WriteCSVFile(beam.DoFn):
    """Converts the list of rows to CSV with pandas and uploads it to the bucket."""

    def __init__(self, bucket_name):
        self.bucket_name = bucket_name

    def start_bundle(self):
        self.client = storage.Client()

    def process(self, mylist):
        # columns must be a list of column names, not a dict
        df = pd.DataFrame(mylist, columns=['account_id', 'name', 'amount'])
        bucket = self.client.get_bucket(self.bucket_name)
        bucket.blob("output.csv").upload_from_string(df.to_csv(index=False), 'text/csv')

In the above logic, we specify the name of the bucket to which the output CSV file will be uploaded.
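
To see what the resulting CSV looks like, here is the same pandas conversion applied to the sample record, independent of GCS,

import pandas as pd

# One row per account, matching the columns written by WriteCSVFile
rows = [['1234567890', 'Checking Account', '1410']]
df = pd.DataFrame(rows, columns=['account_id', 'name', 'amount'])

print(df.to_csv(index=False))
# account_id,name,amount
# 1234567890,Checking Account,1410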

Below is the command-line argument setup, which provides the default input and output paths. If the arguments are not specified, the defaults below will be used.

class DataflowPipeline(PipelineOptions):
 
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--inputFilePath', type=str, default='gs://test-bucket-13/Input/input.json')
        parser.add_argument('--outputFilePath', type=str, default='gs://test-bucket-13/output/temp/output.csv')
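
For illustration, these custom options can be inspected outside of a full pipeline run; here is a minimal sketch using only the flags defined above,

# Build PipelineOptions from an explicit flags list, then view the custom options
options = PipelineOptions(['--inputFilePath=gs://test-bucket-13/Input/input.json'])
custom = options.view_as(DataflowPipeline)

print(custom.inputFilePath)    # the value passed on the command line
print(custom.outputFilePath)   # falls back to the default defined above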

Below is the parser and the pipeline execution flow,

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    pipeline_options.view_as(SetupOptions).setup_file = "./setup.py"

    dataflow_options = pipeline_options.view_as(DataflowPipeline)

    # Derive the output bucket name from the --outputFilePath option (gs://<bucket>/...)
    output_bucket = dataflow_options.outputFilePath.replace('gs://', '').split('/')[0]

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
         | 'Start' >> beam.Create([None])
         | 'Read JSON' >> beam.ParDo(ReadJsonInputFile(dataflow_options.inputFilePath))
         | 'Write CSV' >> beam.ParDo(WriteCSVFile(output_bucket)))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
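
Before submitting to Dataflow, the whole pipeline can be exercised locally with the DirectRunner, as in this minimal sketch (assuming credentials and the sample input file are in place),

# Run the pipeline locally; unspecified custom options
# (inputFilePath / outputFilePath) fall back to the defaults defined above.
run(['--runner=DirectRunner'])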

Running the Dataflow command

I have already explained in detail how to configure the Dataflow job and schedule it using the scheduler in my previous article.
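
As a reminder, submitting the same pipeline to Dataflow only changes the pipeline options; a minimal sketch is shown below (the project ID and region are placeholders to replace, and the setup.py referenced in the code above must be present),

# Hypothetical submission options; replace project/region with your own values
run([
    '--runner=DataflowRunner',
    '--project=my-gcp-project',                        # placeholder project ID
    '--region=us-central1',                            # placeholder region
    '--temp_location=gs://test-bucket-13/output/temp'  # temp GCS location for Dataflow
])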

Note: Please make sure your Dataflow service account has all the permissions/roles as per the guidelines.

That’s All!

Do you have any comments or suggestions?

Please sound off your comments below.





