Quickstart using Python

This page shows you how to set up your Python development environment, get the Apache Beam SDK for Python, and run and modify an example pipeline.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a GCP project.

    Go to the project selector page

  3. Make sure that billing is enabled for your Google Cloud Platform project. Learn how to enable billing.

  4. Enable the Cloud Dataflow, Compute Engine, Stackdriver Logging, Google Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs.

  5. Set up authentication:
    1. In the GCP Console, go to the Create service account key page.

    2. From the Service account list, select New service account.
    3. In the Service account name field, enter a name.
    4. From the Role list, select Project > Owner.

      Note: The Role field authorizes your service account to access resources. You can view and change this field later by using the GCP Console. If you are developing a production app, specify more granular permissions than Project > Owner. For more information, see granting roles to service accounts.
    5. Click Create. A JSON file that contains your key downloads to your computer.
  6. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the file path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again.
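    For example, in a Linux or macOS shell you might set the variable like this, where the path is only illustrative and should point to wherever your key file actually downloaded:

    export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/my-service-account-key.json"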

  7. Create a Cloud Storage bucket:
    1. In the GCP Console, go to the Cloud Storage Browser page.

    2. Click Create bucket.
    3. In the Create bucket dialog, specify the following attributes:
      • Name: A unique bucket name. Do not include sensitive information in the bucket name, as the bucket namespace is global and publicly visible.
      • Default storage class: Standard
      • Location: The location where bucket data will be stored.
    4. Click Create.
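    If you prefer the command line, you can instead create the bucket with the gsutil tool; the bucket name below is only a placeholder, so substitute a globally unique name of your own:

    gsutil mb gs://my-unique-bucket-name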

Set up your environment

    This quickstart uses a command prompt. If you don't have a command prompt readily available, you can use Cloud Shell. It has Python's package manager already installed, so you can skip step 1.

  1. The Apache Beam SDK for Python requires pip and Python version 2.7, 3.5, 3.6, or 3.7. Check that you have a working Python and pip installation by running:
    python --version
    python -m pip --version
    If you do not have Python, find the installation steps for your operating system on the Installing Python page.
  2. Cython is not required, but if it is installed, you must have version 0.28.1 or above. To check your Cython version, run pip show cython.

  3. Set up a Python virtual environment for initial experiments. If you do not have virtualenv version 13.1.0 or above, find the installation steps for your operating system on the Installing and using the virtualenv tool page.

    Run the following to set up and activate a new virtual environment:

    python -m virtualenv env
    source env/bin/activate
    Use a virtual environment for this quickstart. After you complete the quickstart, you can deactivate the virtual environment by running deactivate.
Read more about using Python on Google Cloud Platform on the Setting Up a Python Development Environment page.

Note: For best results, launch Python 3 pipelines with Apache Beam 2.16.0 or later. For a summary of recent Python 3 improvements in Apache Beam, see the Apache Beam issue tracker.

Get the Apache Beam SDK

The Apache Beam SDK is an open source programming model for data pipelines. You define these pipelines with an Apache Beam program and can choose a runner, such as Cloud Dataflow, to execute your pipeline.

Install the latest version of the Apache Beam SDK for Python by running the following command from a virtual environment:

pip install apache-beam[gcp]
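
To confirm that the SDK installed correctly, you can check which version pip installed:

pip show apache-beam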

Run WordCount locally

The WordCount example demonstrates a pipeline that performs the following steps:
  1. Takes a text file as input.
  2. Parses each line into words.
  3. Performs a frequency count on the tokenized words.

Run the wordcount module from the apache_beam package on your local machine with the following command:

python -m apache_beam.examples.wordcount \
  --output outputs
By default, the pipeline reads a text file located in a Cloud Storage bucket with the resource name gs://dataflow-samples/shakespeare/kinglear.txt. To view the outputs, run the following command:
more outputs*

To exit, press the q key.

Executing your pipeline locally allows you to test and debug your Apache Beam program. You can view the wordcount.py source code on the Apache Beam GitHub.

Run WordCount on the Cloud Dataflow service

You can run the wordcount module from the apache_beam package on the Cloud Dataflow service by specifying DataflowRunner in the runner field.

First, define your PROJECT and BUCKET variables, replacing PROJECT_ID with your project ID and GCS_BUCKET with the name of your Cloud Storage bucket:

PROJECT=PROJECT_ID
BUCKET=GCS_BUCKET
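For example (the values below are placeholders; substitute your own project ID and bucket name, without a gs:// prefix):

PROJECT=my-project-id
BUCKET=my-storage-bucket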
Then run the pipeline with the following command:
python -m apache_beam.examples.wordcount \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/wordcount/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/
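
Depending on the Apache Beam SDK version you installed, the Dataflow service may also require a regional endpoint. If job submission reports a missing region, add a --region flag; for example (the region shown is only illustrative):

python -m apache_beam.examples.wordcount \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/wordcount/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --region us-central1 \
  --temp_location gs://$BUCKET/tmp/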

View your results using GCP

When you run a pipeline using Cloud Dataflow, your results are located in a Cloud Storage bucket.

You can use the gsutil tool to view the results from your terminal.

List the output files by running:

gsutil ls -lh "gs://$BUCKET/wordcount/outputs*"  
View the results in these files by running:
gsutil cat "gs://$BUCKET/wordcount/outputs*"

To view your results from the monitoring UI:
  1. Open the Cloud Dataflow monitoring UI.

    You should see your wordcount job with a status of Running at first, and then Succeeded:

    The Cloud Dataflow WordCount job with a status of Succeeded.
  2. Open the Cloud Storage Browser in the Google Cloud Platform Console.

    In the wordcount directory, you should see the output files that your job created:

    Results directory, with output files from the WordCount job.

Modifying pipeline code

The WordCount pipeline in the previous examples distinguishes between uppercase and lowercase words. The following walkthrough demonstrates how to modify the pipeline so that it is not case sensitive.
  1. Download the latest copy of the WordCount code from the Apache Beam GitHub repository.
  2. Run the pipeline on your local machine:
    python wordcount.py --output outputs
  3. View the results by running the following command:
    more outputs*
    To exit, press the q key.
  4. Open the file wordcount.py in the editor of your choice.
  5. Examine the pipeline steps inside the run function. After split, the lines are split into words as strings.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))
  6. Add a line after split to lowercase the strings.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'lowercase' >> beam.Map(unicode.lower)     # Add this line to modify the pipeline
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones)) 
    This modification maps the unicode.lower function onto every word. This line is equivalent to beam.Map(lambda word: word.lower()).
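
    If you run the pipeline on Python 3, the unicode type no longer exists; a minimal sketch of the same modification, assuming the rest of wordcount.py is unchanged, uses str instead:

    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(str))
              | 'lowercase' >> beam.Map(str.lower)     # Python 3 equivalent of the added line
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))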
  7. Save the file and run the modified WordCount job on your local machine:
    python wordcount.py --output outputs
  8. View the results of the modified pipeline by running the following command:
    more outputs*
    To exit, press the q key.

Clean up

To avoid incurring charges to your GCP account for the resources used in this quickstart, follow these steps.

  1. In the GCP Console, go to the Cloud Storage Browser page.

  2. Click the checkbox for the bucket you want to delete.
  3. Click Delete to delete the bucket.
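
Alternatively, if the BUCKET variable from earlier is still set in your shell, you can delete the bucket and everything in it from the command line:

gsutil rm -r gs://$BUCKET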

What's next

Apache Beam is a trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.