Dataproc 可选 Flink 组件

使用可选组件功能创建 Dataproc 集群时,可以激活其他组件,例如 Flink。本页面介绍如何创建启用了 Apache Flink 可选组件的 Dataproc 集群(Flink 集群),然后在该集群上运行 Flink 作业。

您可以使用 Flink 集群执行以下操作:

  1. 通过 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 使用 Dataproc Jobs 资源运行 Flink 作业

  2. 使用在 Flink 集群主服务器节点上运行的 flink CLI 运行 Flink 作业

  3. 在 Flink 上运行 Apache Beam 作业

  4. 在 Kerberos 化的集群上运行 Flink

您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 来创建 Dataproc 集群,并在集群上激活 Flink 组件。

建议:搭配使用标准的单主机虚拟机集群与 Flink 组件。 Dataproc 高可用性模式集群(具有 3 个主虚拟机)不支持 Flink 高可用性模式

您可以通过 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 使用 Dataproc Jobs 资源运行 Flink 作业。

控制台

如需从控制台提交 Flink Wordcount 作业示例,请执行以下操作:

  1. 在浏览器中的 Google Cloud 控制台中打开 Dataproc 提交作业页面。

  2. 填写提交作业页面上的字段:

    1. 从集群列表中选择集群名称。
    2. 作业类型设置为 Flink
    3. 主类或 jar 设置为 org.apache.flink.examples.java.wordcount.WordCount
    4. Jar 文件设置为 file:///usr/lib/flink/examples/batch/WordCount.jar
      • file:/// 表示位于集群上的文件。Dataproc 在创建 Flink 集群时安装了 WordCount.jar
      • 此字段还接受 Cloud Storage 路径 (gs://BUCKET/JARFILE) 或 Hadoop 分布式文件系统 (HDFS) 路径 (hdfs://PATH_TO_JAR)。
  3. 点击提交

    • 作业驱动程序输出会显示在作业详情页面上。
    • Flink 作业列在 Google Cloud 控制台的 Dataproc 作业页面上。
    • 作业作业详情页面中点击停止删除,以停止或删除作业。

gcloud

如需将 Flink 作业提交到 Dataproc Flink 集群,请在终端窗口或 Cloud Shell 中本地运行 gcloud CLI gcloud dataproc job submit 命令。

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

备注:

  • CLUSTER_NAME:指定要向其提交作业的 Dataproc Flink 集群的名称。
  • REGION:指定集群所在的 Compute Engine 区域
  • MAIN_CLASS:指定 Flink 应用的 main 类,例如:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE:指定 Flink 应用 jar 文件。您可以指定:
    • 使用 file:///` 前缀在集群上安装的 jar 文件:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Cloud Storage 中的 jar 文件:gs://BUCKET/JARFILE
    • HDFS 中的 jar 文件:hdfs://PATH_TO_JAR
  • JOB_ARGS:(可选)在双短划线 (--) 后面添加作业参数。

  • 提交作业后,作业驱动程序输出将显示在本地或 Cloud Shell 终端中。

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)
    

REST

本部分介绍如何使用 Dataproc jobs.submit API 将 Flink 作业提交到 Dataproc Flink 集群。

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT_ID:Google Cloud 项目 ID
  • REGION集群地区
  • CLUSTER_NAME:指定要向其提交作业的 Dataproc Flink 集群的名称

HTTP 方法和网址:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

请求 JSON 正文:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Flink 作业列在 Google Cloud 控制台的 Dataproc 作业页面上。
  • 您可以在 Google Cloud 控制台的作业作业详情页面中点击停止删除,以停止或删除作业。

您可以使用 flink CLI 在 Flink 集群的主节点上运行 Flink 作业,而不是使用 Dataproc Jobs 资源运行 Flink 作业

