Quickstart using Python

This page shows you how to set up your Python development environment, get the Apache Beam SDK for Python, and run and modify an example pipeline.

Another option for learning how to create and run an Apache Beam pipeline is to interactively develop one using an Apache Beam notebook. If you already have a Google Cloud project set up, the WordCount example pipeline in this quickstart is available as an example notebook.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. In the Cloud Console, on the project selector page, select or create a Cloud project.

    Go to the project selector page

  3. Make sure that billing is enabled for your Cloud project.

    Learn how to enable billing

  4. Enable the Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs.

    Enable the APIs

  5. Set up authentication:
    1. In the Cloud Console, go to the Create service account key page.

      Go to the Create Service Account Key page
    2. From the Service account list, select New service account.
    3. In the Service account name field, enter a name.
    4. From the Role list, select Project > Owner.

      Note: The Role field authorizes your service account to access resources. You can view and change this field later by using the Cloud Console. If you are developing a production app, specify more granular permissions than Project > Owner. For more information, see granting roles to service accounts.
    5. Click Create. A JSON file that contains your key downloads to your computer.
  6. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again. An example export command follows this list.

  7. Create a Cloud Storage bucket:
    1. In the Cloud Console, go to the Cloud Storage Browser page.

      Go to the Cloud Storage Browser page

    2. Click Create bucket.
    3. In the Create bucket dialog, specify the following attributes:
      • Name: A unique bucket name. Do not include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      • Default storage class: Standard
      • Location: Where bucket data will be stored.
    4. Click Create.
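
As an illustration of steps 6 and 7, the following shell sketch sets the credentials variable and creates a bucket from the command line. The key path, bucket name, and location shown here are placeholders, not values from this quickstart, and the Cloud Console steps above remain the documented path.

export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-key.json"
gsutil mb -c STANDARD -l US gs://your-unique-bucket-name/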

Set up your environment

  1. Use the Apache Beam SDK for Python with pip and Python version 2.7, 3.5, 3.6 or 3.7. Check that you have a working Python and pip installation by running:
    python --version
    python -m pip --version
    If you do not have Python, find the installation steps for your operating system on the Installing Python page.
  2. On October 7, 2020, Dataflow will stop supporting pipelines using Python 2. For more information, see the Python 2 support on Google Cloud page.

  3. Set up and activate a Python virtual environment for this quickstart. An example using venv is shown after this list.

    After you complete the quickstart, you can deactivate the virtual environment by running deactivate.
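
For example, assuming Python 3 on macOS or Linux (the directory name env here is arbitrary), you can create and activate a virtual environment with the standard venv module:

python3 -m venv env
source env/bin/activate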

Read more about using Python on Google Cloud on the Setting Up a Python Development Environment page.

Note: For best results, launch Python 3 pipelines with Apache Beam 2.16.0 or later. For a summary of recent Python 3 improvements in Apache Beam, see the Apache Beam issue tracker.

Get the Apache Beam SDK

The Apache Beam SDK is an open source programming model for data pipelines. You define these pipelines with an Apache Beam program and can choose a runner, such as Dataflow, to execute your pipeline.

Install the latest version of the Apache Beam SDK for Python by running the following command from a virtual environment:

pip install apache-beam[gcp]
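
To confirm that the SDK is available in the active environment, you can print its version. This check is an extra convenience, not part of the original quickstart steps:

python -c "import apache_beam; print(apache_beam.__version__)"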

Run WordCount locally

The WordCount example demonstrates a pipeline that performs the following steps:
  1. Takes a text file as input.
  2. Parses each line into words.
  3. Performs a frequency count on the tokenized words.

Run the wordcount module from the apache_beam package on your local machine with the following command:

python -m apache_beam.examples.wordcount \
  --output outputs
When you omit the --input flag, the wordcount module reads a default text file located in a Cloud Storage bucket with the resource name gs://dataflow-samples/shakespeare/kinglear.txt. To view the outputs, run the following command:
more outputs*

To exit, press the q key.

Executing your pipeline locally allows you to test and debug your Apache Beam program. You can view the wordcount.py source code on the Apache Beam GitHub.
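
The wordcount module hides pipeline construction behind a command-line entry point. As a rough, simplified sketch only (not the actual wordcount.py source), the three steps listed above could be written directly against the Beam API like this:

import re

import apache_beam as beam

with beam.Pipeline() as pipeline:  # Uses the local DirectRunner by default.
    (pipeline
     # 1. Take a text file as input (the same sample file used above).
     | 'read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
     # 2. Parse each line into words.
     | 'split' >> beam.FlatMap(lambda line: re.findall(r"[A-Za-z']+", line))
     # 3. Perform a frequency count on the tokenized words.
     | 'pair_with_one' >> beam.Map(lambda word: (word, 1))
     | 'group_and_sum' >> beam.CombinePerKey(sum)
     | 'format' >> beam.Map(lambda word_count: '%s: %d' % word_count)
     | 'write' >> beam.io.WriteToText('outputs'))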

Run WordCount on the Dataflow service

You can run the wordcount module from the apache_beam package on the Dataflow service by specifying DataflowRunner in the runner field and selecting a region where the pipeline will execute.

First, define your PROJECT, BUCKET, and REGION variables:

PROJECT=PROJECT_ID
BUCKET=GCS_BUCKET
REGION=DATAFLOW_REGION
Then run the pipeline with the following command:
python -m apache_beam.examples.wordcount \
  --region $REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/results/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/
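
While the job runs, you can also check its status from your terminal with the gcloud CLI. This command is a convenience, not a required quickstart step:

gcloud dataflow jobs list --region=$REGION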

View your results using GCP

When you run a pipeline using Dataflow, your results are located in a Cloud Storage bucket.

You can use the gsutil tool to view the results from your terminal.

List the output files by running:

gsutil ls -lh "gs://$BUCKET/results/outputs*"  
View the results in these files by running:
gsutil cat "gs://$BUCKET/results/outputs*"

To view your results from the monitoring UI:
  1. Open the Dataflow monitoring UI.
    Go to the Dataflow Web UI

    You should see your wordcount job with a status of Running at first, and then Succeeded:

    The Cloud Dataflow WordCount job with a status of Succeeded.
  2. Open the Cloud Storage Browser in the Google Cloud Console.
    Go to the Cloud Storage browser

    In the wordcount directory, you should see the output files that your job created:

    Results directory, with output files from the WordCount job.

Modifying pipeline code

The WordCount pipeline in the previous examples distinguishes between uppercase and lowercase words. The following walkthrough demonstrates how to modify the pipeline so that the WordCount pipeline is not case sensitive.
  1. Download the latest copy of the WordCount code from the Apache Beam GitHub repository.
  2. Run the pipeline on your local machine:
    python wordcount.py --output outputs
  3. View the results by running the following command:
    more outputs*
    To exit, press the q key.
  4. Open the file wordcount.py in the editor of your choice.
  5. Examine the pipeline steps inside the run function. In the split step, each line is split into words as strings.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))
  6. Modify the line after split to lowercase the strings.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'lowercase' >> beam.Map(unicode.lower)     # Add this line to modify the pipeline
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones)) 
    This modification maps the unicode.lower function onto every word (in Python 3, use str.lower instead). The line is equivalent to beam.Map(lambda word: word.lower()).
  7. Save the file and run the modified WordCount job on your local machine:
    python wordcount.py --output outputs
  8. View the results of the modified pipeline by running the following command:
    more outputs*
    To exit, press the q key.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this quickstart, follow these steps.

  1. In the Cloud Console, go to the Cloud Storage Browser page.

    Go to the Cloud Storage Browser page

  2. Click the checkbox for the bucket you want to delete.
  3. To delete the bucket, click Delete. (A gsutil alternative is shown below.)
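
If you prefer the command line, you can instead delete the bucket and all of its objects with gsutil, assuming $BUCKET still holds your bucket name. Double-check the name before running this, because the deletion is permanent:

gsutil rm -r gs://$BUCKET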

What's next

Apache Beam is a trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.