Dataproc の Flink オプション コンポーネント

Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Flink などの追加コンポーネントを有効化できます。このページでは、Apache Flink オプション コンポーネントが有効になっている Dataproc クラスタ(Flink クラスタ)を作成し、クラスタで Flink ジョブを実行する方法について説明します。

Flink クラスタを使用すると、次のことができます。

  1. Google Cloud コンソール、Google Cloud CLI、または Dataproc API から Dataproc Jobs リソースを使用して Flink ジョブを実行します。

  2. Flink クラスタのマスターノードで実行されている flink CLI を使用して Flink ジョブを実行する

  3. Flink で Apache Beam ジョブを実行する

  4. Kerberos クラスタで Flink を実行する

Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して、クラスタで Flink コンポーネントが有効になっている Dataproc クラスタを作成できます。

推奨事項: Flink コンポーネントで標準の 1 マスター VM クラスタを使用します。Dataproc 高可用性モードクラスタ(3 つのマスター VM を含む)では、Flink 高可用性モードはサポートされていません。

ConsolegcloudAPI

Google Cloud コンソールを使用して Dataproc Flink クラスタを作成するには、次の操作を行います。

  1. Dataproc の [Compute Engine に Dataproc クラスタを作成する] ページを開きます。

    1. [クラスターを設定] パネルが選択されています。
      1. [バージョニング] セクションで、イメージのタイプとバージョンを確認または変更します。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。
        • クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。
        • Dataproc Jobs API で Flink ジョブを実行するには、イメージ バージョンが [TBD] 以降である必要があります(Dataproc Flink ジョブを実行するをご覧ください)。
      2. [コンポーネント] セクションで次の設定を行います。
        1. [コンポーネント ゲートウェイ] で [コンポーネント ゲートウェイを有効にする] を選択します。Flink History Server UI へのコンポーネント ゲートウェイ リンクを有効にするには、コンポーネント ゲートウェイを有効にする必要があります。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースにもアクセスできるようになります。
        2. [オプション コンポーネント] で、クラスタで有効にする Flink やその他のオプション コンポーネントを選択します。
    2. [クラスタのカスタマイズ(省略可)] パネルをクリックします。

      1. [クラスタのプロパティ] セクションで、クラスタに追加するオプションのクラスタ プロパティごとに [プロパティを追加] をクリックします。flink 接頭辞付きのプロパティを追加して、クラスタで実行する Flink アプリケーションのデフォルトとして機能する /etc/flink/conf/flink-conf.yaml に Flink プロパティを構成できます。

        例:

        • flink:historyserver.archive.fs.dir を設定して、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定します(このロケーションは、Flink クラスタで実行されている Flink 履歴サーバーによって使用されます)。
        • flink:taskmanager.numberOfTaskSlots=n を使用して Flink タスク スロットを設定します。
      2. [カスタム クラスタ メタデータ] セクションで [メタデータを追加] をクリックして、オプションのメタデータを追加します。たとえば、flink-start-yarn-session true を追加して、クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(/usr/bin/flink-yarn-daemon)を実行し、Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。

    3. Dataproc イメージ バージョン 2.0 以前を使用している場合は、セキュリティを管理する(省略可)パネルをクリックし、プロジェクトへのアクセスで、Enables the cloud-platform scope for this cluster を選択します。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform スコープがデフォルトで有効になります。

  2. [作成] をクリックしてクラスタを作成します。

gcloud CLI を使用して Dataproc Flink クラスタを作成するには、ターミナル ウィンドウまたは Cloud Shell で次の gcloud dataproc のクラスタ作成 コマンドをローカルに実行します。

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=DATAPROC_IMAGE_VERSION \
    --optional-components=FLINK \
    --enable-component-gateway \
    --properties=PROPERTIES
    ... other flags

