Develop with Apache Beam notebooks


Using the Apache Beam interactive runner with JupyterLab notebooks lets you iteratively develop pipelines, inspect your pipeline graph, and parse individual PCollections in a read-eval-print-loop (REPL) workflow. These Apache Beam notebooks are made available through Vertex AI Workbench user-managed notebooks, a service that hosts notebook virtual machines pre-installed with the latest data science and machine learning frameworks.

This guide focuses on the functionality introduced by Apache Beam notebooks, but does not show how to build one. For more information on Apache Beam, see the Apache Beam programming guide.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  4. Enable the Compute Engine, Notebooks APIs.

    Enable the APIs


When you finish this guide, you can avoid continued billing by deleting the resources you created. For more details, see Cleaning up.

Launch an Apache Beam notebook instance

  1. In the Google Cloud console, go to the Dataflow Workbench page.

    Go to workbench

  2. Make sure that you are on the User-managed notebooks tab.

  3. In the toolbar, click New notebook.

  4. Select Apache Beam > Without GPUs.

  5. (Optional) If you want to run notebooks on a GPU, you can select Apache Beam > With 1 NVIDIA Tesla T4.

  6. On the New notebook page, select a subnetwork for the notebook VM.

  7. If you choose to create the notebook instance with a GPU, on the New notebook page, select Install NVIDIA GPU driver automatically for me.

  8. (Optional) If you want to set up a custom notebook instance, click Advanced options. For more information about customizing instance properties, see Create a user-managed notebooks instance with specific properties.

  9. Click Create.

  10. Vertex AI Workbench creates a new Apache Beam notebook instance. When the link becomes active, click Open JupyterLab.

Install dependencies (Optional)

Apache Beam notebooks already come with Apache Beam and Google Cloud connector dependencies installed. If your pipeline contains custom connectors or custom PTransforms that depend on third-party libraries, you can install them after you create a notebook instance. For more information, see Install dependencies in the user-managed notebooks documentation.

Get started with Apache Beam notebooks

After opening a user-managed notebooks instance, example notebooks are available in the Examples folder. The following are currently available:

  • Word Count
  • Streaming Word Count
  • Streaming NYC Taxi Ride Data
  • Dataflow Word Count
  • Apache Beam SQL in notebooks
  • Interactive Flink at Scale
  • Use GPUs with Apache Beam
  • Visualize Data

Additional tutorials explaining the fundamentals of Apache Beam are available in the Tutorials folder. The following are currently available:

  • Basic Operations
  • Element Wise Operations
  • Aggregations
  • Windows
  • IO Operations
  • Streaming

These notebooks include explanatory text and commented code blocks to help you understand Apache Beam concepts and API usage. The Tutorials also provide hands-on exercises for you to practice concepts learned.

Create a notebook

Navigate to File > New > Notebook and select a kernel that is Apache Beam 2.22 or later.

Apache Beam is installed on your notebook instance, so include the interactive_runner and interactive_beam modules in your notebook.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

If your notebook uses other Google APIs, add the following import statements:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Set interactivity options

The following sets the amount of time the InteractiveRunner records data from an unbounded source. In this example, the duration is set to 10 minutes.

ib.options.recording_duration = '10m'

You can also change the recording size limit (in bytes) for an unbounded source through the recording_size_limit property.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

For additional interactive options, see the interactive_beam.options class.
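To make the shorthand duration strings concrete, the following hypothetical helper (not part of the Interactive Beam API) shows how a value such as '10m' maps to seconds, assuming the common s/m/h/d suffixes:

```python
# Hypothetical helper: convert a shorthand duration string such as '10m'
# into seconds. Illustrative only; not part of the Interactive Beam API.
UNIT_SECONDS = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}

def duration_to_seconds(duration: str) -> int:
    """Parse strings like '10m' or '90s' into a number of seconds."""
    value, unit = duration[:-1], duration[-1]
    return int(value) * UNIT_SECONDS[unit]

print(duration_to_seconds('10m'))  # 600 seconds of recording
```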

Create your pipeline

Initialize the pipeline using an InteractiveRunner object.

options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

p = beam.Pipeline(InteractiveRunner(), options=options)

Read and visualize the data

The following example shows an Apache Beam pipeline that creates a subscription to the given Pub/Sub topic and reads from the subscription.

words = p | "read" >> beam.io.ReadFromPubSub(
    topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

The pipeline counts the words by windows from the source. It creates fixed windowing with each window being 10 seconds in duration.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

After the data is windowed, the words are counted by window.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

The show() method visualizes the resulting PCollection in the notebook.

ib.show(windowed_word_counts, include_window_info=True)

The show method visualizing a PCollection in tabular form.

You can limit the result set returned by show() by setting two optional parameters: n and duration. Setting n limits the result set to at most n elements, such as 20. If n is not set, the default behavior is to list the most recent elements captured until the source recording is over. Setting duration limits the result set to a specified number of seconds' worth of data, starting from the beginning of the source recording. If duration is not set, the default behavior is to list all elements until the recording is over.
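The way these two limits interact can be sketched in plain Python. The following is a simplified conceptual model, not the actual Interactive Beam implementation:

```python
# Simplified model of how show(n=..., duration=...) bounds the result set:
# capture stops as soon as either limit is reached.
def capture(elements, n=None, duration=None):
    """elements: list of (timestamp_seconds, value) pairs, in arrival order."""
    result = []
    for ts, value in elements:
        if n is not None and len(result) >= n:
            break  # element-count limit reached
        if duration is not None and ts >= duration:
            break  # time-window limit reached
        result.append(value)
    return result

stream = [(0, 'a'), (5, 'b'), (12, 'c'), (31, 'd')]
print(capture(stream, n=20, duration=30))  # stops at the 30-second mark: ['a', 'b', 'c']
```

With both parameters set, whichever threshold trips first ends the capture, which mirrors the behavior described above.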

If both optional parameters are set, show() stops whenever either threshold is met. In the following example, show() returns at most 20 elements that are computed based on the first 30 seconds worth of data from the recorded sources.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

To display visualizations of your data, pass visualize_data=True into the show() method:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

You can apply multiple filters to your visualizations. The following visualization allows you to filter by label and axis:

The show method visualizing a PCollection as a rich set of filterable UI elements.

Another useful visualization in Apache Beam notebooks is a Pandas DataFrame. The following example first converts the words to lowercase and then computes the frequency of each word.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

The collect() method provides the output in a Pandas DataFrame.

ib.collect(windowed_lower_word_counts, include_window_info=True)

The collect method representing a PCollection in a Pandas DataFrame.
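Because collect() returns a standard Pandas DataFrame, you can continue the analysis with ordinary Pandas operations. The following sketch builds a hypothetical stand-in DataFrame locally (the column names here are illustrative, not the exact ones collect() produces):

```python
import pandas as pd

# Hypothetical stand-in for a collected word-count PCollection:
# one row per (word, count) pair.
df = pd.DataFrame({'word': ['the', 'king', 'lear'], 'count': [42, 7, 11]})

# Sort by count, descending, to find the most frequent words.
top = df.sort_values(by='count', ascending=False).reset_index(drop=True)
print(top.iloc[0]['word'])  # 'the'
```

Any Pandas tooling (plotting, grouping, export to CSV) applies from here.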

Visualize the data through the Interactive Beam inspector

You might find it distracting to introspect the data of a PCollection by constantly calling show() and collect(), especially when the output takes up a lot of space on your screen and makes it hard to navigate through the notebook. You might also want to compare multiple PCollections side by side to validate whether a transform works as intended, for example, when one PCollection goes through a transform and produces the other. For these use cases, the Interactive Beam inspector is a more convenient solution.

The Interactive Beam inspector is provided as the JupyterLab extension apache-beam-jupyterlab-sidepanel, which is preinstalled in the Apache Beam notebook. With the extension, you can interactively inspect the state of pipelines and the data associated with each PCollection without explicitly invoking show() or collect().

There are three ways to open the inspector:

  • Click Interactive Beam on the top menu bar of JupyterLab. In the dropdown, locate Open Inspector and click it to open the inspector.

    Open inspector through menu

  • Use the launcher page. If there is no launcher page opened, click File -> New Launcher to open it. On the launcher page, locate Interactive Beam and click Open Inspector to open the inspector.

    Open inspector through launcher

  • Use the command palette. Click View -> Activate Command Palette on the top menu bar of JupyterLab. In the popup, search Interactive Beam to list all options of the extension. Click Open Inspector to open the inspector.

    Open inspector through command palette

When the inspector opens:

  1. If exactly one notebook is open, the inspector automatically connects to it.

  2. Otherwise, a dialog appears so that you can select a kernel (when no notebook is open) or the notebook session (when multiple notebooks are open) to connect to.

    Select the notebook to connect to

You can open multiple inspectors for an opened notebook and arrange the inspectors by dragging and dropping their tabs freely in the workspace.

Open 2 inspectors and arrange them side by side

The inspector page automatically refreshes itself as you execute cells in the notebook. On the left side, it lists pipelines and PCollections defined in the connected notebook. PCollections are organized by the pipelines they belong to, and you can collapse them by clicking the pipeline header.

When you click an item in the pipelines and PCollections list, the inspector renders the corresponding visualization on the right side:

  • If it's a PCollection, the inspector renders its data (dynamically if the data is still coming in for unbounded PCollections) with additional widgets to tune the visualization after clicking the APPLY button.

    Inspector page

  • If it's a pipeline, the inspector displays the pipeline graph.

    Inspector page

You might notice there are anonymous pipelines. Those are pipelines whose PCollections you can still access, but that are no longer referenced by the main session. For example:

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

This example creates an empty pipeline, p, and an anonymous pipeline that contains one PCollection, pcoll. You can still access the anonymous pipeline through pcoll.pipeline.

The pipeline and PCollection list on the left can be toggled to save space for big visualizations. Toggle left list

Understand a pipeline's recording status

In addition to visualizations, you can also inspect the recording status for one or all pipelines in your notebook instance by calling describe().

# Return the recording status of a specific pipeline. Leave the parameter list
# empty to return the recording status of all pipelines.
ib.recordings.describe(p)
The describe() method provides the following details:

  • Total size (in bytes) of all of the recordings for the pipeline on disk
  • Start time of when the background recording job started (in seconds from Unix epoch)
  • Current pipeline status of the background recording job
  • Python variable for the pipeline
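Because the start time is reported in seconds from the Unix epoch and the size in bytes, converting them to readable units is plain Python. The values below are hypothetical, for illustration only:

```python
from datetime import datetime, timezone

# Hypothetical values as describe() might report them.
size_bytes = 2_500_000
start_epoch_seconds = 1_600_000_000

size_mb = size_bytes / 1e6
start = datetime.fromtimestamp(start_epoch_seconds, tz=timezone.utc)
print(f'{size_mb:.1f} MB, started {start.isoformat()}')
```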

Launch Dataflow jobs from a pipeline created in your notebook

  1. (Optional) Before using your notebook to run Dataflow jobs, restart the kernel, rerun all cells, and verify the output. If you skip this step, hidden states in the notebook might affect the job graph in the pipeline object.
  2. Enable the Dataflow API.
  3. Add the following import statement:

    from apache_beam.runners import DataflowRunner
  4. Pass in your pipeline options.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    # If and only if you are using Apache Beam SDK built from source code, set
    # the SDK location. This is used by Dataflow to locate the SDK
    # needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)

    You can adjust the parameter values. For example, you can change the region value from us-central1.

  5. Run the pipeline with DataflowRunner. This runs your job on the Dataflow service.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)

    p is a pipeline object from Creating your pipeline.

For an example of how to perform this conversion in an interactive notebook, see the Dataflow Word Count notebook in your notebook instance.

Alternatively, you can export your notebook as an executable script, modify the generated .py file using the previous steps, and then deploy your pipeline to the Dataflow service.

Save your notebook

Notebooks you create are saved locally in your running notebook instance. If you reset or shut down the notebook instance during development, those new notebooks are persisted as long as they are created under the /home/jupyter directory. However, if a notebook instance is deleted, those notebooks are also deleted.

To keep your notebooks for future use, download them locally to your workstation, save them to GitHub, or export them to a different file format.

Save your notebook to additional persistent disks

If you want to keep your work, such as notebooks and scripts, across different notebook instances, you can store it in a Persistent Disk.

  1. Create or attach a Persistent Disk. Follow the instructions to use ssh to connect to the VM of the notebook instance and issue commands in the opened Cloud Shell.

  2. Note the directory where the Persistent Disk is mounted, for example, /mnt/myDisk.

  3. Edit the VM details of the notebook instance and add an entry to Custom metadata: key: container-custom-params; value: -v /mnt/myDisk:/mnt/myDisk. Additional metadata needed to bind the mounted PD

  4. Click Save.

  5. To apply these changes, reset the notebook instance. Reset a notebook instance

  6. When the link becomes active after the reset, click Open JupyterLab. It might take a while for the JupyterLab UI to become available. After the UI appears, open a terminal and run the following command:

    ls -al /mnt

    The /mnt/myDisk directory should be listed. List volume bound

Now you can save your work to the /mnt/myDisk directory. Even if the notebook instance is deleted, the Persistent Disk still exists under your project. You can then attach this Persistent Disk to other notebook instances.

Clean up

After you've finished using your Apache Beam notebook instance, clean up the resources you created on Google Cloud by shutting down the notebook instance.

Advanced features

Interactive FlinkRunner on notebook-managed clusters

To work with production-sized data interactively from the notebook, you can use the FlinkRunner with some generic pipeline options to tell the notebook session to manage a long-lasting Dataproc cluster and run your Beam pipelines on it in a distributed manner.


To use this feature:

  • The Dataproc API must be enabled.
  • The service account running the notebook instance needs admin or editor roles for Dataproc.
  • Use a notebook kernel with Apache Beam SDK version 2.40.0 or later.


The minimum setup:

# Set a Cloud Storage bucket to cache source recording and PCollections.
# By default, the cache is on the notebook instance itself, but that does not
# apply to the distributed execution scenario.
ib.options.cache_root = 'gs://<CHANGE_ME>/flink'

# Define an InteractiveRunner that uses the FlinkRunner under the hood.
from apache_beam.runners.portability.flink_runner import FlinkRunner
interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())

options = PipelineOptions()
# Instruct the notebook that Google Cloud is used to run the FlinkRunner.
cloud_options = options.view_as(GoogleCloudOptions)
cloud_options.project = '<YOUR-PROJECT>'

(Optional) Explicit provision

More options:

# Change this if the pipeline needs to be executed in a different region
# than the default 'us-central1'. For example, to set it to 'us-west1':
cloud_options.region = 'us-west1'

# Explicitly provision the notebook-managed cluster.
worker_options = options.view_as(WorkerOptions)
# Provision 40 workers to run the pipeline.
worker_options.num_workers = 40
# Use the default subnetwork.
worker_options.subnetwork = 'default'
# Choose the machine type for the workers.
worker_options.machine_type = 'n1-highmem-8'

# When working with non-official Beam releases, e.g. Beam built from source
# code, configure the environment to use a compatible released SDK container,
# or build and use a custom container if needed.
options.view_as(PortableOptions).environment_config = 'apache/beam_python3.7_sdk:2.41.0'


# The parallelism is applied to each step, so if your pipeline has 10 steps, you
# end up having 10 * 10 = 100 tasks scheduled that can be executed in parallel.
options.view_as(FlinkRunnerOptions).parallelism = 10

p_word_count = beam.Pipeline(interactive_flink_runner, options=options)
word_counts = (
    p_word_count
    # ReadWordsFromText is a helper transform defined in the example notebook.
    | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
    | 'count' >> beam.combiners.Count.PerElement())
# The notebook session automatically starts and manages a cluster to execute
# your pipelines with the FlinkRunner.
ib.show(word_counts)

# Interactively adjust the parallelism.
options.view_as(FlinkRunnerOptions).parallelism = 150
# The BigQuery read needs a Cloud Storage bucket as a temporary location.
options.view_as(GoogleCloudOptions).temp_location = ib.options.cache_root
p_bq = beam.Pipeline(runner=interactive_flink_runner, options=options)
delays_by_airline = (
    p_bq
    | 'Read Dataset from BigQuery' >> beam.io.ReadFromBigQuery(
        project=project, use_standard_sql=True,
        query=('SELECT airline, arrival_delay '
               'FROM `bigquery-samples.airline_ontime_data.flights` '
               'WHERE date >= "2010-01-01"'))
    | 'Rebalance Data to TM Slots' >> beam.Reshuffle(num_buckets=1000)
    | 'Extract Delay Info' >> beam.Map(
        lambda e: (e['airline'], e['arrival_delay'] > 0))
    | 'Filter Delayed' >> beam.Filter(lambda e: e[1])
    | 'Count Delayed Flights Per Airline' >> beam.combiners.Count.PerKey())
# This step reuses the existing cluster.
ib.collect(delays_by_airline)

# Describe the cluster running the pipelines.
# You can access the Flink Dashboard from the printed link.
ib.clusters.describe(p_bq)

# Cleans up all long-lasting clusters managed by the notebook session.
ib.clusters.cleanup(force=True)
Notebook-managed clusters

  • By default, Interactive Beam always reuses the most recently used cluster to run a pipeline with the FlinkRunner if no pipeline options are given.
    • To avoid this behavior, e.g. to run another pipeline in the same notebook session with a FlinkRunner not hosted by the notebook, run ib.clusters.set_default_cluster(None).
  • When instantiating a new pipeline that uses a project, region, and provisioning configuration that map to an existing Dataproc cluster, Interactive Beam also reuses the cluster (though it might not be the most recently used cluster).
  • However, whenever a provisioning change (e.g. resizing a cluster) is given, a new cluster is created to actuate the desired change. Avoid exhausting cloud resources by cleaning up unnecessary clusters through ib.clusters.cleanup(pipeline) if you intend to resize a cluster.
  • When a Flink master_url is specified, if it belongs to a cluster that is managed by the notebook session, Interactive Beam reuses the managed cluster.
    • If the master_url is unknown to the notebook session, it means that a user-self-hosted FlinkRunner is desired; the notebook won't do anything implicitly.

Notes and FAQs

For simplicity, the Flink network buffer configuration is not exposed for configuration.

If your job graph is too complicated or your parallelism is set too high, the cardinality of steps * parallelism can become too large, causing too many tasks to be scheduled in parallel and the execution to fail.
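A quick sanity check of this cardinality is simple arithmetic, assuming roughly one task per step per parallel slot:

```python
# Rough task-count estimate: each step is parallelized independently,
# so the scheduler may create roughly steps * parallelism tasks.
def estimated_tasks(num_steps: int, parallelism: int) -> int:
    return num_steps * parallelism

print(estimated_tasks(10, 10))   # 100 tasks
print(estimated_tasks(50, 150))  # 7500 tasks: likely too many for a small cluster
```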

A few tips to improve the velocity of interactive executions:

  • Only assign the PCollection you want to inspect to a variable;
  • Inspect PCollections one by one;
  • Use Reshuffle after high fanout transforms;
  • Adjust parallelism based on the data size (sometimes smaller is faster).

It takes too long to inspect the data

Check the Flink dashboard for the running job. You might see a step where hundreds of tasks have finished and only one remains because in-flight data resides on a single machine and is not shuffled.

Always use Reshuffle after a high fanout transform such as:

  • Reading rows from a file
  • Reading rows from a BigQuery table

Without Reshuffle, the fanout data is processed on the same worker, and you cannot take advantage of parallelism no matter how high you set it.
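The effect can be illustrated with a toy model of task assignment. This is a conceptual sketch in plain Python, not Beam code:

```python
# Conceptual sketch: after a high-fanout step, all produced elements sit on
# the worker that ran the producing task. A shuffle redistributes them, e.g.
# round-robin, so downstream tasks can actually run in parallel.
def round_robin_shuffle(elements, num_workers):
    buckets = [[] for _ in range(num_workers)]
    for i, element in enumerate(elements):
        buckets[i % num_workers].append(element)
    return buckets

fanout_output = list(range(8))          # all on one worker before the shuffle
buckets = round_robin_shuffle(fanout_output, 4)
print([len(b) for b in buckets])        # [2, 2, 2, 2]: balanced across workers
```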

How many workers do I need?

As a rule of thumb, a Flink cluster has about (vCPUs per worker) * (number of workers) slots. For example, if you have 40 n1-highmem-8 workers, the cluster has at most 8 * 40 = 320 slots.

It should be able to handle a READ->MAP->COMBINE job with parallelism set in the hundreds, which would schedule thousands of tasks in parallel.
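The rule of thumb works out as follows (the vCPU count of 8 for n1-highmem-8 is an assumption about the machine type):

```python
# Rule of thumb: slots available = vCPUs per worker * number of workers.
def flink_slots(vcpus_per_worker: int, num_workers: int) -> int:
    return vcpus_per_worker * num_workers

slots = flink_slots(8, 40)
print(slots)         # 320
print(slots >= 150)  # True: enough slots for parallelism=150
```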

Does it work with streaming?

Streaming pipelines are not currently compatible with the interactive Flink on notebook-managed clusters feature.

Beam SQL and beam_sql magic

Beam SQL allows you to query bounded and unbounded PCollections with SQL statements. If you're working in an Apache Beam notebook, you can use the IPython custom magic beam_sql to speed up your pipeline development.

You can check the beam_sql magic usage with the -h or --help option: Check the beam_sql help

You can create a PCollection from constant values: Create PCollection from constant values

You can join multiple PCollections: Join multiple PCollections

You can launch a Dataflow job with the -r DataflowRunner or --runner DataflowRunner option: Launch Dataflow job with Beam SQL

To learn more, see the example notebook Apache Beam SQL in notebooks.

Accelerate using JIT compiler and GPU

You can use libraries such as numba and GPUs to accelerate your Python code and Apache Beam pipelines. In the Apache Beam notebook instance created with an nvidia-tesla-t4 GPU, you can compile your Python code with numba.cuda.jit to run on the GPU. Optionally, you can compile your Python code into machine code with numba.jit or numba.njit to speed up the execution on CPUs.

A DoFn that processes on GPUs:

from typing import Tuple

class Sampler(beam.DoFn):
    def __init__(self, blocks=80, threads_per_block=64):
        # Uses only 1 cuda grid with below config.
        self.blocks = blocks
        self.threads_per_block = threads_per_block

    def setup(self):
        import numpy as np
        # An array on host as the prototype of arrays on GPU to
        # hold accumulated sub count of points in the circle.
        self.h_acc = np.zeros(
            self.threads_per_block * self.blocks, dtype=np.float32)

    def process(self, element: Tuple[int, int]):
        from numba import cuda
        from numba.cuda.random import create_xoroshiro128p_states
        from numba.cuda.random import xoroshiro128p_uniform_float32

        @cuda.jit
        def gpu_monte_carlo_pi_sampler(rng_states, sub_sample_size, acc):
            """Uses GPU to sample random values and accumulates the sub count
            of values within a circle of radius 1.
            """
            pos = cuda.grid(1)
            if pos < acc.shape[0]:
                sub_acc = 0
                for i in range(sub_sample_size):
                    x = xoroshiro128p_uniform_float32(rng_states, pos)
                    y = xoroshiro128p_uniform_float32(rng_states, pos)
                    if (x * x + y * y) <= 1.0:
                        sub_acc += 1
                acc[pos] = sub_acc

        rng_seed, sample_size = element
        d_acc = cuda.to_device(self.h_acc)
        sample_size_per_thread = sample_size // self.h_acc.shape[0]
        rng_states = create_xoroshiro128p_states(self.h_acc.shape[0], seed=rng_seed)
        gpu_monte_carlo_pi_sampler[self.blocks, self.threads_per_block](
            rng_states, sample_size_per_thread, d_acc)
        yield d_acc.copy_to_host()

Running on a GPU: Run DoFn on GPU

More details can be found in the example notebook Use GPUs with Apache Beam.

Build a custom container

In most cases, if your pipeline doesn't require additional Python dependencies or executables, Apache Beam can automatically use its official container images to execute your user-defined code. These images come with many common Python modules, and you don't have to build or explicitly specify them.

But in some cases, you may have extra Python dependencies or even non-Python dependencies. In these scenarios, you can build a custom container and make it available to the Flink cluster for execution. The advantages of using a custom container are:

  • Faster setup time for consecutive/interactive executions
  • Stable configurations/dependencies
  • More flexibility: you can set up more than Python dependencies

The container build process can be tedious, but the good news is that you can do everything in the notebook following the usage pattern below.

Create a local workspace

First create a local work directory under the Jupyter home directory.

!mkdir -p /home/jupyter/.flink

Prepare Python dependencies

Then install all extra Python dependencies you might use and export them into a requirements file.

%pip install dep_a
%pip install dep_b

You can explicitly create a requirements file with the %%writefile notebook magic.

%%writefile /home/jupyter/.flink/requirements.txt

Or you can freeze all local dependencies into a requirements file (this might introduce unintended dependencies).

%pip freeze > /home/jupyter/.flink/requirements.txt

Prepare your non-Python dependencies

Copy all non-Python dependencies to the workspace (skip this step if you don't have any).

!cp /path/to/your-dep /home/jupyter/.flink/your-dep

Create a Dockerfile

Create a Dockerfile with the %%writefile notebook magic. For example:

%%writefile /home/jupyter/.flink/Dockerfile
FROM apache/beam_python3.7_sdk:2.40.0

COPY requirements.txt /tmp/requirements.txt
COPY your-dep /tmp/your-dep

RUN python -m pip install -r /tmp/requirements.txt

The example container uses the image of Beam v2.40.0 with Python 3.7 as the base, adds the your-dep file, and installs the extra Python dependencies. Use this Dockerfile as a template and edit it for your use case.

In your Beam pipelines, when referring to non-Python dependencies, use their COPY destinations. For example, /tmp/your-dep is the file path of the your-dep file.

Build the container image with Cloud Build

Finally, you can use Cloud Build to build the container image and save it to the Container Registry.

# Enable the Cloud Build service if not enabled.
!gcloud services enable cloudbuild.googleapis.com

# Enable the Container Registry service if not enabled.
!gcloud services enable containerregistry.googleapis.com

!cd /home/jupyter/.flink \
 && gcloud builds submit \
     --tag gcr.io/$(gcloud config get-value project)/flink:latest

Use custom containers

There are different ways to use custom containers depending on the runner.

For general Apache Beam container usage, see the Apache Beam custom containers documentation.

For Dataflow container usage, see the Dataflow custom containers documentation.