Using the Apache Beam interactive runner with JupyterLab notebooks lets you iteratively develop pipelines, inspect your pipeline graph, and parse individual PCollections in a read-eval-print-loop (REPL) workflow. These Apache Beam notebooks are made available through AI Platform Notebooks, a managed service that hosts notebook virtual machines pre-installed with the latest data science and machine learning frameworks.
This guide focuses on the functionality introduced by Apache Beam notebooks, but does not show how to build one. For more information on Apache Beam, see the Apache Beam programming guide.
Before you begin
Sign in to your Google Account.
If you don't already have one, sign up for a new account.
In the Cloud Console, on the project selector page, select or create a Cloud project.
Make sure that billing is enabled for your Google Cloud project. Learn how to confirm billing is enabled for your project.
- Enable the Compute Engine API.
When you finish this guide, you can avoid continued billing by deleting the resources you created. For more details, see Cleaning up.
Launching an Apache Beam notebooks instance
- In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.
- Navigate to Dataflow in the side panel and click Notebooks.
- In the toolbar, click New Instance.
- Select Apache Beam.
- On the New notebook instance page, select a network for the notebook VM and click Create.
- (Optional) If you want to set up a custom notebook instance, click Customize. For more information on customizing instance properties, see Create an AI Platform Notebooks instance with specific properties.
- Click Open JupyterLab when the link becomes active. AI Platform Notebooks creates a new Apache Beam notebook instance.
Installing dependencies (Optional)
Apache Beam notebooks already come with Apache Beam and Google Cloud connector dependencies installed. If your pipeline contains custom connectors or custom PTransforms that depend on third-party libraries, you can install them after you create a notebook instance. For more information, see Installing Dependencies in the AI Platform Notebooks documentation.
Getting started with Apache Beam notebooks
After opening an AI Platform Notebooks instance, example notebooks are available in the Examples folder. The following are currently available:
- Word Count
- Streaming Word Count
- Streaming NYC Taxi Ride Data
- Dataflow Word Count
These notebooks include explanatory text and commented code blocks to help you understand Apache Beam concepts and API usage.
Creating a notebook instance
Navigate to File > New > Notebook and select a kernel that is Apache Beam 2.22 or later.
Apache Beam is installed on your notebook instance, so include the
interactive_beam modules in your notebook.
import apache_beam as beam from apache_beam.runners.interactive.interactive_runner import InteractiveRunner import apache_beam.runners.interactive.interactive_beam as ib
If your notebook uses other Google APIs, add the following import statements:
from apache_beam.options import pipeline_options from apache_beam.options.pipeline_options import GoogleCloudOptions import google.auth
Setting interactivity options
The following sets the data capture duration to 60 seconds.
ib.options.capture_duration = timedelta(seconds=60)
For additional interactive options, see the interactive_beam.options class.
Creating your pipeline
Initialize the pipeline using an
options = pipeline_options.PipelineOptions() # Set the pipeline mode to stream the data from Pub/Sub. options.view_as(pipeline_options.StandardOptions).streaming = True p = beam.Pipeline(InteractiveRunner(), options=options)
Reading and visualizing the data
The following example shows a Apache Beam pipeline that creates a subscription to the given Pub/Sub topic and reads from the subscription.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
The pipeline counts the words by windows from the source. It creates fixed windowing with each window being 10 seconds in duration.
windowed_words = (words | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
After the data is windowed, the words are counted by window.
windowed_words_counts = (windowed_words | "count" >> beam.combiners.Count.PerElement())
show() method visualizes the resulting PCollection in the notebook.
To display visualizations of your data, pass
visualize_data=True into the
show() method. You can apply multiple filters to your visualizations. The
following visualization allows you to filter by label and axis:
Another useful visualization in Apache Beam notebooks is a Pandas DataFrame. The following example first converts the words to lowercase and then computes the frequency of each word.
windowed_lower_word_counts = (windowed_words | beam.Map(lambda word: word.lower()) | "count" >> beam.combiners.Count.PerElement())
collect() method provides the output in a Pandas DataFrame.
Launching Dataflow jobs from a pipeline created in your notebook
- (Optional) Before using your notebook to run Dataflow jobs, restart the kernel, rerun all cells, and verify the output. If you skip this step, hidden states in the notebook might affect the job graph in the pipeline object.
- Enable the Dataflow API.
Add the following import statement:
from apache_beam.runners import DataflowRunner
Pass in your pipeline options.
# Set up Apache Beam pipeline options. options = pipeline_options.PipelineOptions() # Set the project to the default project in your current Google Cloud # environment. _, options.view_as(GoogleCloudOptions).project = google.auth.default() # Set the Google Cloud region to run Dataflow. options.view_as(GoogleCloudOptions).region = 'us-central1' # Choose a Cloud Storage location. dataflow_gcs_location = 'gs://<change me>/dataflow' # Set the staging location. This location is used to stage the # Dataflow pipeline and SDK binary. options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location # Set the temporary location. This location is used to store temporary files # or intermediate results before outputting to the sink. options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
You can adjust the parameter values. For example, you can change the
Run the pipeline with
DataflowRunner. This runs your job on the Dataflow service.
runner = DataflowRunner() runner.run_pipeline(p, options=options)
pis a pipeline object from Creating your pipeline.
For an example on how to perform this conversion on an interactive notebook, see the Dataflow Word Count notebook in your notebook instance.
Alternatively, you can export your notebook as an executable script, modify the
.py file using the previous steps, and then deploy your
pipeline to the Dataflow
Saving your notebook
Notebooks you create are saved locally in your running notebook instance. If you reset or shut down the notebook instance during development, those new notebooks are deleted. To keep your notebooks for future use, download them locally to your workstation, save them to GitHub, or export them to a different file format.
After you've finished using your Apache Beam notebook instance, clean up the resources you created on Google Cloud by shutting down the notebook instance.