Create a Dataflow pipeline using Java

This document shows you how to set up your Google Cloud project, create an example pipeline built with the Apache Beam SDK for Java, and run the example pipeline on the Dataflow service. The pipeline reads a text file from Cloud Storage, counts the occurrences of each unique word in the file, and then writes the word counts back to Cloud Storage. For an introduction to the WordCount pipeline, see the How to use WordCount in Apache Beam video.

This tutorial requires Maven, but it's also possible to convert the example project from Maven to Gradle. To learn more, see Optional: Convert from Maven to Gradle.



Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install and initialize the Google Cloud CLI.
  3. Create or select a Google Cloud project.

    • Create a Cloud project:

      gcloud projects create PROJECT_ID
    • Select the Cloud project that you created:

      gcloud config set project PROJECT_ID
  4. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  5. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  6. Create authentication credentials for your Google Account:

    gcloud auth application-default login
  7. Grant the roles/iam.serviceAccountUser IAM role to your Google Account:

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
    • Replace PROJECT_ID with your project ID.
    • Replace EMAIL_ADDRESS with your email address.
  8. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Standard).
    • Set the storage location to the following: US (United States).
    • Replace BUCKET_NAME with a unique bucket name. Don't include sensitive information in the bucket name because the bucket namespace is global and publicly visible.
    gsutil mb -c STANDARD -l US gs://BUCKET_NAME
  9. Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles: roles/dataflow.admin, roles/dataflow.worker, and roles/storage.objectAdmin.

    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace PROJECT_NUMBER with your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
    • Replace SERVICE_ACCOUNT_ROLE with each individual role.
  10. Note the following values, because you need them in a later section:
    • Your Cloud Storage bucket name.
    • Your Google Cloud project ID. To find this ID, see Identifying projects.
  11. Download and install the Java Development Kit (JDK) version 11. (Dataflow continues to support version 8.) Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
  12. Download and install Apache Maven, following Maven's installation guide for your specific operating system.
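
One way to confirm the JDK step before you continue is to run a small Java program that reports the running Java version and the value of JAVA_HOME. The following is a minimal sketch, not part of the example project; the class name JdkCheck and the helper majorVersion are hypothetical names chosen for illustration.

```java
public class JdkCheck {

    /**
     * Returns the major Java version from a version string.
     * For example, "1.8.0_352" yields 8 and "11.0.17" yields 11.
     */
    static int majorVersion(String version) {
        String[] parts = version.split("[.\\-+_]");
        int first = Integer.parseInt(parts[0]);
        // Releases before Java 9 report themselves as 1.x, so the
        // major version is the second field in that case.
        return (first == 1 && parts.length > 1) ? Integer.parseInt(parts[1]) : first;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        String javaHome = System.getenv("JAVA_HOME");

        System.out.println("java.version = " + version
                + " (major " + majorVersion(version) + ")");
        System.out.println("JAVA_HOME    = "
                + (javaHome == null ? "<not set>" : javaHome));
    }
}
```

Compile it with javac JdkCheck.java and run it with java JdkCheck. If JAVA_HOME prints as not set, set it to your JDK installation directory before you run Maven.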

Get the pipeline code

The Apache Beam SDK is an open source programming model for data processing pipelines. You define these pipelines with an Apache Beam program and can choose a runner, such as Dataflow, to run your pipeline.

  1. In your shell or terminal, use the Maven Archetype Plugin to create a Maven project on your computer that contains the Apache Beam SDK's WordCount examples:
    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=2.43.0 \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    

    The command creates a new directory called word-count-beam under your current directory. The word-count-beam directory contains a simple pom.xml file and a series of example pipelines that count words in text files.

  2. Verify that your word-count-beam directory contains the pom.xml file:

    Linux or macOS

    cd word-count-beam/
    ls

    The output is the following:

    pom.xml   src

    Windows

    cd word-count-beam/
    dir

    The output is the following:

    pom.xml   src
  3. Verify that your Maven project contains the example pipelines:

    Linux or macOS

    ls src/main/java/org/apache/beam/examples/

    The output is the following:

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

    Windows

    dir src\main\java\org\apache\beam\examples\

    The output is the following:

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

For a detailed introduction to the Apache Beam concepts that are used in these examples, see the Apache Beam WordCount Example. The instructions in the next sections use WordCount.java.
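
As a rough orientation before you run the example, the counting that WordCount performs can be sketched in plain Java without the Apache Beam SDK. This is not the Beam implementation: the class name WordCountSketch is hypothetical, the tokenizer regular expression (split on runs of non-letter characters) is an assumption about how words are delimited, and the sketch runs in a single process rather than as a distributed pipeline.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    /** Counts how often each word occurs across all lines (case-sensitive). */
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            // Assumed tokenizer: split on runs of non-letter characters.
            for (String word : line.split("[^\\p{L}]+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "to be or not to be",
                "that is the question");
        // Print one "word: count" pair per line, sorted by word.
        countWords(lines).forEach(
                (word, count) -> System.out.println(word + ": " + count));
    }
}
```

In the Beam version, each of these stages (read lines, split into words, count per word, format, write) is a separate transform, which is what lets a runner such as Dataflow spread the work across workers.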

Run the pipeline locally

  • In your shell or terminal, run the WordCount pipeline locally from your word-count-beam directory:
    mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--output=counts"
    

    The output files have the prefix counts and are written to the word-count-beam directory. They contain unique words from the input text and the number of occurrences of each word.
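
The output is usually split across several files that share the counts prefix. To inspect the results as a whole, you could merge those files with a short Java program. The following is a sketch under stated assumptions: the class name ReadCounts is hypothetical, and it assumes each output line has the form word: count, so check one of your files before relying on it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

public class ReadCounts {

    /** Merges every regular file in dir whose name starts with prefix. */
    static Map<String, Long> readCounts(Path dir, String prefix) throws IOException {
        Map<String, Long> counts = new TreeMap<>();
        try (DirectoryStream<Path> shards = Files.newDirectoryStream(dir, prefix + "*")) {
            for (Path shard : shards) {
                if (!Files.isRegularFile(shard)) {
                    continue; // skip directories that happen to match
                }
                for (String line : Files.readAllLines(shard, StandardCharsets.UTF_8)) {
                    // Assumed line format: "word: count".
                    int sep = line.lastIndexOf(": ");
                    if (sep < 0) {
                        continue; // not in the assumed format
                    }
                    try {
                        long count = Long.parseLong(line.substring(sep + 2).trim());
                        counts.merge(line.substring(0, sep), count, Long::sum);
                    } catch (NumberFormatException e) {
                        // Ignore lines whose count is not a number.
                    }
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // Run from the word-count-beam directory after the local run.
        readCounts(Paths.get("."), "counts")
                .forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```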

Run the pipeline on the Dataflow service

  • In your shell or terminal, build and run the WordCount pipeline on the Dataflow service from your word-count-beam directory:
    mvn -Pdataflow-runner compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--project=PROJECT_ID \
        --gcpTempLocation=gs://BUCKET_NAME/temp/ \
        --output=gs://BUCKET_NAME/output \
        --runner=DataflowRunner \
        --region=REGION"
    

    Replace the following:

    • PROJECT_ID: your Cloud project ID
    • BUCKET_NAME: the name of your Cloud Storage bucket
    • REGION: a Dataflow regional endpoint, like us-central1
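
To make the substitution concrete, the following sketch shows how the three placeholders expand into the five options passed through -Dexec.args in the command above. The class name DataflowArgs and the sample values are hypothetical; the option names and Cloud Storage paths are the ones used in the Maven command.

```java
import java.util.ArrayList;
import java.util.List;

public class DataflowArgs {

    /** Builds the pipeline options from the three placeholder values. */
    static List<String> build(String projectId, String bucketName, String region) {
        List<String> args = new ArrayList<>();
        args.add("--project=" + projectId);
        // Temporary files and results both go to the same bucket.
        args.add("--gcpTempLocation=gs://" + bucketName + "/temp/");
        args.add("--output=gs://" + bucketName + "/output");
        args.add("--runner=DataflowRunner");
        args.add("--region=" + region);
        return args;
    }

    public static void main(String[] args) {
        // Hypothetical values; substitute your own project, bucket, and region.
        List<String> options = build("my-project", "my-bucket", "us-central1");
        System.out.println(String.join(" ", options));
    }
}
```

The printed line is the string you would place inside the quotation marks of -Dexec.args.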

View your results

  1. In the Google Cloud console, go to the Dataflow Jobs page.

    The Jobs page shows the details of all the available jobs, including the status. The wordcount job's Status is Running at first, and then updates to Succeeded.

  2. In the Google Cloud console, go to the Cloud Storage Browser page.

    The Browser page displays the list of all the storage buckets in your project.

  3. Click the storage bucket that you created.

    The Bucket details page shows the output files and staging files that your Dataflow job created.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project that contains the resources.

Delete the project

The easiest way to eliminate billing is to delete the Google Cloud project that you created for the quickstart.

  1. In the Google Cloud console, go to the Manage resources page.

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the individual resources

If you want to keep the Google Cloud project that you used in this quickstart, then delete the individual resources:

  1. In the Google Cloud console, go to the Cloud Storage Browser page.

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  5. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke
  6. Revoke the roles that you granted to the Compute Engine default service account. Run the following command once for each of the following IAM roles, substituting each role for ROLE: roles/dataflow.admin, roles/dataflow.worker, and roles/storage.objectAdmin.

    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=ROLE

What's next