This page shows you how to set up your Python development environment, get the Apache Beam SDK for Python, and run and modify an example pipeline.
Another option for learning how to create and run an Apache Beam pipeline is to interactively
develop one using an Apache Beam notebook.
If you already have a Google Cloud project set up, the wordcount
example pipeline in this quickstart is available as an example notebook.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.
- Enable the Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs.
- Create a service account:
  - In the Cloud Console, go to the Create service account page.
  - Select a project.
  - In the Service account name field, enter a name. The Cloud Console fills in the Service account ID field based on this name.
  - In the Service account description field, enter a description. For example, Service account for quickstart.
  - Click Create.
  - Click the Select a role field. Under Quick access, click Basic, then click Owner.
  - Click Continue.
  - Click Done to finish creating the service account. Do not close your browser window. You will use it in the next step.
- Create a service account key:
  - In the Cloud Console, click the email address for the service account that you created.
  - Click Keys.
  - Click Add key, then click Create new key.
  - Click Create. A JSON key file is downloaded to your computer.
  - Click Close.
- Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key (see the example after this list). This variable applies only to your current shell session, so if you open a new session, set the variable again.
- Create a Cloud Storage bucket:
  - In the Cloud Console, go to the Cloud Storage Browser page.
  - Click Create bucket.
  - On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
    - For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
    - For Choose where to store your data, do the following:
      - Select a Location type option.
      - Select a Location option.
    - For Choose a default storage class for your data, select Standard.
    - For Choose how to control access to objects, select an Access control option.
    - For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
  - Click Create.
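To set the GOOGLE_APPLICATION_CREDENTIALS variable mentioned above, on Linux or macOS you could run something like the following, where the path is a placeholder for wherever you saved the downloaded JSON key (on Windows, use set in Command Prompt or $env: in PowerShell instead of export):

  export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-key.json"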
Set up your environment
This quickstart uses a command prompt. If you don't have a command prompt readily available, you can use Cloud Shell. Cloud Shell already has the package manager for Python installed, so you can skip step 1 in the following procedure.
- Use the Apache Beam SDK for Python with pip and Python version 3.6, 3.7, or 3.8. Check that you have a working Python and pip installation by running the following commands:

  python --version
  python -m pip --version
  If you do not have Python, find the installation steps for your operating system on the Installing Python page.
- Set up and activate a Python virtual environment for this quickstart. After you complete the quickstart, you can deactivate the virtual environment by running deactivate.
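For example, one common way to create and activate a virtual environment is with Python's built-in venv module; the directory name env is just an illustrative choice:

  python -m venv env
  source env/bin/activate  # On Windows: env\Scripts\activate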
Dataflow no longer supports pipelines that use Python 2. For more information, see the Python 2 support on Google Cloud page.
Note: For best results, launch Python 3 pipelines with Apache Beam 2.16.0 or later. Apache Beam SDK version 2.24.0 was the last version to support Python 2 and Python 3.5. For a summary of recent Python 3 improvements in Apache Beam, see the Apache Beam issue tracker.
Get the Apache Beam SDK
The Apache Beam SDK is an open source programming model for data pipelines. You define these pipelines with an Apache Beam program and can choose a runner, such as Dataflow, to execute your pipeline.

Install the latest version of the Apache Beam SDK for Python by running the following command from a virtual environment:
pip install 'apache-beam[gcp]'
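Before moving on to the full wordcount example, it can help to see the programming model in a tiny, self-contained pipeline. The following sketch is illustrative only (the input lines and step names are made up) and runs locally with the default DirectRunner:

  import apache_beam as beam

  # A minimal sketch of the Beam programming model: create a few lines,
  # split them into words, pair each word with 1, and sum per word.
  # The input strings are illustrative placeholders.
  with beam.Pipeline() as pipeline:
      counts = (
          pipeline
          | 'CreateLines' >> beam.Create(['to be or not to be', 'that is the question'])
          | 'Split' >> beam.FlatMap(str.split)
          | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
          | 'CountPerWord' >> beam.CombinePerKey(sum)
          | 'Print' >> beam.Map(print))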
Run WordCount locally
The wordcount example demonstrates a pipeline that performs the following steps:
- Takes a text file as input.
- Parses each line into words.
- Performs a frequency count on the tokenized words.
Run the wordcount
module from the apache_beam
package on your local
machine with the following command:
python -m apache_beam.examples.wordcount \
  --output outputs

By default, the pipeline reads a text file located in a Cloud Storage bucket with the resource name gs://dataflow-samples/shakespeare/kinglear.txt.
To view the output, run the following command:
more outputs*
To exit, press the q key.
Running the pipeline locally lets you test and debug your Apache Beam program. You can view the wordcount.py source code on Apache Beam GitHub.
Run WordCount on the Dataflow service
You can run the wordcount module from the apache_beam package on the Dataflow service by specifying DataflowRunner in the runner field and selecting a region for the pipeline to run.
First, define your PROJECT, BUCKET, and REGION variables:
PROJECT=PROJECT_ID
BUCKET=GCS_BUCKET
REGION=DATAFLOW_REGION

Replace the following:
- PROJECT_ID: your Google Cloud project ID.
- GCS_BUCKET: a unique name for the Cloud Storage bucket.
- DATAFLOW_REGION: the region where you want to run the Dataflow job, for example us-central1.
Run the wordcount pipeline on the Dataflow service:

python -m apache_beam.examples.wordcount \
  --region $REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/results/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/
View your results
When you run a pipeline using Dataflow, your results are stored in a Cloud Storage bucket. You can use either the local terminal or Cloud Console to view the results.
To view the results from your terminal, use the gsutil tool.

List the output files by running the following command:

gsutil ls -lh "gs://$BUCKET/results/outputs*"

View the results in the output files by running the following command in your terminal:

gsutil cat "gs://$BUCKET/results/outputs*"

To view your results in Cloud Console, perform the following steps:
- Open the Dataflow monitoring page in Cloud Console. The Jobs page lists the details of all the available jobs, including the status. You should see your wordcount job with a Status of Running at first, and then Succeeded.
- Open the Cloud Storage browser in Cloud Console. The Cloud Storage browser page displays the list of all the storage buckets in your project.
- Click the storage bucket that you created. In the results directory, you should see the output files that your job created.
Modify the pipeline code
The wordcount pipeline in the previous examples distinguishes between uppercase and lowercase words. The following walkthrough demonstrates how to modify the pipeline so that it is not case-sensitive.
- Download the latest copy of the wordcount code from the Apache Beam GitHub repository.
- Run the pipeline on your local machine:

  python wordcount.py --output outputs
- View the results by running the following command:

  more outputs*

  To exit, press the q key.
- Open the file wordcount.py in the editor of your choice.
- Examine the pipeline steps inside the run function. After Split, the lines are split into words as strings.

  counts = (
      lines
      | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
      | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
      | 'GroupAndSum' >> beam.CombinePerKey(sum))
- Modify the line after Split to lowercase the strings:

  counts = (
      lines
      | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
      | 'lowercase' >> beam.Map(str.lower)
      | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
      | 'GroupAndSum' >> beam.CombinePerKey(sum))

  This modification maps the str.lower function onto every word. This line is equivalent to beam.Map(lambda word: str.lower(word)). A small optional check of this behavior appears after this list.
- Save the file and run the modified wordcount job on your local machine:

  python wordcount.py --output outputs
- View the results of the modified pipeline by running the following command:

  more outputs*

  To exit, press the q key.
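If you want to convince yourself that beam.Map(str.lower) lowercases every element as described above, the following optional sketch uses Beam's testing utilities; the input words are illustrative placeholders:

  import apache_beam as beam
  from apache_beam.testing.test_pipeline import TestPipeline
  from apache_beam.testing.util import assert_that, equal_to

  # Optional sanity check: beam.Map(str.lower) lowercases each element.
  with TestPipeline() as pipeline:
      lowered = (
          pipeline
          | beam.Create(['KING', 'Lear', 'Attend'])
          | 'lowercase' >> beam.Map(str.lower))
      assert_that(lowered, equal_to(['king', 'lear', 'attend']))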
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this quickstart, follow these steps.
- In the Cloud Console, go to the Cloud Storage Browser page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete.
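Alternatively, if you prefer the command line, you can remove the bucket and all of its contents with the gsutil tool; this deletion is permanent, so double-check the bucket name first:

  gsutil rm -r gs://$BUCKET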
What's next
- Read about the Apache Beam programming model.
- Learn how to design and create your own pipeline.
- Work through the WordCount and Mobile Gaming examples.