This page shows you how to set up your Python development environment, get the Apache Beam SDK for Python, and run and modify an example pipeline.
Another option for learning how to create and run an Apache Beam pipeline is to interactively develop one using an Apache Beam notebook. If you already have a Google Cloud project set up, the WordCount example pipeline in this quickstart is available as an example notebook.
Before you begin
- Sign in to your Google Account. If you don't already have one, sign up for a new account.
- In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.
- Enable the Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs.
- Set up authentication:
  - In the Cloud Console, go to the Create service account key page.
  - From the Service account list, select New service account.
  - In the Service account name field, enter a name.
  - From the Role list, select Project > Owner.
  - Click Create. A JSON file that contains your key downloads to your computer.
- Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable applies only to your current shell session, so if you open a new session, set the variable again. For example commands, see the sketch after this list.
- Create a Cloud Storage bucket:
  - In the Cloud Console, go to the Cloud Storage Browser page.
  - Click Create bucket.
  - In the Create bucket dialog, specify the following attributes:
    - Name: A unique bucket name. Do not include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
    - Default storage class: Standard
    - Location: Where bucket data will be stored.
  - Click Create.
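If you prefer the command line for the shell-level steps above, a minimal sketch looks like the following. The key file path and bucket name are placeholders that you must replace with your own values; the gsutil mb command creates a bucket equivalent to the console steps above.
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-key.json"
gsutil mb -c standard -l US gs://your-unique-bucket-name/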
Set up your environment
- Use the Apache Beam SDK for Python with pip and Python version 3.6, 3.7, or 3.8. Check that you have a working Python and pip installation by running:
  python --version
  python -m pip --version
  If you do not have Python, find the installation steps for your operating system on the Installing Python page.
- Set up and activate a Python virtual environment for this quickstart. After you complete the quickstart, you can deactivate the virtual environment by running deactivate.
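One common way to create and activate a virtual environment is with Python's built-in venv module. The directory name env below is just an example; any name works.
python -m venv env
source env/bin/activate   # On Windows, run env\Scripts\activate instead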
Dataflow no longer supports pipelines that use Python 2. For more information, see the Python 2 support on Google Cloud page.
Note: For best results, launch Python 3 pipelines with Apache Beam 2.16.0 or later. Apache Beam SDK version 2.24.0 was the last version to support Python 2 and Python 3.5. For a summary of recent Python 3 improvements in Apache Beam, see the Apache Beam issue tracker.
Get the Apache Beam SDK
The Apache Beam SDK is an open source programming model for data pipelines. You define these pipelines with an Apache Beam program and can choose a runner, such as Dataflow, to execute your pipeline.
Install the latest version of the Apache Beam SDK for Python by running the following command from a virtual environment:
pip install 'apache-beam[gcp]'
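As a quick sanity check, which is not part of the official steps, you can print the installed SDK version:
python -c "import apache_beam; print(apache_beam.__version__)"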
Run WordCount locally
The WordCount example demonstrates a pipeline that performs the following steps:
- Takes a text file as input.
- Parses each line into words.
- Performs a frequency count on the tokenized words.
Run the wordcount module from the apache_beam package on your local machine with the following command:
python -m apache_beam.examples.wordcount \
  --output outputs
The input text file is located in a Cloud Storage bucket with the resource name gs://dataflow-samples/shakespeare/kinglear.txt.
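The wordcount module also accepts an --input flag, so you can point it at a different text file if you like. This sketch reuses the sample file above; any readable text file path works:
python -m apache_beam.examples.wordcount \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output outputs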
To view the outputs, run the following command:
more outputs*
To exit, press the q key.
Executing your pipeline locally allows you to test and debug your Apache Beam program. You can view the wordcount.py source code on the Apache Beam GitHub.
Run WordCount on the Dataflow service
You can run the wordcount module from the apache_beam package on the Dataflow service by specifying DataflowRunner in the runner field and selecting a region where the pipeline will execute.
First, define your PROJECT, BUCKET, and REGION variables:
PROJECT=PROJECT_ID
BUCKET=GCS_BUCKET
REGION=DATAFLOW_REGION
Run this pipeline by running the following command:
python -m apache_beam.examples.wordcount \
  --region $REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/results/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/
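If you want to check on the job from your terminal rather than the monitoring UI, one optional way is to list the Dataflow jobs in the region you chose with gcloud; this step is not part of the walkthrough itself:
gcloud dataflow jobs list --region=$REGION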
View your results using GCP
When you run a pipeline using Dataflow, your results are located in a Cloud Storage bucket.
You can use the gsutil tool to view the results from your terminal.
List the output files by running:
gsutil ls -lh "gs://$BUCKET/results/outputs*"
View the results in these files by running:
gsutil cat "gs://$BUCKET/results/outputs*"
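If you would rather download the output files than stream them to the terminal, you can copy them to the current directory; this is an optional convenience, not a required step:
gsutil cp "gs://$BUCKET/results/outputs*" .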
To view your results from the monitoring UI:
- Open the Dataflow monitoring UI. You should see your wordcount job with a status of Running at first, and then Succeeded.
- Open the Cloud Storage Browser in the Google Cloud Console. In the wordcount directory, you should see the output files that your job created.
Modifying pipeline code
The WordCount pipeline in the previous examples distinguishes between uppercase and lowercase words. The following walkthrough demonstrates how to modify the pipeline so that it is not case sensitive.
- Download the latest copy of the WordCount code from the Apache Beam GitHub repository.
- Run the pipeline on your local machine:
python wordcount.py --output outputs
- View the results by running the following command:
more outputs*
To exit, press the q key.
- Open the file wordcount.py in the editor of your choice.
- Examine the pipeline steps inside the run function. After split, the lines are split into words as strings.
  counts = (
      lines
      | 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
      | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
      | 'group' >> beam.GroupByKey()
      | 'count' >> beam.Map(count_ones))
- Modify the line after split to lowercase the strings.
  counts = (
      lines
      | 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
      | 'lowercase' >> beam.Map(str.lower)  # Add this line to modify the pipeline
      | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
      | 'group' >> beam.GroupByKey()
      | 'count' >> beam.Map(count_ones))
This modification maps the str.lower function onto every word. This line is equivalent to beam.Map(lambda word: str.lower(word)).
- Save the file and run the modified WordCount job on your local machine:
python wordcount.py --output outputs
- View the results of the modified pipeline by running the following command:
more outputs*
To exit, press the q key.
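Because wordcount.py is the same example you ran earlier as a module, it accepts the same pipeline options. As a sketch, assuming the PROJECT, BUCKET, and REGION variables you defined earlier are still set, you can also run your modified version on the Dataflow service:
python wordcount.py \
  --region $REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/results/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/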
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this quickstart, follow these steps.
- In the Cloud Console, go to the Cloud Storage Browser page.
- Click the checkbox for the bucket you want to delete.
- To delete the bucket, click Delete.
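If you prefer the command line, deleting the bucket and its contents with gsutil is equivalent to the console steps above; make sure the bucket contains nothing you want to keep:
gsutil rm -r gs://$BUCKET/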
What's next
- Read about the Apache Beam programming model.
- Learn how to design and create your own pipeline.
- Work through the WordCount and Mobile Gaming examples.