Quickstart using Python

This page shows you how to set up your Python development environment, get the Apache Beam SDK for Python, and run and modify an example pipeline.

Another option for learning how to create and run an Apache Beam pipeline is to interactively develop one using an Apache Beam notebook. If you already have a Google Cloud project set up, the wordcount example pipeline in this quickstart is available as an example notebook.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Cloud Console, go to the Create service account page.

      Go to Create service account
    2. Select a project.
    3. In the Service account name field, enter a name. The Cloud Console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create.
    5. Click the Select a role field.

      Under Quick access, click Basic, then click Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Cloud Console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key, as shown in the example command after these steps. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  8. Create a Cloud Storage bucket:
    1. In the Cloud Console, go to the Cloud Storage Browser page.

      Go to Browser

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select the following: Standard.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.
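
If you use a bash-compatible shell, a command like the following is one way to set the variable for the current session; KEY_PATH is a placeholder for the path of the JSON key file that you downloaded in step 6:

export GOOGLE_APPLICATION_CREDENTIALS=KEY_PATH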

Set up your environment

This quickstart uses a command prompt. If you don't have a command prompt readily available, you can use Cloud Shell. Cloud Shell already has the package manager for Python installed, so you can skip step 1 in the following procedure.
  1. Use the Apache Beam SDK for Python with pip and Python version 3.6, 3.7, or 3.8. Check that you have a working Python and pip installation by running the following commands:
    python --version
    python -m pip --version
    If you do not have Python, find the installation steps for your operating system on the Installing Python page.
  2. Dataflow no longer supports pipelines that use Python 2. For more information, see the Python 2 support on Google Cloud page.

  3. Set up and activate a Python virtual environment for this quickstart, as shown in the example commands after this procedure.

    After you complete the quickstart, you can deactivate the virtual environment by running deactivate.
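
If you use the venv module that ships with Python 3, commands like the following create and activate a virtual environment on Linux or macOS; the directory name env is an arbitrary choice:

python -m venv env
source env/bin/activate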

Read more about using Python on Google Cloud on the Setting Up a Python Development Environment page.

Note: For best results, launch Python 3 pipelines with Apache Beam 2.16.0 or later. Apache Beam SDK version 2.24.0 was the last version to support Python 2 and Python 3.5. For a summary of recent Python 3 improvements in Apache Beam, see the Apache Beam issue tracker.

Get the Apache Beam SDK

The Apache Beam SDK is an open source programming model for data pipelines. You define these pipelines with an Apache Beam program and can choose a runner, such as Dataflow, to execute your pipeline.

Install the latest version of the Apache Beam SDK for Python by running the following command from a virtual environment:

pip install 'apache-beam[gcp]'
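
To give a sense of what the SDK provides, the following is a minimal sketch of a Beam pipeline, not part of the quickstart: it counts words from an in-memory string and runs on the local DirectRunner. Choosing a different runner, such as DataflowRunner, is how the same pipeline targets another execution environment.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# DirectRunner executes the pipeline on the local machine.
options = PipelineOptions(runner='DirectRunner')

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     | 'create' >> beam.Create(['to be or not to be'])
     | 'split' >> beam.FlatMap(str.split)          # one string in, many words out
     | 'pair_with_one' >> beam.Map(lambda w: (w, 1))
     | 'count' >> beam.CombinePerKey(sum)          # sum the ones per word
     | 'print' >> beam.Map(print))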

Run WordCount locally

The wordcount example demonstrates a pipeline that performs the following steps:
  1. Takes a text file as input.
  2. Parses each line into words.
  3. Performs a frequency count on the tokenized words.

Run the wordcount module from the apache_beam package on your local machine with the following command:

python -m apache_beam.examples.wordcount \
  --output outputs
The input text file is located in a Cloud Storage bucket with the resource name gs://dataflow-samples/shakespeare/kinglear.txt. To view the output, run the following command:
more outputs*

To exit, press the q key.

Running the pipeline locally lets you test and debug your Apache Beam program. You can view the wordcount.py source code on Apache Beam GitHub.

Run WordCount on the Dataflow service

You can run the wordcount module from the apache_beam package on the Dataflow service by specifying DataflowRunner in the runner field and selecting a region where the pipeline runs.

First, define your PROJECT, BUCKET, and REGION variables:

PROJECT=PROJECT_ID
BUCKET=GCS_BUCKET
REGION=DATAFLOW_REGION
Replace the following:
  • PROJECT_ID: your Google Cloud project ID.
  • GCS_BUCKET: the name of the Cloud Storage bucket that you created.
  • DATAFLOW_REGION: the region where you want to run the Dataflow job, for example, us-central1.
Run the pipeline by using the following command:
python -m apache_beam.examples.wordcount \
  --region $REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/results/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/
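
Not required for this quickstart, but as a sketch of the same mechanism in code: pipeline options such as the runner, project, region, and temporary location can also be supplied programmatically through the Beam PipelineOptions class instead of as command-line flags. The placeholder strings below stand in for the values described above.

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values mirror the PROJECT, BUCKET, and REGION variables above.
options = PipelineOptions(
    runner='DataflowRunner',
    project='PROJECT_ID',
    region='DATAFLOW_REGION',
    temp_location='gs://GCS_BUCKET/tmp/')
# Pass these options to beam.Pipeline(options=options) in your own program.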

View your results

When you run a pipeline using Dataflow, your results are stored in a Cloud Storage bucket. You can use either the local terminal or Cloud Console to view the results.

To view the results from your terminal, use the gsutil tool.

List the output files by running the following command in your terminal:
gsutil ls -lh "gs://$BUCKET/results/outputs*"  
View the results in the output files by running the following command in your terminal:
gsutil cat "gs://$BUCKET/results/outputs*"
To view your results in Cloud Console, perform the following steps:
  1. Open the Dataflow monitoring page in Cloud Console.
    Go to Dataflow jobs

    The Jobs page lists the details of all the available jobs, including the status.

    You should see your wordcount job with a Status of Running at first, and then Succeeded.

  2. Open the Cloud Storage browser in Cloud Console.
    Go to Cloud Storage browser

    The Cloud Storage browser page displays the list of all the storage buckets in your project.

  3. Click the storage bucket that you created.

    In the results directory, you should see the output files that your job created.

Modify the pipeline code

The wordcount pipeline in the previous examples distinguishes between uppercase and lowercase words. The following walkthrough demonstrates how to modify the pipeline so that it is not case sensitive.
  1. Download the latest copy of the wordcount code from the Apache Beam GitHub repository.
  2. Run the pipeline on your local machine:
    python wordcount.py --output outputs
  3. View the results by running the following command:
    more outputs*
    To exit, press the q key.
  4. Open the file wordcount.py in the editor of your choice.
  5. Examine the pipeline steps inside the run function. The split step breaks each line into words as strings.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(str))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))
  6. Add a line after the split step to lowercase the strings.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(str))
              | 'lowercase' >> beam.Map(str.lower)     # Add this line to modify the pipeline
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))
    This modification maps the str.lower function onto every word. This line is equivalent to beam.Map(lambda word: str.lower(word)).
  7. Save the file and run the modified wordcount job on your local machine:
    python wordcount.py --output outputs
  8. View the results of the modified pipeline by running the following command:
    more outputs*
    To exit, press the q key.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this quickstart, follow these steps.

  1. In the Cloud Console, go to the Cloud Storage Browser page.

    Go to Browser

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete.

What's next

Apache Beam is a trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.