注:

  • CLUSTER_NAME: クラスタの名前を指定します。
  • REGION: クラスタが配置される Compute Engine リージョンを指定します。
  • DATAPROC_IMAGE_VERSION: 必要に応じて、クラスタで使用するイメージ バージョンを指定します。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。

    • クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。

    • Dataproc Jobs API で Flink ジョブを実行するには、イメージ バージョンが [TBD] 以降である必要があります(Dataproc Flink ジョブを実行するをご覧ください)。

  • --optional-components: クラスタで Flink ジョブと Flink HistoryServer ウェブサービスを実行するには、FLINK コンポーネントを指定する必要があります。

  • --enable-component-gateway: Flink 履歴サーバー UI へのコンポーネント ゲートウェイ リンクを有効にするには、コンポーネント ゲートウェイを有効にする必要があります。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースにもアクセスできるようになります。

  • PROPERTIES。必要に応じて、1 つ以上のクラスタ プロパティを指定します。

    • イメージ バージョン 2.0.67 以降と 2.1.15 以降を使用して Dataproc クラスタを作成する場合、--properties フラグを使用して、クラスタで実行する Flink アプリケーションのデフォルトとして機能する /etc/flink/conf/flink-conf.yaml に Flink プロパティを構成できます。

    • flink:historyserver.archive.fs.dir を設定すると、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定できます(このロケーションは、Flink クラスタで実行されている Flink 履歴サーバーによって使用されます)。

    • 複数のプロパティの例:

    --properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
    
  • その他のフラグ:

    • クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(/usr/bin/flink-yarn-daemon)を実行するには、オプションの --metadata flink-start-yarn-session=true フラグ を追加して Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。
  • 2.0 以前のイメージ バージョンを使用する場合は、--scopes=https://www.googleapis.com/auth/cloud-platform フラグを追加して、クラスタによる API へのアクセスを有効にできます(スコープのベスト プラクティスを参照)。 Google Cloud Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform スコープがデフォルトで有効になります。

Dataproc API を使用して Dataproc Flink クラスタを作成するには、次のように clusters.create リクエストを送信します。

注:

  • SoftwareConfig.ComponentFLINK に設定します。

  • 必要に応じて SoftwareConfig.imageVersion を設定して、クラスタで使用するイメージ バージョンを指定できます。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。

    • クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。

    • Dataproc Jobs API で Flink ジョブを実行するには、イメージ バージョンが [TBD] 以降である必要があります(Dataproc Flink ジョブを実行するをご覧ください)。

  • EndpointConfig.enableHttpPortAccesstrue に設定して、Flink History Server UI へのコンポーネント ゲートウェイのリンクを有効にします。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースにもアクセスできるようになります。

  • 必要に応じて SoftwareConfig.properties を設定して、1 つ以上のクラスタ プロパティを指定できます。

    • クラスタで実行する Flink アプリケーションのデフォルトとして機能する Flink プロパティを指定できます。たとえば、flink:historyserver.archive.fs.dir を設定すると、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定できます(このロケーションは、Flink クラスタで実行されている Flink 履歴サーバーによって使用されます)。
  • 必要に応じて、次の項目を設定できます。

    • GceClusterConfig.metadata。たとえば、flink-start-yarn-session true を指定して、クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(/usr/bin/flink-yarn-daemon)を実行し、Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。
    • 2.0 以前のイメージ バージョンを使用してクラスタによる API へのアクセスを有効にする場合は、GceClusterConfig.serviceAccountScopeshttps://www.googleapis.com/auth/cloud-platformcloud-platform スコープ)に設定します(スコープのベスト プラクティスをご覧ください)。 Google CloudDataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform スコープがデフォルトで有効になります。

Flink ジョブは、Google Cloud コンソール、Google Cloud CLI、または Dataproc API から Dataproc Jobs リソースを使用して実行できます。

ConsolegcloudREST

