Dataproc 可选 Flink 组件

您可以在创建 Dataproc 时激活其他组件,例如 Flink 使用 可选组件 功能。本页面介绍了如何创建 Dataproc 集群 使用 Apache Flink 激活可选组件(Flink 集群),然后在该集群上运行 Flink 作业。

您可以使用 Flink 集群执行以下操作:

  1. 使用 Dataproc Jobs 资源运行 Flink 作业 从 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 登录。

  2. 使用 flink CLI 运行 Flink 作业 在 Flink 集群主服务器节点上运行的

  3. 在 Flink 上运行 Apache Beam 作业

  4. 在 Kerberos 化的集群上运行 Flink

您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc 创建具有 Flink 组件的 Dataproc 集群的 API 集群上已激活的资源

建议:使用带有 Flink 组件的标准单主实例虚拟机集群。 Dataproc 高可用性模式集群 (具有 3 个主虚拟机)不支持 Flink 高可用性模式

您可以使用 Google Cloud 控制台中的 Dataproc Jobs 资源来运行 Flink 作业, Google Cloud 控制台、Google Cloud CLI 或 Dataproc API。

控制台

如需从控制台提交示例 Flink WordCount 作业,请执行以下操作:

  1. 打开 Dataproc 提交作业页面(位于 在浏览器中打开 Google Cloud 控制台。

  2. 填写提交作业页面上的字段:

    1. 从集群列表中选择集群名称。
    2. 作业类型设置为 Flink
    3. 主类或 jar 设置为 org.apache.flink.examples.java.wordcount.WordCount
    4. Jar 文件设置为 file:///usr/lib/flink/examples/batch/WordCount.jar
      • file:/// 表示位于集群上的文件。Dataproc 在创建 Flink 集群时安装了 WordCount.jar
      • 此字段还接受 Cloud Storage 路径 (gs://BUCKET/JARFILE) 或 Hadoop 分布式文件系统 (HDFS) 路径 (hdfs://PATH_TO_JAR).
  3. 点击提交

    • 作业驱动程序输出会显示在作业详情页面上。
    • Flink 作业列在 Dataproc 作业页面 Google Cloud 控制台中。
    • 作业作业详情页面中点击停止删除 停止或删除作业。

gcloud

如需将 Flink 作业提交到 Dataproc Flink 集群,请运行 gcloud CLI gcloud dataproc 作业提交 命令行中的命令 Cloud Shell

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

注意:

  • CLUSTER_NAME:指定 Dataproc Flink 的名称 将作业提交到哪个集群
  • REGION:指定 Compute Engine 区域 集群所在的位置
  • MAIN_CLASS:指定您的 API 的 main 类。 Flink 应用,例如: <ph type="x-smartling-placeholder">
      </ph>
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE:指定 Flink 应用 jar 文件。您可以指定: <ph type="x-smartling-placeholder">
      </ph>
    • 使用 file:///安装在集群上的 jar 文件 前缀:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Cloud Storage 中的 jar 文件: gs://BUCKET/JARFILE
    • HDFS 中的 jar 文件: hdfs://PATH_TO_JAR
  • JOB_ARGS:(可选)在双短划线 (--) 后面添加作业参数。

  • 提交作业后,作业驱动程序输出将显示在 或 Cloud Shell 终端

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)
    

REST

本部分介绍如何将 Flink 作业提交到 Dataproc Flink 使用 Dataproc jobs.submit API。

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT_ID:Google Cloud 项目 ID
  • REGION集群地区
  • CLUSTER_NAME:指定要将作业提交到的 Dataproc Flink 集群的名称

HTTP 方法和网址:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

请求 JSON 正文:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

<ph type="x-smartling-placeholder">
</ph>
{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Flink 作业列在 Dataproc 作业页面 Google Cloud 控制台中。
  • 您可以从作业作业详情页面中点击停止删除。 来停止或删除作业。

而不是 使用 Dataproc Jobs 资源运行 Flink 作业; 您可以使用 flink CLI 在 Flink 集群的主节点上运行 Flink 作业。

以下部分介绍了运行 flink CLI 作业的不同方法 Dataproc Flink 集群。

  1. 通过 SSH 连接到主节点:使用 SSH 实用程序打开终端窗口。

  2. 设置类路径:从 Flink 集群主服务器虚拟机:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. 运行 Flink 作业:您可以在不同的环境中运行 Flink 作业 YARN 上的部署模式:应用模式、每个作业模式和会话模式。

    1. 应用模式:Dataproc 映像 2.0 及更高版本支持 Flink 应用模式。 此模式会在 YARN 作业管理器上执行作业的 main() 方法。集群将关闭 。

      作业提交示例:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      列出正在运行的作业:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      取消正在运行的作业:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. 按作业模式:此 Flink 模式会在main() 客户端。

      作业提交示例:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. 会话模式:启动长时间运行的 Flink YARN 会话,然后提交 一个或多个作业添加到会话中

      1. 启动会话:您可以在以下任一位置启动 Flink 会话: 方法:

        1. 创建一个 Flink 集群,并添加 --metadata flink-start-yarn-session=true 标记添加到 gcloud dataproc clusters create 命令(请参阅 创建一个 Dataproc Flink 集群。 带有此标志 创建集群后,Dataproc 将 /usr/bin/flink-yarn-daemon,用于在集群上启动 Flink 会话。

          会话的 YARN 应用 ID 保存在 /tmp/.yarn-properties-${USER} 中。 您可以使用 yarn application -list 命令列出该 ID。

        2. 运行 Flink yarn-session.sh 脚本(预先安装在集群主服务器虚拟机上,具有自定义设置):

          自定义设置示例:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. 使用以下命令运行 Flink /usr/bin/flink-yarn-daemon 封装容器脚本 默认设置:

          . /usr/bin/flink-yarn-daemon
          
      2. 将作业提交到会话:运行以下命令以提交 Flink 作业与会话。

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL:网址,包括主机 和端口。 从以下位置移除 http:// prefix: 。当您启动 Flink 会话。您可以运行以下命令列出此网址 在 Tracking-URL 字段中:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. 列出会话中的作业:如需列出会话中的 Flink 作业,请执行以下操作之一: 以下:

        • 在不使用参数的情况下运行 flink list。该命令会查找 /tmp/.yarn-properties-${USER} 中会话的 YARN 应用 ID。

        • 从以下位置获取会话的 YARN 应用 ID: /tmp/.yarn-properties-${USER}yarn application -list 的输出。 然后运行 <code>flink list -yidYARN_APPLICATION_ID

        • 运行 flink list -m FLINK_MASTER_URL

      4. 停止会话:如需停止会话,请获取 YARN 应用 ID 从 /tmp/.yarn-properties-${USER} 开始会话或 yarn application -list,然后运行以下任一命令:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

您可以使用 FlinkRunner 在 Dataproc 上运行 Apache Beam 作业。

您可以通过以下方式在 Flink 上运行 Beam 作业:

  1. Java Beam 作业
  2. 可移植的 Beam 作业

Java Beam 作业

将 Beam 作业打包成一个 JAR 文件。为捆绑的 JAR 文件提供运行作业所需的依赖项。

以下示例从 Dataproc 集群的主节点运行 Java Beam 作业。

  1. 创建启用了 Flink 组件的 Dataproc 集群。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components:Flink。
    • --image-version集群的映像版本,用于确定集群上安装的 Flink 版本(例如,请参阅针对最近 4 个 2.0.x 映像发布版本列出的 Apache Flink 组件版本)。
    • --region:受支持的 Dataproc 区域
    • --enable-component-gateway:启用对 Flink Job Manager 界面的访问权限。
    • --scopes:启用您的集群对 Google Cloud API 的访问权限 (请参阅范围最佳实践)。 cloud-platform 范围默认处于启用状态(您不需要包含 此标志设置) (使用 Dataproc 映像 2.1 或更高版本)。
  2. 使用 SSH 实用程序 在 Flink 集群主服务器节点上打开一个终端窗口。

  3. 在 Dataproc 集群主服务器实例上启动 Flink YARN 会话 节点。

    . /usr/bin/flink-yarn-daemon
    

    记下 Dataproc 集群上的 Flink 版本。

    flink --version
    
  4. 在本地机器上,使用 Java 生成规范的 Beam 字数统计示例

    选择与 Dataproc 集群上的 Flink 版本兼容的 Beam 版本。请参阅 Flink 版本兼容性 列出了 Beam-Flink 版本兼容性的表

    打开生成的 POM 文件。检查 <flink.artifact.name> 标记指定的 Beam Flink 运行程序版本。如果 Flink 工件名称中的 Beam Flink 运行程序版本与您的集群上的 Flink 版本不匹配,请更新版本号以使其相匹配。

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. 打包字数统计示例。

    mvn package -Pflink-runner
    
  6. 将打包的超级 JAR 文件 word-count-beam-bundled-0.1.jar(约 135 MB)上传到您的 Dataproc 集群的主节点。您可以使用 gcloud storage cp 加快从 Cloud Storage 向 Dataproc 集群传输文件的速度。

    1. 在本地终端上,创建一个 Cloud Storage 存储桶,然后上传超级 JAR。

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. 在 Dataproc 的主节点上,下载超级 JAR。

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. 在 Dataproc 集群的主节点上运行 Java Beam 作业。

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. 检查结果是否已写入 Cloud Storage 存储桶。

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. 停止 Flink YARN 会话。

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

可移植的 Beam 作业

如需运行使用 Python、Go 和其他支持的语言编写的 Beam 作业,您可以 按照 Beam 的FlinkRunnerPortableRunner Flink Runner 页面(另请参阅可移植性框架路线图)。

以下示例从 Dataproc 集群的主节点运行以 Python 编写的可移植 Beam 作业。

  1. 创建同时启用了 FlinkDocker 组件的 Dataproc 集群。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    注意:

    • --optional-components:Flink 和 Docker。
    • --image-version集群的映像版本。 这决定了集群上安装的 Flink 版本(例如, 请查看列出的 Apache Flink 组件版本, 四个 2.0.x 映像发布版本)。
    • --region:可用的 Dataproc 区域
    • --enable-component-gateway:启用对 Flink Job Manager 界面的访问权限。
    • --scopes:启用您的集群对 Google Cloud API 的访问权限 (请参阅范围最佳实践)。 cloud-platform 范围默认处于启用状态(您不需要包含 此标志设置) (使用 Dataproc 映像 2.1 或更高版本)。
  2. 在本地或在 Cloud Shell,用于创建 Cloud Storage 存储桶。您将指定BUCKET_NAME

    gcloud storage buckets create BUCKET_NAME
    
  3. 在集群虚拟机的终端窗口中,启动 Flink YARN 会话。 记下 Flink 主网址,即 Flink 主实例的地址 执行作业的位置。在执行以下操作时,您需要指定FLINK_MASTER_URL 运行示例 WordCount 程序。

    . /usr/bin/flink-yarn-daemon
    

    显示并记下运行 Dataproc 的 Flink 版本 集群。在执行以下操作时,您需要指定FLINK_VERSION 运行示例 WordCount 程序。

    flink --version
    
  4. 在 集群主服务器节点。

  5. 安装 Beam 版本 兼容集群上的 Flink 版本。

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. 在集群主服务器实例上运行单词数示例 节点。

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    注意:

    • --runnerFlinkRunner
    • --flink_version:之前提到的 FLINK_VERSION
    • --flink_master:之前提到的 FLINK_MASTER_URL
    • --flink_submit_uber_jar:使用超级 JAR 执行 Beam 作业。
    • --outputBUCKET_NAME(之前创建)。
  7. 验证结果是否已写入存储桶。

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. 停止 Flink YARN 会话。

    1. 获取应用 ID。
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Dataproc Flink 组件支持 Kerberos 化的集群。需要有效的 Kerberos 票据才能提交和保留 Flink 作业,或启动 一个 Flink 集群默认情况下,Kerberos 票据的有效期为 7 天。

当 Flink 作业或 Flink 会话集群运行时,您可以使用 Flink Job Manager 网页界面。如需使用网页界面,请执行以下操作:

  1. 创建 Dataproc Flink 集群
  2. 创建集群后,点击组件网关 集群详情页面上“网络界面”标签页上的 YARN ResourceManager 链接 Google Cloud 控制台中。
  3. YARN Resource Manager 界面中,识别 Flink 集群应用条目。根据作业的完成状态,ApplicationMaster历史记录链接。
  4. 对于长时间运行的流处理作业,请点击 ApplicationManager 链接,以 打开 Flink 信息中心;对于已完成的作业,请点击历史记录链接 查看作业详情。