Create a Dataflow pipeline using Java

This document shows you how to set up your Google Cloud project, create an example pipeline built with the Apache Beam SDK for Java, and run the example pipeline on the Dataflow service. The pipeline reads a text file from Cloud Storage, counts the number of unique words in the file, and then writes the word counts back to Cloud Storage. For an introduction to the WordCount pipeline, see the How to use WordCount in Apache Beam video.

This tutorial requires Maven, but it's also possible to convert the example project from Maven to Gradle. To learn more, see Optional: Convert from Maven to Gradle.


To follow step-by-step guidance for this task directly in the Google Cloud console, click Guide me:

Guide me


Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace PROJECT_NUMBER with your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
    • Replace SERVICE_ACCOUNT_ROLE with each individual role.
  17. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Standard).
    • Set the storage location to the following: US (United States).
    • Replace BUCKET_NAME with a unique bucket name. Don't include sensitive information in the bucket name because the bucket namespace is global and publicly visible.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. Copy the following, as you need them in a later section:
    • Your Cloud Storage bucket name.
    • Your Google Cloud project ID. To find this ID, see Identifying projects.
  19. Download and install the Java Development Kit (JDK) version 11. (Dataflow continues to support version 8.) Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
  20. Download and install Apache Maven, following Maven's installation guide for your specific operating system.

Get the pipeline code

The Apache Beam SDK is an open source programming model for data processing pipelines. You define these pipelines with an Apache Beam program and can choose a runner, such as Dataflow, to run your pipeline.

  1. In your shell or terminal, use the Maven Archetype Plugin to create a Maven project on your computer that contains the Apache Beam SDK's WordCount examples:
    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=2.61.0 \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    

    The command creates a new directory called word-count-beam under your current directory. The word-count-beam directory contains a simple pom.xml file and a series of example pipelines that count words in text files.

  2. Verify that your word-count-beam directory contains the pom.xml file:

    Linux or macOS

    cd word-count-beam/
    ls

    The output is the following:

    pom.xml   src

    Windows

    cd word-count-beam/
    dir

    The output is the following:

    pom.xml   src
  3. Verify that your Maven project contains the example pipelines:

    Linux or macOS

    ls src/main/java/org/apache/beam/examples/

    The output is the following:

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

    Windows

    dir src/main/java/org/apache/beam/examples/

    The output is the following:

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

For a detailed introduction to the Apache Beam concepts that are used in these examples, see the Apache Beam WordCount Example. The instructions in the next sections use WordCount.java.

Run the pipeline locally

  • In your shell or terminal, run the WordCount pipeline locally from your word-count-beam directory:
    mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--output=counts"
    

    The output files have the prefix counts and are written to the word-count-beam directory. They contain unique words from the input text and the number of occurrences of each word.

Run the pipeline on the Dataflow service

  • In your shell or terminal, build and run the WordCount pipeline on the Dataflow service from your word-count-beam directory:
    mvn -Pdataflow-runner compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--project=PROJECT_ID \
        --gcpTempLocation=gs://BUCKET_NAME/temp/ \
        --output=gs://BUCKET_NAME/output \
        --runner=DataflowRunner \
        --region=REGION"
    

    Replace the following:

    • PROJECT_ID: your Google Cloud project ID
    • BUCKET_NAME: the name of your Cloud Storage bucket
    • REGION: a Dataflow region, like us-central1

View your results

  1. In the Google Cloud console, go to the Dataflow Jobs page.

    Go to Jobs

    The Jobs page shows the details of all the available jobs, including the status. The wordcount job's Status is Running at first, and then updates to Succeeded.

  2. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

    The Buckets page displays the list of all the storage buckets in your project.

  3. Click the storage bucket that you created.

    The Bucket details page shows the output files and staging files that your Dataflow job created.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.

Delete the project

The easiest way to eliminate billing is to delete the Google Cloud project that you created for the quickstart.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the individual resources

If you want to keep the Google Cloud project that you used in this quickstart, then delete the individual resources:

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. Revoke the roles that you granted to the Compute Engine default service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

What's next