コンソールからサンプルの Flink ワードカウント ジョブを送信するには:

  1. ブラウザの Google Cloud コンソールで Dataproc の [ジョブを送信] ページを開きます。

  2. [ジョブを送信] ページのフィールドに入力します。

    1. クラスタリストから [クラスタ] 名を選択します。
    2. [ジョブタイプ] を Flink に設定します。
    3. [メインクラスまたは JAR] を org.apache.flink.examples.java.wordcount.WordCount に設定します。
    4. [JAR ファイル] を file:///usr/lib/flink/examples/batch/WordCount.jar に設定します。
      • file:/// は、クラスタ上にあるファイルを表します。Dataproc は、Flink クラスタを作成するときに WordCount.jar をインストールしています。
      • このフィールドには、Cloud Storage パス(gs://BUCKET/JARFILE)または Hadoop 分散ファイル システム(HDFS)パス(hdfs://PATH_TO_JAR)も指定できます。
  3. [送信] をクリックします。

    • ジョブドライバの出力は [ジョブの詳細] ページに表示されます。
    • Flink ジョブは、Google Cloud コンソールの Dataproc の [ジョブ] ページに表示されます。
    • ジョブを停止または削除するには、[ジョブ] ページまたは [ジョブの詳細] ページで [停止] または [削除] をクリックします。

Dataproc Flink クラスタに Flink ジョブを送信するには、ターミナル ウィンドウまたは Cloud Shell で gcloud CLI の gcloud dataproc ジョブの送信 コマンドをローカルに実行します。

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

注:

  • CLUSTER_NAME: ジョブを送信する Dataproc Flink クラスタの名前を指定します。
  • REGION: クラスタが配置される Compute Engine リージョンを指定します。
  • MAIN_CLASS: Flink アプリケーションの main クラスを指定します。次に例を示します。
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: Flink アプリケーションの jar ファイルを指定します。以下を指定できます。
    • file:/// 接頭辞を使用して、クラスタにインストールされている jar ファイル:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Cloud Storage の jar ファイル: gs://BUCKET/JARFILE
    • HDFS の jar ファイル: hdfs://PATH_TO_JAR
  • JOB_ARGS: 必要に応じて、2 個のダッシュ(--)の後にジョブ引数を追加します。

  • ジョブを送信すると、ジョブドライバの出力がローカル ターミナルまたは Cloud Shell ターミナルに表示されます。

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

このセクションでは、Dataproc の jobs.submit API を使用して、Dataproc Flink クラスタに Flink ジョブを送信する方法について説明します。

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_ID: Google Cloud プロジェクト ID
  • REGION: クラスタ リージョン
  • CLUSTER_NAME: ジョブを送信する Dataproc Flink クラスタの名前を指定します

HTTP メソッドと URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

リクエストの本文(JSON):

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

リクエストを送信するには、次のいずれかのオプションを展開します。

リクエスト本文を request.json という名前のファイルに保存して、次のコマンドを実行します。

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit"

リクエスト本文を request.json という名前のファイルに保存して、次のコマンドを実行します。

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit" | Select-Object -Expand Content

次のような JSON レスポンスが返されます。

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Flink ジョブは、Google Cloud コンソールの Dataproc の [ジョブ] ページに表示されます。
  • Google Cloud コンソールの [ジョブ] ページまたは [ジョブの詳細] ページで [停止] または [削除] をクリックすると、ジョブの停止または削除ができます。

Dataproc Jobs リソースを使用して Flink ジョブを実行する代わりに、flink CLI を使用して Flink クラスタのマスターノードで Flink ジョブを実行できます。

以降のセクションでは、Dataproc Flink クラスタで flink CLI ジョブを実行するさまざまな方法について説明します。

  1. マスターノードに SSH 接続する: SSH ユーティリティを使用して、クラスタ マスター VM でターミナル ウィンドウを開きます。

  2. クラスパスを設定する: Flink クラスタ マスター VM の SSH ターミナル ウィンドウから Hadoop クラスパスを初期化します。

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Flink ジョブを実行する: Flink ジョブをさまざまな YARN のデプロイモードで実行できます。アプリケーション、ジョブごと、セッション モードで行います。

    1. アプリケーション モード: Flink アプリケーション モードは、Dataproc イメージ バージョン 2.0 以降でサポートされています。このモードでは、YARN Job Manager でジョブの main() メソッドを実行します。ジョブが完了すると、クラスタがシャットダウンします。

      ジョブ送信の例:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      実行中のジョブを一覧表示します。

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      実行中のジョブをキャンセルします。

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. ジョブごとモード: この Flink モードでは、クライアント側でジョブの main() メソッドを実行します。

      ジョブ送信の例:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. セッション モード: 長時間実行される Flink YARN セッションを開始してから、1 つ以上のジョブをセッションに送信します。

      1. セッションを開始する: Flink セッションは、次のいずれかの方法で開始できます。

        1. Flink クラスタを作成し、gcloud dataproc clusters create コマンドに --metadata flink-start-yarn-session=true フラグを追加します(Dataproc Flink クラスタを作成するをご覧ください)。このフラグを有効にすると、クラスタの作成後に Dataproc が /usr/bin/flink-yarn-daemon を実行し、クラスタで Flink セッションを開始します。

          セッションの YARN アプリケーション ID は /tmp/.yarn-properties-${USER} に保存されます。ID を一覧表示するには、yarn application -list コマンドを使用します。

        2. クラスタ マスター VM にプリインストールされている Flink yarn-session.sh スクリプトをカスタム設定で実行します。

          カスタム設定の例:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Flink /usr/bin/flink-yarn-daemon ラッパー スクリプトをデフォルト設定で実行します。

          . /usr/bin/flink-yarn-daemon
          
      2. セッションにジョブを送信する: 次のコマンドを実行して、Flink ジョブをセッションに送信します。

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: ジョブが実行される Flink マスター VM の URL(ホストとポートを含む)。URL から http:// prefix を削除します。この URL は、Flink セッションを開始したときにコマンド出力に表示されます。次のコマンドを実行して、この URL を Tracking-URL フィールドに一覧表示できます。
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. セッション内のジョブを一覧表示する: セッション内の Flink ジョブを一覧表示するには、次のいずれかを行います。

        • 引数なしで flink list を実行します。このコマンドは、/tmp/.yarn-properties-${USER} でセッションの YARN アプリケーション ID を検索します。

        • /tmp/.yarn-properties-${USER} または yarn application -list の出力からセッションの YARN アプリケーション ID を取得し、<code>flink list -yid YARN_APPLICATION_ID を実行します。

        • flink list -m FLINK_MASTER_URL を実行します。

      4. セッションを停止する: セッションを停止するには、/tmp/.yarn-properties-${USER} または yarn application -list の出力からセッションの YARN アプリケーション ID を取得して、次のいずれかのコマンドを実行します。:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

FlinkRunner を使用して、Dataproc 上で Apache Beam ジョブを実行できます。

次の方法で、Flink で Beam ジョブを実行できます。

  1. Java Beam ジョブ
  2. ポータブル Beam ジョブ

Java Beam ジョブ

Beam ジョブを JAR ファイルにパッケージ化します。バンドルされた JAR ファイルに、ジョブの実行に必要な依存関係を指定します。

次の例では、Dataproc クラスタのマスターノードから Java Beam ジョブが実行されます。

  1. Flink コンポーネントを有効にして Dataproc クラスタを作成します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。
    • --region: サポートされている Dataproc リージョン
    • --enable-component-gateway: Flink ジョブ マネージャー UI へのアクセスを有効にします。
    • --scopes: クラスタによる API へのアクセスを有効にします(スコープのベスト プラクティスをご覧ください)。 Google Cloud Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform スコープがデフォルトで有効になります(このフラグ設定を含める必要はありません)。
  2. SSH ユーティリティを使用して、Flink クラスタ マスターノードでターミナル ウィンドウを開きます。

  3. Dataproc クラスタのマスターノードで Flink YARN セッションを開始します。

    . /usr/bin/flink-yarn-daemon
    

    Dataproc クラスタの Flink バージョンをメモしておきます。

    flink --version
    
  4. ローカルマシン上で、正規の Beam ワードカウントの例を Java に生成します。

    Dataproc クラスタ上で、Flink バージョンと互換性のある Beam バージョンを選択します。Beam-Flink バージョンの互換性を一覧表している Flink バージョンの互換性の表をご覧ください。

    生成された POM ファイルを開きます。タグ <flink.artifact.name> で指定された Beam Flink ランナーのバージョンを確認します。Flink アーティファクト名の Beam Flink ランナー バージョンが、クラスタの Flink バージョンと一致しない場合は、バージョン番号を更新して一致するようにします。

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. ワードカウントの例をパッケージ化します。

    mvn package -Pflink-runner
    
  6. パッケージ化された uber JAR ファイル word-count-beam-bundled-0.1.jar(~ 135 MB)を Dataproc クラスタのマスターノードにアップロードします。gcloud storage cp を使用すると、Cloud Storage から Dataproc クラスタへのファイル転送を高速化できます。

    1. ローカル ターミナル上で、Cloud Storage バケットを作成し、uber JAR をアップロードします。

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Dataproc のマスターノード上で、uber JAR をダウンロードします。

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Dataproc クラスタのマスターノード上で Java Beam ジョブを実行します。

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. 結果が Cloud Storage バケットに書き込まれていることを確認します。

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Flink YARN セッションを停止します。

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

ポータブル Beam ジョブ

Python、Go、その他のサポートされている言語で記述された Beam ジョブを実行するには、Beam の Flink Runnerで説明されているように、FlinkRunnerPortableRunner を使用します(ポータビリティ フレームワークのロードマップもご覧ください)。

次の例では、Dataproc クラスタのマスターノードから Python でポータブル Beam ジョブが実行されます。

  1. Flink コンポーネントと Docker コンポーネントの両方を有効にして Dataproc クラスタを作成します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    注:

    • --optional-components: Flink と Docker。
    • --image-version: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。
    • --region: 使用可能な Dataproc リージョン
    • --enable-component-gateway: Flink ジョブ マネージャー UI へのアクセスを有効にします。
    • --scopes: クラスタによる Google Cloud API へのアクセスを有効にします(スコープのベスト プラクティスをご覧ください)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform スコープがデフォルトで有効になります(このフラグ設定を含める必要はありません)。
  2. gcloud CLI をローカルで使用するか、Cloud Shell を使用して、Cloud Storage バケットを作成します。サンプルのワードカウント プログラムを実行するときに BUCKET_NAME を指定します。

    gcloud storage buckets create BUCKET_NAME
    
  3. クラスタ VM のターミナル ウィンドウで、Flink YARN セッションを開始します。Flink のマスター URL、ジョブが実行される Flink マスターのアドレスに注意してください。サンプルのワードカウント プログラムを実行するときに FLINK_MASTER_URL を指定します。

    . /usr/bin/flink-yarn-daemon
    

    Dataproc クラスタを実行している Flink のバージョンを表示してメモします。サンプルのワードカウント プログラムを実行するときに FLINK_VERSION を指定します。

    flink --version
    
  4. クラスタ マスターノードに、ジョブに必要な Python ライブラリをインストールします。

  5. クラスタの Flink バージョンと互換性のある Beam バージョンをインストールします。

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. クラスタ マスターノードでワードカウントの例を実行します。

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    注:

    • --runner: FlinkRunner
    • --flink_version: FLINK_VERSION、前述。
    • --flink_master: FLINK_MASTER_URL、前述。
    • --flink_submit_uber_jar: uber JAR を使用して Beam ジョブを実行します。
    • --output: BUCKET_NAME、以前に作成。
  7. 結果がバケットに書き込まれていることを確認します。

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Flink YARN セッションを停止します。

    1. アプリケーション ID を取得します。
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Dataproc Flink コンポーネントは、Kerberos クラスタをサポートしています。Flink ジョブを送信して保持したり、Flink クラスタを開始したりするには、有効な Kerberos チケットが必要です。デフォルトでは、Kerberos チケットは 7 日間有効です。

Flink ジョブ マネージャー ウェブ インターフェースは、Flink ジョブまたは Flink セッション クラスタの実行中に使用できます。ウェブ インターフェースを使用するには:

  1. Dataproc Flink クラスタを作成します
  2. クラスタを作成後、Google Cloud コンソールの [クラスタの詳細] ページの [ウェブ インターフェース] タブでコンポーネント ゲートウェイYARN ResourceManager リンクをクリックします。
  3. YARN リソース マネージャー UI 上で、Flink クラスタ アプリケーションのエントリを特定します。ジョブの完了ステータスに応じて、[ApplicationMaster] リンクか [履歴] リンクが一覧表示されます。
  4. 長時間実行ストリーミング ジョブの場合は、[ApplicationManager] リンクをクリックして Flink ダッシュボードを開きます。完了したジョブの場合は、[履歴] リンクをクリックしてジョブの詳細を表示します。