Developing with Apache Beam notebooks

Using the Apache Beam interactive runner with JupyterLab notebooks lets you iteratively develop pipelines, inspect your pipeline graph, and parse individual PCollections in a read-eval-print-loop (REPL) workflow. These Apache Beam notebooks are made available through Notebooks, a managed service that hosts notebook virtual machines pre-installed with the latest data science and machine learning frameworks.

This guide focuses on the functionality introduced by Apache Beam notebooks, but does not show how to build one. For more information on Apache Beam, see the Apache Beam programming guide.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Compute Engine and Notebooks APIs.

When you finish this guide, you can avoid continued billing by deleting the resources you created. For more details, see Cleaning up.

Launching an Apache Beam notebooks instance

  1. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.
  2. Navigate to Dataflow in the side panel and click Notebooks.
  3. In the toolbar, click New Instance.
  4. Select Apache Beam > Without GPUs.
  5. (Optional) If you want to run notebooks on a GPU, you can select Apache Beam > With 1 NVIDIA Tesla T4.
  6. On the New notebook instance page, select a network for the notebook VM and click Create.
  7. (Optional) If you choose to create the notebook instance with a GPU, on the New notebook instance page, you have to check the Install NVIDIA GPU driver automatically for me option before clicking Create.
  8. (Optional) If you want to set up a custom notebook instance, click Customize. For more information on customizing instance properties, see Create a Notebooks instance with specific properties.
  9. Notebooks creates a new Apache Beam notebook instance. When the Open JupyterLab link becomes active, click it.

Installing dependencies (Optional)

Apache Beam notebooks already come with Apache Beam and Google Cloud connector dependencies installed. If your pipeline contains custom connectors or custom PTransforms that depend on third-party libraries, you can install them after you create a notebook instance. For more information, see Installing Dependencies in the Notebooks documentation.
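Before installing anything, it can help to check which third-party modules the current kernel is missing. The following is a minimal sketch using only the Python standard library; the helper names and the module name hypothetical_custom_connector_lib are placeholders for illustration, not part of Apache Beam or Notebooks.

```python
import importlib.util
import subprocess
import sys


def missing_packages(module_names):
    """Return the top-level module names that cannot be imported in this kernel."""
    return [name for name in module_names
            if importlib.util.find_spec(name) is None]


def install_packages(pip_names):
    """Install packages into the environment of the running kernel."""
    subprocess.check_call([sys.executable, "-m", "pip", "install", *pip_names])


# "json" ships with Python; the second name is a made-up placeholder.
print(missing_packages(["json", "hypothetical_custom_connector_lib"]))
```

Calling install_packages() with the reported names installs them for the same interpreter that runs the notebook kernel.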

Getting started with Apache Beam notebooks

After you open a Notebooks instance, example notebooks are available in the Examples folder. The following are currently available:

  • Word Count
  • Streaming Word Count
  • Streaming NYC Taxi Ride Data
  • Dataflow Word Count
  • Use GPUs with Apache Beam
  • Visualize Data

Additional tutorials explaining the fundamentals of Apache Beam are available in the Tutorials folder. The following are currently available:

  • Basic Operations
  • Element Wise Operations
  • Aggregations
  • Windows
  • IO Operations
  • Streaming

These notebooks include explanatory text and commented code blocks to help you understand Apache Beam concepts and API usage. The Tutorials also provide hands-on exercises for you to practice concepts learned.

Creating a notebook instance

Navigate to File > New > Notebook and select a kernel that is Apache Beam 2.22 or later.

Apache Beam is installed on your notebook instance, so include the interactive_runner and interactive_beam modules in your notebook.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
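To confirm that the selected kernel meets the Apache Beam 2.22 minimum, you could compare the installed version string against that threshold. This is a small sketch; meets_minimum is a hypothetical helper, and it assumes the version string starts with numeric major and minor components.

```python
def meets_minimum(version, minimum=(2, 22)):
    """Return True if a 'major.minor[.patch...]' string is at least `minimum`."""
    major, minor = version.split(".")[:2]
    return (int(major), int(minor)) >= minimum


print(meets_minimum("2.22.0"))  # True
print(meets_minimum("2.21.0"))  # False
```

In a notebook, pass the installed version, for example meets_minimum(beam.__version__).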

If your notebook uses other Google APIs, add the following import statements:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Setting interactivity options

The following sets the amount of time the InteractiveRunner records data from an unbounded source. In this example, the duration is set to 10 minutes.

ib.options.recording_duration = '10m'

You can also change the recording size limit (in bytes) for an unbounded source through the recording_size_limit property.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

For additional interactive options, see the interactive_beam.options class.
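If you set both limits in several notebooks, one option is to wrap the two assignments in a helper. The sketch below is illustrative only: configure_recording is a hypothetical function, and a SimpleNamespace stands in for ib.options so the example runs without Apache Beam installed.

```python
from types import SimpleNamespace


def configure_recording(options, duration="10m", size_limit_bytes=10**9):
    """Set both recording limits on an options object and return it."""
    options.recording_duration = duration
    options.recording_size_limit = size_limit_bytes
    return options


# Stand-in for ib.options; in a notebook, pass ib.options instead.
demo_options = configure_recording(
    SimpleNamespace(), duration="5m", size_limit_bytes=500 * 10**6)
print(demo_options.recording_duration, demo_options.recording_size_limit)
```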

Creating your pipeline

Initialize the pipeline using an InteractiveRunner object.

options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

p = beam.Pipeline(InteractiveRunner(), options=options)

Reading and visualizing the data

The following example shows an Apache Beam pipeline that creates a subscription to the given Pub/Sub topic and reads from the subscription.

words = (p
   | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear"))

The pipeline counts the words per window. It first applies fixed windowing to the source, with each window being 10 seconds in duration.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
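As a rough illustration of what fixed windowing does, the following plain-Python sketch maps an event timestamp (in seconds) to the 10-second window that contains it. It assumes windows aligned to multiples of the window size, with a start that is inclusive and an end that is exclusive; fixed_window is a hypothetical helper, not a Beam API.

```python
def fixed_window(timestamp, size=10):
    """Return the [start, end) window of `size` seconds containing `timestamp`."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


for ts in (0, 9.5, 10, 23):
    print(ts, "->", fixed_window(ts))
```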

After the data is windowed, the words are counted by window.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())
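Conceptually, counting per element after windowing keeps a separate tally of each word for each window. The sketch below mimics that behavior on a small in-memory list, without Beam; count_per_window and the sample data are made up for illustration.

```python
from collections import Counter


def count_per_window(timestamped_words, size=10):
    """Count each word separately within each fixed window of `size` seconds."""
    counts = Counter()
    for timestamp, word in timestamped_words:
        window_start = timestamp - (timestamp % size)
        counts[(window_start, word)] += 1
    return dict(counts)


# (event time in seconds, word) pairs; the last "king" falls in a later window.
sample = [(1, "king"), (4, "lear"), (7, "king"), (12, "king")]
print(count_per_window(sample))
```

The same word appearing in two different windows yields two separate counts, which is why "king" is reported once per window here.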

The show() method visualizes the resulting PCollection in the notebook.

ib.show(windowed_word_counts, include_window_info=True)

Figure: The show() method visualizing a PCollection in tabular form.

You can narrow the result set returned by show() by setting two optional parameters: n and duration. Setting n limits the result set to at most n elements, such as 20. If n is not set, the default behavior is to list the most recent elements captured until the source recording is over. Setting duration limits the result set to a specified number of seconds' worth of data, starting from the beginning of the source recording. If duration is not set, the default behavior is to list all elements until the recording is over.

If both optional parameters are set, show() stops whenever either threshold is met. In the following example, show() returns at most 20 elements that are computed based on the first 30 seconds' worth of data from the recorded sources.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)
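The "whichever threshold is met first" rule can be sketched in plain Python. This is a simplification and not how show() is implemented: it treats each record as an output element tagged with its offset in seconds from the start of the recording, and assumes records arrive in time order. limit_results is a hypothetical helper.

```python
def limit_results(records, n=None, duration=None):
    """Yield elements until `n` elements or `duration` seconds are reached.

    `records` is an iterable of (seconds_since_recording_start, element)
    pairs, assumed to be ordered by time.
    """
    shown = 0
    for elapsed, element in records:
        if duration is not None and elapsed >= duration:
            break  # Past the time budget.
        if n is not None and shown >= n:
            break  # Element budget used up.
        shown += 1
        yield element


records = [(0, "a"), (10, "b"), (20, "c"), (40, "d")]
print(list(limit_results(records, n=2, duration=30)))
print(list(limit_results(records, n=20, duration=30)))
```

In the first call the element limit stops the output; in the second, the duration limit does.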

To display visualizations of your data, pass visualize_data=True into the show() method. You can apply multiple filters to your visualizations. The following visualization allows you to filter by label and axis:

Figure: The show() method visualizing a PCollection as a rich set of filterable UI elements.

Another useful visualization in Apache Beam notebooks is a Pandas DataFrame. The following example first converts the words to lowercase and then computes the frequency of each word.

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

The collect() method provides the output in a Pandas DataFrame.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Figure: The collect() method representing a PCollection in a Pandas DataFrame.

Understanding a pipeline's recording status

In addition to visualizations, you can inspect the recording status for one or all pipelines in your notebook instance by calling describe().

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

The describe() method provides the following details:

  • Total size (in bytes) of all of the recordings for the pipeline on disk
  • Time when the background recording job started (in seconds since the Unix epoch)
  • Current pipeline status of the background recording job
  • Python variable for the pipeline
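Because the size is reported in bytes and the start time in seconds since the Unix epoch, a small formatter can make the details easier to read. The sketch below works on a plain dictionary; the keys size, start, and state, and the sample values, are assumptions made for illustration and may not match the structure that describe() returns.

```python
from datetime import datetime, timezone


def summarize_recording(description):
    """Format a recording description with a readable size and start time."""
    size_mb = description["size"] / 1e6
    started = datetime.fromtimestamp(description["start"], tz=timezone.utc)
    return "{:.1f} MB recorded since {} ({})".format(
        size_mb, started.isoformat(), description["state"])


# Hypothetical sample values, not real output.
sample = {"size": 2_500_000, "start": 0, "state": "RUNNING"}
print(summarize_recording(sample))
```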

Launching Dataflow jobs from a pipeline created in your notebook

  1. (Optional) Before using your notebook to run Dataflow jobs, restart the kernel, rerun all cells, and verify the output. If you skip this step, hidden states in the notebook might affect the job graph in the pipeline object.
  2. Enable the Dataflow API.
  3. Add the following import statement:

    from apache_beam.runners import DataflowRunner

  4. Pass in your pipeline options.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    # Set the SDK location. This is used by Dataflow to locate the
    # SDK needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)

    You can adjust the parameter values. For example, you can change the region value from us-central1.

  5. Run the pipeline with DataflowRunner. This runs your job on the Dataflow service.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)

    p is a pipeline object from Creating your pipeline.
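The Cloud Storage location in step 4 contains a <change me> placeholder that is easy to leave in by accident. As a hedged sketch, a check like the following could run before the job is submitted; check_gcs_location is a hypothetical helper, and the bucket name in the usage line is made up.

```python
def check_gcs_location(location):
    """Return staging and temp paths, or raise if the location looks unedited."""
    if not location.startswith("gs://"):
        raise ValueError("Expected a Cloud Storage path starting with gs://")
    if "<" in location or ">" in location:
        raise ValueError("Replace the placeholder bucket name: %s" % location)
    base = location.rstrip("/")
    return {
        "staging_location": "%s/staging" % base,
        "temp_location": "%s/temp" % base,
    }


# "my-example-bucket" is a placeholder bucket name.
print(check_gcs_location("gs://my-example-bucket/dataflow"))
```

The returned values follow the same '%s/staging' and '%s/temp' pattern used in the pipeline options above.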

For an example on how to perform this conversion on an interactive notebook, see the Dataflow Word Count notebook in your notebook instance.

Alternatively, you can export your notebook as an executable script, modify the generated .py file using the previous steps, and then deploy your pipeline to the Dataflow service.

Saving your notebook

Notebooks you create are saved locally in your running notebook instance. If you reset or shut down the notebook instance during development, those new notebooks are persisted as long as they are created under the /home/jupyter directory. However, if a notebook instance is deleted, those notebooks are also deleted.
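To check whether a given notebook path falls under the persisted directory, a simple path comparison is enough. This sketch is purely lexical: it does not resolve symbolic links or ".." segments, and survives_reset is a hypothetical helper name.

```python
from pathlib import PurePosixPath

PERSISTED_ROOT = PurePosixPath("/home/jupyter")


def survives_reset(notebook_path):
    """Return True if the path is /home/jupyter or located beneath it."""
    path = PurePosixPath(notebook_path)
    return path == PERSISTED_ROOT or PERSISTED_ROOT in path.parents


print(survives_reset("/home/jupyter/work/wordcount.ipynb"))
print(survives_reset("/tmp/scratch.ipynb"))
```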

To keep your notebooks for future use, download them locally to your workstation, save them to GitHub, or export them to a different file format.

Saving your notebook to additional persistent disks

If you want to keep your work, such as notebooks and scripts, across multiple notebook instances, you can store it on a Persistent Disk.

  1. Create or attach a Persistent Disk. Follow the instructions to use ssh to connect to the VM of the notebook instance and issue commands in the opened Cloud Shell.

  2. Note the directory where the Persistent Disk is mounted, for example, /mnt/myDisk.

  3. Edit the VM details of the notebook instance to add an entry to the Custom metadata, with the key container-custom-params and the value -v /mnt/myDisk:/mnt/myDisk.

  4. Click Save.

  5. To apply these changes, reset the notebook instance.

  6. When the link becomes active after the reset, click Open JupyterLab. It might take a while for the JupyterLab UI to become available. After the UI appears, open a terminal and run the following command:

    ls -al /mnt

    The /mnt/myDisk directory should be listed.

Now you can save your work to the /mnt/myDisk directory. Even if the notebook instance is deleted, the Persistent Disk still exists under your project. You can then attach this Persistent Disk to other notebook instances.
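From a notebook cell, you could also confirm that the mounted directory exists and is writable before saving work to it. The following is a minimal standard-library sketch; is_writable_dir is a hypothetical helper, and the example runs against a temporary directory rather than /mnt/myDisk.

```python
import os
import tempfile


def is_writable_dir(path):
    """Return True if `path` is an existing directory the process can write to."""
    return os.path.isdir(path) and os.access(path, os.W_OK)


# Demonstrate with a temporary directory and with a path that does not exist.
with tempfile.TemporaryDirectory() as demo_dir:
    print(is_writable_dir(demo_dir))
print(is_writable_dir("/path/that/does/not/exist"))
```

On the notebook instance, call is_writable_dir("/mnt/myDisk") after the reset.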

Cleaning up

After you've finished using your Apache Beam notebook instance, clean up the resources you created on Google Cloud by shutting down the notebook instance.