以下部分介绍了在 Dataproc Flink 集群上运行 flink CLI 作业的不同方法。

  1. 通过 SSH 连接到主节点:使用 SSH 实用程序在集群主服务器虚拟机上打开一个终端窗口。

  2. 设置类路径:在 Flink 集群主服务器虚拟机上的 SSH 终端窗口中初始化 Hadoop 类路径:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. 运行 Flink 作业:您可以在 YARN 上的不同部署模式(应用、每个作业和会话模式)下运行 Flink 作业。

    1. 应用模式:Dataproc 映像 2.0 及更高版本支持 Flink 应用模式。 此模式在 YARN 作业管理器上执行作业的 main() 方法。集群会在作业完成后关闭。

      作业提交示例:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      列出正在运行的作业:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      取消正在运行的作业:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. 按作业模式:此 Flink 模式在客户端执行作业的 main() 方法。

      作业提交示例:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. 会话模式:启动一个长时间运行的 Flink YARN 会话,然后向该会话提交一个或多个作业。

      1. 启动会话:您可以通过以下任一方式启动 Flink 会话:

        1. 创建 Flink 集群,并将 --metadata flink-start-yarn-session=true 标志添加到 gcloud dataproc clusters create 命令(请参阅创建 Dataproc Flink 集群)。启用此标志后,创建集群后,Dataproc 将运行 /usr/bin/flink-yarn-daemon 以在集群上启动 Flink 会话。

          会话的 YARN 应用 ID 保存在 /tmp/.yarn-properties-${USER} 中。您可以使用 yarn application -list 命令列出此 ID。

        2. 使用自定义设置运行预安装在集群主服务器虚拟机上的 Flink yarn-session.sh 脚本:

          使用自定义设置的示例:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. 使用默认设置运行 Flink /usr/bin/flink-yarn-daemon 封装容器脚本:

          . /usr/bin/flink-yarn-daemon
          
      2. 将作业提交到会话:运行以下命令,将 Flink 作业提交到会话。

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL:执行作业的 Flink 主虚拟机的网址(包括主机和端口)。从网址中移除 http:// prefix。启动 Flink 会话时,命令输出中会列出该网址。您可以运行以下命令,在 Tracking-URL 字段中列出此网址:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. 列出会话中的作业:如需列出会话中的 Flink 作业,请执行以下操作之一:

        • 运行不带实参的 flink list。该命令会在 /tmp/.yarn-properties-${USER} 中查找会话的 YARN 应用 ID。

        • /tmp/.yarn-properties-${USER}yarn application -list 的输出获取会话的 YARN 应用 ID,然后运行 <code>flink list -yid YARN_APPLICATION_ID

        • 运行 flink list -m FLINK_MASTER_URL

      4. 停止会话:如需停止会话,请从 /tmp/.yarn-properties-${USER}yarn application -list 的输出获取会话的 YARN 应用 ID,然后运行以下任一命令:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

您可以使用 FlinkRunner 在 Dataproc 上运行 Apache Beam 作业。

您可以通过以下方式在 Flink 上运行 Beam 作业:

  1. Java Beam 作业
  2. 可移植的 Beam 作业

Java Beam 作业

将 Beam 作业打包成一个 JAR 文件。为捆绑的 JAR 文件提供运行作业所需的依赖项。

以下示例从 Dataproc 集群的主节点运行 Java Beam 作业。

  1. 创建启用了 Flink 组件的 Dataproc 集群。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components:Flink。
    • --image-version集群的映像版本,用于确定集群上安装的 Flink 版本(例如,请参阅针对最近 4 个 2.0.x 映像发布版本列出的 Apache Flink 组件版本)。
    • --region:受支持的 Dataproc 区域
    • --enable-component-gateway:启用对 Flink Job Manager 界面的访问权限。
    • --scopes:启用集群对 Google Cloud API 的访问权限(请参阅范围最佳做法)。当您创建使用 Dataproc 映像 2.1 版或更高版本的集群时,系统会默认启用 cloud-platform 范围(无需包括此标志设置)。
  2. 使用 SSH 实用程序在 Flink 集群主服务器节点上打开一个终端窗口。

  3. 在 Dataproc 集群主服务器节点上启动 Flink YARN 会话。

    . /usr/bin/flink-yarn-daemon
    

    记下 Dataproc 集群上的 Flink 版本。

    flink --version
    
  4. 在本地机器上,使用 Java 生成规范的 Beam 字数统计示例

    选择与 Dataproc 集群上的 Flink 版本兼容的 Beam 版本。请参阅 Flink 版本兼容性表,其中列出了 Beam 与 Flink 之间的版本兼容性。

    打开生成的 POM 文件。检查 <flink.artifact.name> 标记指定的 Beam Flink 运行程序版本。如果 Flink 工件名称中的 Beam Flink 运行程序版本与您的集群上的 Flink 版本不匹配,请更新版本号以使其相匹配。

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. 打包字数统计示例。

    mvn package -Pflink-runner
    
  6. 将打包的超级 JAR 文件 word-count-beam-bundled-0.1.jar(约 135 MB)上传到您的 Dataproc 集群的主节点。您可以使用 gsutil cp 加快从 Cloud Storage 向 Dataproc 集群传输文件的速度。

    1. 在本地终端上,创建一个 Cloud Storage 存储桶,然后上传超级 JAR。

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. 在 Dataproc 的主节点上,下载超级 JAR。

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. 在 Dataproc 集群的主节点上运行 Java Beam 作业。

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. 检查结果是否已写入 Cloud Storage 存储桶。

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. 停止 Flink YARN 会话。

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

