Dataproc Flink component

You can install additional components when you create a Dataproc cluster using the Optional components feature. This page describes the Flink component.

The Dataproc Flink component installs Apache Flink on a Dataproc cluster.

Install the component

Install the component when you create a Dataproc cluster. The Dataproc Flink component can be installed on clusters created with Dataproc image version 1.5 or later.

The cluster image version determines the version of the Flink component installed on the cluster (for example, see the Apache Flink component versions listed for the latest and previous four 2.0.x image release versions). See Supported Dataproc versions for the component version included in each Dataproc image release.

gcloud command

To create a Dataproc cluster that includes the Flink component, use the gcloud dataproc clusters create CLUSTER_NAME command with the --optional-components flag.

gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=FLINK \
    --region=REGION \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    ... other flags

Note: Because a Flink YARN session consumes significant YARN resources, by default Dataproc does not start a Flink YARN session when the Dataproc cluster starts. You can start a session when you create your cluster by adding the --metadata flink-start-yarn-session=true flag to the gcloud dataproc clusters create command.
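
For example, the following command creates a cluster with the Flink component and also starts a Flink YARN session at cluster-creation time (placeholders as in the previous example):

gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=FLINK \
    --region=REGION \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    --metadata flink-start-yarn-session=true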

REST API

The Flink component can be specified through the Dataproc API using SoftwareConfig.Component as part of a clusters.create request. The SoftwareConfig.imageVersion field is used to set the cluster image version.
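
For example, a minimal clusters.create request body that enables the Flink component and the Component Gateway might look like the following sketch (field names follow the Dataproc v1 REST API; placeholder values are illustrative):

{
  "projectId": "PROJECT_ID",
  "clusterName": "CLUSTER_NAME",
  "config": {
    "softwareConfig": {
      "imageVersion": "DATAPROC_IMAGE_VERSION",
      "optionalComponents": ["FLINK"]
    },
    "endpointConfig": {
      "enableHttpPortAccess": true
    }
  }
}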

Console

  1. Enable the component and component gateway.
    • In the Cloud Console, open the Dataproc Create a cluster page. The Set up cluster panel is selected.
    • In the Versioning section, confirm or change the Image Type and Version.
    • In the Components section:
      • Under Component gateway, select Enable component gateway.
      • Under Optional components, select the Flink component.

After a Dataproc cluster with Flink starts, SSH into the Dataproc cluster's master node, then run Flink jobs.
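
For example, assuming the default Dataproc naming convention in which the first master node VM is named CLUSTER_NAME-m (verify the VM name for your cluster), you can connect from a local terminal with:

gcloud compute ssh CLUSTER_NAME-m --zone=ZONE --project=PROJECT_ID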

Example:

Run a single Flink job. When Flink accepts the job, it starts a Job Manager and task slots for the job in YARN. The job runs in the YARN cluster until it finishes; the Job Manager shuts down after the job completes. Job logs are available in the YARN logs, as shown below.

flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar
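
After the job completes, you can fetch its logs with the standard YARN CLI. Replace APPLICATION_ID with the YARN application ID reported in the flink run output:

yarn logs -applicationId APPLICATION_ID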

Example:

Start a long-running Flink YARN session, then run a job.

Start the session on the Dataproc cluster's master node. Alternatively, you can start a Flink YARN session when you create the Dataproc cluster by using the gcloud dataproc clusters create --metadata flink-start-yarn-session=true flag.

. /usr/bin/flink-yarn-daemon

Take note of the host and port in the FLINK_MASTER_URL after the session has started successfully. Replace JOB_MANAGER_HOSTNAME and REST_API_PORT in the following command with that host and port, then run the job:

export HADOOP_CLASSPATH=`hadoop classpath`

flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar

To stop the session, replace APPLICATION_ID with the application ID associated with the Flink YARN session found in the Flink Job Manager UI or from the output of yarn application -list, then run:

yarn application -kill APPLICATION_ID

Running Apache Beam jobs

You can run Apache Beam jobs on Dataproc using the FlinkRunner.

You can run Beam jobs on Flink in the following ways:

  • Java Beam jobs
  • Portable Beam jobs

Java Beam jobs

Package your Beam job into a JAR file that bundles the dependencies needed to run the job.

The following example runs a Java Beam job from the Dataproc cluster's master node.

  1. Create a Dataproc cluster with the Flink component enabled.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: the cluster's image version, which determines the Flink version installed on the cluster (for example, see the Apache Flink component versions listed for the latest and previous four 2.0.x image release versions).
    • --region: a supported Dataproc region.
    • --enable-component-gateway: enable access to the Flink Job Manager UI.
    • --scopes: enable API access to GCP services in the same project.
  2. SSH into the Dataproc cluster's master node.

  3. Start a Flink YARN session on the Dataproc cluster's master node.

    . /usr/bin/flink-yarn-daemon
    

    Take note of the Flink version on your Dataproc cluster.

    flink --version
    
  4. On your local machine, generate the canonical Beam word count example in Java.

    Choose a Beam version that is compatible with the Flink version on your Dataproc cluster. See the Flink Version Compatibility table that lists Beam-Flink version compatibility.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    

    Open the generated POM file. Check the Beam Flink runner version specified by the <flink.artifact.name> tag. If the Beam Flink runner version in the Flink artifact name does not match the Flink version on your cluster, update the version number to match.
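
    For reference, the runner property in the generated POM file looks similar to the following sketch; the 1.12 suffix is illustrative and must correspond to the Flink version on your cluster:

    <flink.artifact.name>beam-runners-flink-1.12</flink.artifact.name>
    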
  5. Package the word count example.

    mvn package -Pflink-runner
    
  6. Upload the packaged uber JAR file, word-count-beam-bundled-0.1.jar (~135 MB), to your Dataproc cluster's master node. You can use gsutil cp for faster file transfers to your Dataproc cluster from Cloud Storage.

    1. On your local terminal, create a Cloud Storage bucket, and upload the uber JAR.

      gsutil mb gs://BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. On your Dataproc cluster's master node, download the uber JAR.

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Run the Java Beam job on the Dataproc cluster's master node.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Check that the results were written to your Cloud Storage bucket.

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Stop the Flink YARN session.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Portable Beam Jobs

To run Beam jobs written in Python, Go, and other supported languages, you can use the FlinkRunner and PortableRunner as described on the Apache Beam Flink Runner page (also see the Portability Framework Roadmap).

The following example runs a portable Beam job in Python from the Dataproc cluster's master node.

  1. Create a Dataproc cluster with both the Flink and Docker components enabled.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink and Docker.
    • --image-version: the cluster's image version, which determines the Flink version installed on the cluster (for example, see the Apache Flink component versions listed for the latest and previous four 2.0.x image release versions).
    • --region: a supported Dataproc region.
    • --enable-component-gateway: enable access to the Flink Job Manager UI.
    • --scopes: enable API access to GCP services in the same project.
  2. SSH into the Dataproc cluster's master node.

  3. Create a Cloud Storage bucket.

    gsutil mb gs://BUCKET_NAME
    
  4. Start a Flink YARN session on the Dataproc cluster's master node and save the Flink master URL once the session starts.

    . /usr/bin/flink-yarn-daemon
    

    Take note of the Flink version on your Dataproc cluster.

    flink --version
    
  5. Install the necessary Python libraries for the job on the Dataproc cluster's master node.

    Choose a Beam version that is compatible with the Flink version on your Dataproc cluster. See the Flink Version Compatibility table that lists Beam-Flink version compatibility.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Run the word count example on the Dataproc cluster's master node.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL \
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    
    • --runner (required): FlinkRunner.
    • --flink_version (required): Flink version.
    • --flink_master (required): address of the Flink Master where the job will be executed.
    • --flink_submit_uber_jar (required): use the uber JAR to execute the Beam job.
    • --output (required): where output is written.
  7. Check that the results were written to your bucket.

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Stop the Flink YARN session.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Kerberized clusters

The Dataproc Flink component supports Kerberized clusters. A valid Kerberos ticket is needed to submit and persist a Flink job or start a Flink cluster. By default, a ticket remains valid for 7 days.
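
For example, before submitting a job on a Kerberized cluster, you can verify that a valid ticket is active. The keytab path and principal below are environment-specific placeholders:

# List active Kerberos tickets and their expiration times.
klist

# Obtain a new ticket if none is active.
kinit -kt KEYTAB_FILE PRINCIPAL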

Flink Job Manager UI

The Flink Job Manager web interface is available while a Flink job or Flink session cluster is running. You can open the Flink Job Manager UI from the Application Master of the Flink application in YARN.

To enable and use UI access:

  1. Create the Dataproc cluster with the Component Gateway enabled.
  2. After cluster creation, click the Component Gateway YARN ResourceManager link on the Web Interfaces tab of the Cluster details page in the Google Cloud Console.
  3. On the YARN Resource Manager UI, identify the Flink cluster application entry. Depending on your job's completion status, an ApplicationMaster or History link will be listed.
  4. For a long-running streaming job, click the ApplicationMaster link to open the Flink dashboard; for a completed job, click the History link to view job details.