Dataproc Flink Component

You can install additional components when you create a Dataproc cluster using the Optional Components feature. This page describes the Flink component.

The Dataproc Flink component installs Apache Flink on a Dataproc cluster.

Install the component

Install the component when you create a Dataproc cluster. The Dataproc Flink component can be installed on clusters created with Dataproc image version 1.5 or later.

See Supported Dataproc versions for the component version included in each Dataproc image release.

gcloud command

To create a Dataproc cluster that includes the Flink component, use the gcloud dataproc clusters create cluster-name command with the --optional-components flag.

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --image-version=1.5 \
    --enable-component-gateway \
    ... other flags

Note: Because a Flink YARN session consumes significant YARN resources, Dataproc does not start a Flink session by default when the Dataproc cluster starts. You can start a session when you create the cluster by adding the --metadata flink-start-yarn-session=true flag to the gcloud dataproc clusters create command, as shown below.
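For example, to start a Flink YARN session at cluster creation, add the metadata flag to the create command shown earlier:

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --image-version=1.5 \
    --enable-component-gateway \
    --metadata flink-start-yarn-session=true \
    ... other flags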

REST API

The Flink component can be specified through the Dataproc API using SoftwareConfig.Component as part of a clusters.create request.
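For reference, a minimal clusters.create request sketch that adds FLINK to optionalComponents in SoftwareConfig and enables the Component Gateway through endpointConfig; cluster-name, project-id, and region are placeholders, and other required cluster settings are omitted:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
          "clusterName": "cluster-name",
          "config": {
            "softwareConfig": {
              "imageVersion": "1.5",
              "optionalComponents": ["FLINK"]
            },
            "endpointConfig": {
              "enableHttpPortAccess": true
            }
          }
        }' \
    "https://dataproc.googleapis.com/v1/projects/project-id/regions/region/clusters"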

Console

  1. Enable the component and component gateway.
    • In the Cloud Console, open the Dataproc Create a cluster page. The Set up cluster panel is selected.
    • In the Components section:
      • Under Optional components, select the Flink component.
      • Under Component Gateway, select Enable component gateway.

Run Flink jobs

After a Dataproc cluster with Flink starts, SSH into the Dataproc master node, then run Flink jobs.
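For example, assuming the default Dataproc master node name cluster-name-m, you can connect with gcloud (zone is a placeholder for the cluster's zone):

gcloud compute ssh cluster-name-m --zone=zone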

Example:

Run a single Flink job. After accepting the job, Flink starts a Job Manager and task slots for the job in YARN. The job runs on the YARN cluster until it finishes, and the Job Manager shuts down after the job completes. Job logs are available in the YARN logs.

HADOOP_CLASSPATH=`hadoop classpath` \
    flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar
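To view the logs after the job finishes, you can fetch them from YARN; a sketch, where the application ID is a hypothetical placeholder for the ID printed when the job was submitted:

yarn logs -applicationId application_1234567890123_0001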

Example:

Start a long-running Flink YARN session, then run a job.

Start the session. Note: Alternatively, you can start a Flink YARN session when you create the Dataproc cluster by adding the --metadata flink-start-yarn-session=true flag to the gcloud dataproc clusters create command.

. /usr/bin/flink-yarn-daemon

Run the job:

HADOOP_CLASSPATH=`hadoop classpath` \
    flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar
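JOB_MANAGER_HOSTNAME and REST_API_PORT are specific to your session. If you need to locate the running Flink session in YARN, one option (a sketch, not the only approach) is to list YARN applications and check the Flink session's entry and tracking URL:

yarn application -list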

Running Apache Beam jobs

You can run Apache Beam jobs on Dataproc using the FlinkRunner.

After you SSH into the Dataproc master node, you can run Beam jobs on Flink in two ways:

  1. Java Beam jobs
  2. Portable Beam jobs

Java Beam jobs

Package your Beam job into a JAR file, then run the job.

mvn package -Pflink-runner
bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar \
    --runner=FlinkRunner \
    --other-parameters

Portable Beam jobs

To run Beam jobs written in Python, Go, or other supported languages:

  1. Use the PortableRunner (see Portability Framework Roadmap).

  2. The Dataproc cluster must be created with the Docker component enabled, which installs Docker on each cluster node. To do this, create the cluster with both the Flink and Docker optional components.

    gcloud example:

    gcloud dataproc clusters create cluster-name \
        --optional-components=FLINK,DOCKER \
        --region=region \
        --image-version=1.5 \
        --enable-component-gateway \
        ... other flags
    

  3. Install the Python or other libraries that Beam needs, such as apache_beam or apache_beam[gcp] (a pip install sketch follows the example below). You can pass in the Flink master URL, or omit it to run a single job.

    Python example:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Pipeline options for the Beam FlinkRunner; flink_master is the
    # Flink Job Manager address (REST port).
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_master=localhost:8081",
        "--environment_type=DOCKER"
    ])

    # A minimal pipeline: create a few elements and print them.
    with beam.Pipeline(options=options) as p:
        (p
         | "Create" >> beam.Create(["hello", "world"])
         | "Print" >> beam.Map(print))
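    A minimal install sketch for the libraries mentioned in this step (the exact packages depend on your pipeline):

    pip install 'apache_beam[gcp]'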
    

Running on Kerberized clusters

The Dataproc Flink component supports Kerberized clusters. A valid Kerberos ticket is needed to submit and persist a Flink job or to start a Flink cluster. By default, a ticket remains valid for 7 days.
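For example, on a Kerberized cluster you would obtain a ticket before submitting a job or starting a session; a minimal sketch, where the principal and keytab path are hypothetical placeholders for your own:

kinit -kt /path/to/user.keytab user@EXAMPLE.COM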

Accessing the Flink Job Manager UI

The Flink Job Manager web interface is available while a Flink job or Flink session cluster is running. You can open the Flink Job Manager UI from the Application Master of the Flink application in YARN.

To enable and use UI access:

  1. Create the Dataproc cluster with the Component Gateway enabled.
  2. After the cluster is created, click the YARN ResourceManager Component Gateway link on the Web Interfaces tab of the Cluster details page in the Google Cloud Console.
  3. On the YARN Resource Manager page, identify the Flink cluster application entry and click the Application Master link.
  4. The Flink dashboard opens.