可移植的 Beam 作业

如需运行以 Python、Go 和其他支持的语言编写的 Beam 作业,您可以按照 Beam 的 Flink 运行程序页面中的说明使用 FlinkRunnerPortableRunner(另请参阅可移植性框架路线图)。

以下示例从 Dataproc 集群的主节点运行以 Python 编写的可移植 Beam 作业。

  1. 创建同时启用了 FlinkDocker 组件的 Dataproc 集群。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    备注:

    • --optional-components:Flink 和 Docker。
    • --image-version集群的映像版本,用于确定集群上安装的 Flink 版本(例如,请参阅针对最新和之前的四个 2.0.x 映像发布版本列出的 Apache Flink 组件版本)。
    • --region:可用的 Dataproc 区域
    • --enable-component-gateway:启用对 Flink Job Manager 界面的访问权限。
    • --scopes:启用集群对 Google Cloud API 的访问权限(请参阅范围最佳做法)。当您创建使用 Dataproc 映像 2.1 版或更高版本的集群时,系统会默认启用 cloud-platform 范围(无需包括此标志设置)。
  2. 在本地或 Cloud Shell 中使用 gsutil 来创建 Cloud Storage 存储桶。在运行示例 Wordcount 程序时,您需要指定 BUCKET_NAME

    gsutil mb BUCKET_NAME
    
  3. 在集群虚拟机的终端窗口中,启动 Flink YARN 会话。 注意 Flink 主实例网址,即执行作业的 Flink 主实例的地址。在运行示例 Wordcount 程序时,您需要指定 FLINK_MASTER_URL

    . /usr/bin/flink-yarn-daemon
    

    显示并记下运行 Dataproc 集群的 Flink 版本。在运行示例 Wordcount 程序时,您需要指定 FLINK_VERSION

    flink --version
    
  4. 在集群主服务器节点上安装作业所需的 Python 库。

  5. 安装与集群上的 Flink 版本兼容的 Beam 版本

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. 在集群主服务器节点上运行字数统计示例。

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    备注:

    • --runnerFlinkRunner
    • --flink_versionFLINK_VERSION(如前所述)。
    • --flink_masterFLINK_MASTER_URL(如前所述)。
    • --flink_submit_uber_jar:使用超级 JAR 执行 Beam 作业。
    • --output:之前创建的 BUCKET_NAME
  7. 验证结果是否已写入您的存储桶。

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. 停止 Flink YARN 会话。

    1. 获取应用 ID。
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Dataproc Flink 组件支持 Kerberos 化的集群。如需提交和保留 Flink 作业或启动 Flink 集群,需要有效的 Kerberos 票据。默认情况下,Kerberos 票据的有效期为 7 天。

当 Flink 作业或 Flink 会话集群运行时,您可以使用 Flink Job Manager 网页界面。如需使用网页界面,请执行以下操作:

  1. 创建 Dataproc Flink 集群
  2. 创建集群后,在 Google Cloud 控制台的集群详情页面上点击网页界面标签页中的组件网关 YARN ResourceManager 链接
  3. YARN Resource Manager 界面中,识别 Flink 集群应用条目。根据作业的完成状态,系统将列出 ApplicationMaster历史记录链接。
  4. 对于长时间运行的流式作业,请点击 ApplicationManager 链接以打开 Flink 信息中心;对于已完成的作业,请点击历史记录链接以查看作业详细信息。