Create a Dataflow pipeline using Go
This page shows you how to use the Apache Beam SDK for Go to build a program that defines a pipeline. Then, you'll run the pipeline locally and on the Dataflow service.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.
- Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs.
- Create a service account:
  - In the Google Cloud console, go to the Create service account page.
  - Select your project.
  - In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
  - In the Service account description field, enter a description. For example, Service account for quickstart.
  - Click Create and continue.
  - To provide access to your project, grant the Project > Owner role to your service account: in the Select a role list, select Project > Owner. For additional roles, click Add another role and add each additional role.
  - Click Continue.
  - Click Done to finish creating the service account. Do not close your browser window. You will use it in the next step.
- Create a service account key:
  - In the Google Cloud console, click the email address for the service account that you created.
  - Click Keys.
  - Click Add key, and then click Create new key.
  - Click Create. A JSON key file is downloaded to your computer.
  - Click Close.
- Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again. (An optional Go check of these credentials is sketched after this list.)
- Create a Cloud Storage bucket:
  - In the Google Cloud console, go to the Cloud Storage Browser page.
  - Click Create bucket.
  - On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
    - For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
    - For Choose where to store your data, do the following:
      - Select a Location type option.
      - Select a Location option.
    - For Choose a default storage class for your data, select Standard.
    - For Choose how to control access to objects, select an Access control option.
    - For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    - Click Create.
- Copy the Google Cloud project ID and the Cloud Storage bucket name. You need these values later in this quickstart.
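Optionally, you can confirm from Go that these credentials resolve before continuing. The following is a minimal sketch and not part of the quickstart itself; it assumes the golang.org/x/oauth2/google package (fetch it with go get golang.org/x/oauth2/google).

package main

import (
	"context"
	"fmt"
	"log"

	"golang.org/x/oauth2/google"
)

func main() {
	ctx := context.Background()

	// FindDefaultCredentials reads GOOGLE_APPLICATION_CREDENTIALS (among other
	// sources) and returns the Application Default Credentials.
	creds, err := google.FindDefaultCredentials(ctx, "https://www.googleapis.com/auth/cloud-platform")
	if err != nil {
		log.Fatalf("could not resolve default credentials: %v", err)
	}
	fmt.Println("Found credentials for project:", creds.ProjectID)
}

If the credentials resolve, the program prints the project ID associated with your service account key.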
Set up your development environment
The Apache Beam SDK is an open source programming model for data pipelines. You define a pipeline with an Apache Beam program and then choose a runner, such as Dataflow, to run your pipeline.
The Apache Beam SDK for Go requires Go version 1.16 or later. If you don't have Go installed, use Go's Download and install guide to download and install Go for your specific operating system.
To verify you have Go version 1.16 or later, run the following command in your local terminal:
go version
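To make the define-then-run model concrete, here is a minimal pipeline sketch. It is not the wordcount example: it creates a small in-memory PCollection, prints it, and hands the pipeline to whichever runner the --runner flag selects (the direct runner by default).

package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func main() {
	flag.Parse()
	// beam.Init() must be called before constructing or running a pipeline.
	beam.Init()

	// Define the pipeline: create a tiny in-memory PCollection and print it.
	p, s := beam.NewPipelineWithRoot()
	words := beam.Create(s, "hello", "beam", "go")
	debug.Print(s, words)

	// Run the pipeline on the runner chosen by the --runner flag
	// (the direct runner if none is given).
	if err := beamx.Run(context.Background(), p); err != nil {
		log.Exitf(context.Background(), "Failed to execute job: %v", err)
	}
}

Running it with no flags executes the pipeline locally; passing --runner dataflow, plus the project, region, and staging flags shown later in this quickstart, would send the same pipeline to Dataflow.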
Run the Beam wordcount example
The Apache Beam SDK for Go includes a wordcount pipeline example. The wordcount example does the following:
- Reads a text file as input. By default, it reads a text file located in a Cloud Storage bucket with the resource name gs://dataflow-samples/shakespeare/kinglear.txt.
- Parses each line into words.
- Performs a frequency count on the tokenized words.
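Conceptually, those three stages map onto Beam transforms roughly as in the following simplified sketch. This is not the example's actual code: the package name, wordRE, extractWords, formatCount, and buildWordCount are illustrative, and the real example also registers its functions in an init block and records metrics.

package wordcountsketch

import (
	"fmt"
	"regexp"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)

// wordRE matches runs of letters; the real example uses a similar regexp.
var wordRE = regexp.MustCompile(`[a-zA-Z']+`)

// extractWords parses one line of text and emits each word it contains.
func extractWords(line string, emit func(string)) {
	for _, w := range wordRE.FindAllString(line, -1) {
		emit(w)
	}
}

// formatCount renders one "word: count" output line.
func formatCount(w string, c int) string {
	return fmt.Sprintf("%s: %v", w, c)
}

// buildWordCount wires the stages onto a pipeline scope.
func buildWordCount(s beam.Scope, input, output string) {
	lines := textio.Read(s, input)              // read the input text file
	words := beam.ParDo(s, extractWords, lines) // parse each line into words
	counts := stats.Count(s, words)             // count each word's frequency
	textio.Write(s, output, beam.ParDo(s, formatCount, counts))
}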
To run the latest version of the Beam wordcount example on your local machine, use the following command. The --input flag specifies the file to read, and the --output flag specifies the filename for the frequency count output.
go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output outputs
After the pipeline completes, view the output results:
more outputs*
To exit, press q.
Modify the pipeline code
The Beam wordcount pipeline distinguishes between uppercase and lowercase words. The following steps show how to create your own Go module, modify the wordcount pipeline so that the pipeline is not case-sensitive, and run it on Dataflow.
Create a Go module
To make changes to the pipeline code, follow these steps.
Create a directory for your Go module in a location of your choice:
mkdir wordcount
cd wordcount
Create a Go module. For this example, use example/dataflow as the module path:
go mod init example/dataflow
Download the latest copy of the wordcount code from the Apache Beam GitHub repository. Put this file into the wordcount directory you created.
If you are using a non-Linux operating system, you must get the Go unix package. This package is required to run pipelines on the Dataflow service:
go get -u golang.org/x/sys/unix
Ensure that the go.mod file matches the module's source code:
go mod tidy
Run the unmodified pipeline
Verify that the unmodified wordcount pipeline runs locally.
From the terminal, build and run the pipeline locally:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output outputs
View the output results:
more outputs*
To exit, press q.
Change the pipeline code
To change the pipeline so that it is not case-sensitive, modify the code to apply the strings.ToLower function to all words.
In an editor of your choice, open the wordcount.go file.
Examine the init block:
func init() {
    beam.RegisterFunction(formatFn)
    beam.RegisterType(reflect.TypeOf((*extractFn)(nil)))
}
Add a new line to register the strings.ToLower function:
func init() {
    beam.RegisterFunction(formatFn)
    beam.RegisterFunction(strings.ToLower)
    beam.RegisterType(reflect.TypeOf((*extractFn)(nil)))
}
If wordcount.go does not already import the strings package, add strings to the import block as well.
Examine the CountWords function:
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
}
To lowercase the words, add a ParDo that applies strings.ToLower to every word:
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Map all letters to lowercase.
    lowercaseWords := beam.ParDo(s, strings.ToLower, col)

    // Count the number of times each word occurs.
    return stats.Count(s, lowercaseWords)
}
Save the file.
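Optionally, you can verify the change without reading from Cloud Storage by exercising CountWords from a small test in the same directory. The following is a sketch, not part of the quickstart: the file name wordcount_test.go is a suggestion, and it assumes the Beam ptest and passert testing helpers from the Go SDK.

package main

import (
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

// TestMain lets ptest initialize Beam and pick a test runner (direct by default).
func TestMain(m *testing.M) {
	ptest.Main(m)
}

func TestCountWordsIsCaseInsensitive(t *testing.T) {
	p, s := beam.NewPipelineWithRoot()

	// After the change, "The", "the", and "THE" should count as one word.
	lines := beam.Create(s, "The the THE")
	counted := CountWords(s, lines)
	formatted := beam.ParDo(s, formatFn, counted)

	passert.Equals(s, formatted, "the: 3")
	if err := ptest.Run(p); err != nil {
		t.Fatalf("pipeline failed: %v", err)
	}
}

Run it with go test from the wordcount directory; if CountWords still distinguishes case, the assertion fails.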
Run the updated pipeline locally
Run your updated wordcount pipeline locally and verify that the output has changed.
Build and run the modified wordcount pipeline:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output outputs
View the output results of the modified pipeline. All words should be lowercase.
more outputs*
To exit, press q.
Run the pipeline on the Dataflow service
To run the updated wordcount example on the Dataflow service, use the following command:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://STORAGE_BUCKET/binaries/
Replace the following:
- STORAGE_BUCKET: the Cloud Storage bucket name.
- PROJECT_ID: the Google Cloud project ID.
- DATAFLOW_REGION: the regional endpoint where you want to deploy the Dataflow job, for example europe-west1. For a list of available locations, see Dataflow locations. The --region flag overrides the default region that is set in the metadata server, your local client, or environment variables.
You can see a list of your Dataflow jobs in the Google Cloud console. To view them, go to the Dataflow Jobs page.
The Jobs page displays details of your wordcount job, including a status of Running at first, and then Succeeded.
When you run a pipeline using Dataflow, your results are stored in a Cloud Storage bucket. View the output results by using either the Google Cloud console or the local terminal.
Console
To view your results in the Google Cloud console, go to the Cloud Storage Browser page.
From the list of buckets in your project, click the storage bucket that you created earlier. The output files that your job created are displayed in the results directory.
Terminal
To view the results from your terminal, install and use the gsutil tool. You can also run the commands from Cloud Shell.
List the output files:
gsutil ls -lh "gs://STORAGE_BUCKET/results/outputs*"
Replace STORAGE_BUCKET with the name of the Cloud Storage bucket that you specified for the output.
View the results in the output files:
gsutil cat "gs://STORAGE_BUCKET/results/outputs*"
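If you prefer to stay in Go instead of using gsutil, the following sketch lists and prints the output objects with the Cloud Storage client library (cloud.google.com/go/storage). It is an optional alternative, not part of the quickstart; replace STORAGE_BUCKET with your bucket name. The client reuses the GOOGLE_APPLICATION_CREDENTIALS you set earlier.

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/storage"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()

	// The client picks up GOOGLE_APPLICATION_CREDENTIALS automatically.
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatalf("storage.NewClient: %v", err)
	}
	defer client.Close()

	// Replace STORAGE_BUCKET with your bucket name.
	bucket := client.Bucket("STORAGE_BUCKET")

	// List every object under results/ whose name starts with "outputs".
	it := bucket.Objects(ctx, &storage.Query{Prefix: "results/outputs"})
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Fatalf("listing objects: %v", err)
		}

		fmt.Println("==>", attrs.Name)

		// Print the contents of each output shard.
		r, err := bucket.Object(attrs.Name).NewReader(ctx)
		if err != nil {
			log.Fatalf("NewReader(%q): %v", attrs.Name, err)
		}
		if _, err := io.Copy(os.Stdout, r); err != nil {
			log.Fatalf("reading %q: %v", attrs.Name, err)
		}
		r.Close()
	}
}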
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
- In the Google Cloud console, go to the Cloud Storage Browser page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
What's next
- Programming model for Apache Beam
- Pipeline fundamentals for the Apache Beam SDKs
- Setting pipeline options
- Pipeline options reference
- Deploying a pipeline