Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Flink などの追加コンポーネントを有効にすることができます。このページでは、Apache Flink オプション コンポーネントが有効になっている Dataproc クラスタ(Flink クラスタ)を作成し、クラスタで Flink ジョブを実行する方法について説明します。
Flink クラスタでは、次のことができます。
- Google Cloud コンソール、Google Cloud CLI、または Dataproc API から Dataproc - Jobsリソースを使用して Flink ジョブを実行する。
- Flink クラスタのマスターノードで実行されている - flinkCLI を使用して Flink ジョブを実行する。
Dataproc Flink クラスタを作成する
Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して、Flink コンポーネントが有効になっている Dataproc クラスタを作成できます。
推奨事項: Flink コンポーネントでは、標準のマスター VM クラスタを 1 つ使用してください。Dataproc 高可用性モードクラスタ(3 つのマスター VM を含む)では、Flink 高可用性モードはサポートされていません。
コンソール
Google Cloud コンソールで Dataproc Flink クラスタを作成するには、次の操作を行います。
- Dataproc の [Compute Engine で Dataproc クラスタを作成する] ページを開きます。 - [クラスタの設定] パネルが選択されています。- [バージョニング] セクションで、イメージのタイプとバージョンを確認または変更します。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。- クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。
- Dataproc Jobs API で Flink ジョブを実行するには、イメージ バージョンが [TBD] 以降である必要があります(Dataproc Flink ジョブを実行するをご覧ください)。
 
- [コンポーネント] セクションで次の設定を行います。
- [コンポーネント ゲートウェイ] で [コンポーネント ゲートウェイを有効にする] を選択します。Flink History Server UI へのコンポーネント ゲートウェイ リンクを有効にするには、コンポーネント ゲートウェイを有効にする必要があります。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink Job Manager ウェブ インターフェースにもアクセスできるようになります。
- [オプション コンポーネント] で、クラスタで有効にする Flink やその他のオプション コンポーネントを選択します。
 
 
- [バージョニング] セクションで、イメージのタイプとバージョンを確認または変更します。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。
- [クラスタのカスタマイズ(省略可)] パネルをクリックします。 - [クラスタのプロパティ] セクションで、クラスタに追加するオプションのクラスタ プロパティごとに [プロパティを追加] をクリックします。 - flink接頭辞付きのプロパティを追加して、クラスタで実行する Flink アプリケーションのデフォルトとして機能する- /etc/flink/conf/flink-conf.yamlに Flink プロパティを構成できます。- 例: - flink:historyserver.archive.fs.dirを設定して、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定します(このロケーションは、Flink クラスタで実行されている Flink History Server によって使用されます)。
- flink:taskmanager.numberOfTaskSlots=nを使用して Flink タスクスロットを設定します。
 
- [カスタム クラスタ メタデータ] セクションで [メタデータを追加] をクリックして、オプションのメタデータを追加します。たとえば、 - flink-start-yarn-session- trueを追加して、クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(- /usr/bin/flink-yarn-daemon)を実行し、Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。
 
- Dataproc イメージ バージョン 2.0 以前を使用している場合は、[セキュリティ管理(省略可)] パネルをクリックし、[プロジェクト アクセス] で - Enables the cloud-platform scope for this clusterを選択します。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、- cloud-platformスコープがデフォルトで有効になります。
 
- [クラスタの設定] パネルが選択されています。
- [作成] をクリックしてクラスタを作成します。 
gcloud
gcloud CLI を使用して Dataproc Flink クラスタを作成するには、ターミナル ウィンドウまたは Cloud Shell で次の gcloud dataproc のクラスタ作成コマンドをローカルに実行します。
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
注:
- CLUSTER_NAME: クラスタの名前を指定します。
- REGION: クラスタが配置される Compute Engine リージョンを指定します。
- DATAPROC_IMAGE_VERSION: 必要に応じて、クラスタで使用するイメージ バージョンを指定します。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。 - クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。 
- Dataproc Jobs API で Flink ジョブを実行するには、イメージ バージョンが [TBD] 以降である必要があります(Dataproc Flink ジョブを実行するをご覧ください)。 
 
- --optional-components: クラスタで Flink ジョブと Flink History Server ウェブサービスを実行するには、- FLINKコンポーネントを指定する必要があります。
- --enable-component-gateway: Flink History Server UI へのコンポーネント ゲートウェイ リンクを有効にするには、コンポーネント ゲートウェイを有効にする必要があります。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink Job Manager ウェブ インターフェースにもアクセスできるようになります。
- PROPERTIES。必要に応じて、1 つ以上のクラスタ プロパティを指定します。 - イメージ バージョン - 2.0.67以降と- 2.1.15以降を使用して Dataproc クラスタを作成する場合、- --propertiesフラグを使用して、クラスタで実行する Flink アプリケーションのデフォルトとして機能する- /etc/flink/conf/flink-conf.yamlに Flink プロパティを構成できます。
- flink:historyserver.archive.fs.dirを設定すると、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定できます(このロケーションは、Flink クラスタで実行されている Flink History Server によって使用されます)。
- 複数のプロパティの例: 
 - --properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
- その他のフラグ: - クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(/usr/bin/flink-yarn-daemon)を実行するには、オプションの--metadata flink-start-yarn-session=trueフラグを追加して Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。
 
- クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(
- 2.0 以前のイメージ バージョンを使用する場合は、 - --scopes=https://www.googleapis.com/auth/cloud-platformフラグを追加して、クラスタによる Google Cloud API へのアクセスを有効にできます(スコープのベスト プラクティスを参照)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、- cloud-platformスコープがデフォルトで有効になります。
API
Dataproc API を使用して Dataproc Flink クラスタを作成するには、次のように clusters.create リクエストを送信します。
注:
- SoftwareConfig.Component を - FLINKに設定します。
- 必要に応じて - SoftwareConfig.imageVersionを設定して、クラスタで使用するイメージ バージョンを指定できます。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。- クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。 
- Dataproc Jobs API で Flink ジョブを実行するには、イメージ バージョンが [TBD] 以降である必要があります(Dataproc Flink ジョブを実行するをご覧ください)。 
 
- EndpointConfig.enableHttpPortAccess を - trueに設定して、Flink History Server UI へのコンポーネント ゲートウェイのリンクを有効にします。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink Job Manager ウェブ インターフェースにもアクセスできるようになります。
- 必要に応じて - SoftwareConfig.propertiesを設定して、1 つ以上のクラスタ プロパティを指定できます。- クラスタで実行する Flink アプリケーションのデフォルトとして機能する Flink プロパティを指定できます。flink:historyserver.archive.fs.dirを設定すると、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定できます(このロケーションは、Flink クラスタで実行されている Flink History Server によって使用されます)。
 
- クラスタで実行する Flink アプリケーションのデフォルトとして機能する Flink プロパティを指定できます。
- 必要に応じて、次の項目を設定できます。 - GceClusterConfig.metadata。たとえば、- flink-start-yarn-session- trueを指定して、クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(- /usr/bin/flink-yarn-daemon)を実行し、Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。
- 2.0 以前のイメージ バージョンを使ってクラスタによる Google CloudAPI へのアクセスを有効にする場合は、GceClusterConfig.serviceAccountScopes を https://www.googleapis.com/auth/cloud-platform(cloud-platformスコープ)に設定します(スコープのベスト プラクティスをご覧ください)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platformスコープがデフォルトで有効になります。
 
Flink クラスタを作成した後
- コンポーネント ゲートウェイの Flink History Serverリンクを使用して、Flink クラスタで実行されている Flink History Server を表示します。
- コンポーネント ゲートウェイの YARN ResourceManager linkを使用して、Flink クラスタで実行されている Flink Job Manager ウェブ インターフェースを表示します。
- Dataproc 永続履歴サーバーを作成して、既存の Flink クラスタと削除された Flink クラスタによって書き込まれた Flink ジョブ履歴ファイルを表示します。
Dataproc Jobs リソースを使用して Flink ジョブを実行する
Flink ジョブは、Google Cloud コンソール、Google Cloud CLI、または Dataproc API から Dataproc Jobs リソースを使用して実行できます。
コンソール
コンソールからサンプルの Flink ワードカウント ジョブを送信するには:
- ブラウザのGoogle Cloud コンソールで Dataproc の [ジョブの送信] ページを開きます。 
- [ジョブの送信] ページのフィールドに入力します。 - クラスタリストから [クラスタ] 名を選択します。
- [ジョブタイプ] を Flinkに設定します。
- [メインクラスまたは JAR] を org.apache.flink.examples.java.wordcount.WordCountに設定します。
- [JAR ファイル] を file:///usr/lib/flink/examples/batch/WordCount.jarに設定します。- file:///は、クラスタ上にあるファイルを表します。Dataproc は、Flink クラスタを作成するときに- WordCount.jarをインストールしています。
- このフィールドには、Cloud Storage パス(gs://BUCKET/JARFILE)または Hadoop Distributed File System(HDFS)のパス(hdfs://PATH_TO_JAR)も指定できます。
 
 
- [送信] をクリックします。 - ジョブドライバの出力は [ジョブの詳細] ページに表示されます。
   - Flink ジョブは、 Google Cloud コンソールの Dataproc の [ジョブ] ページに表示されます。
- ジョブを停止または削除するには、[ジョブ] ページまたは [ジョブの詳細] ページで [停止] または [削除] をクリックします。
 
gcloud
Dataproc Flink クラスタに Flink ジョブを送信するには、ターミナル ウィンドウまたは Cloud Shell で gcloud CLI の gcloud dataproc ジョブの送信コマンドをローカルに実行します。
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
注:
- CLUSTER_NAME: ジョブを送信する Dataproc Flink クラスタの名前を指定します。
- REGION: クラスタが配置される Compute Engine リージョンを指定します。
- MAIN_CLASS: Flink アプリケーションの mainクラスを指定します。次に例を示します。- org.apache.flink.examples.java.wordcount.WordCount
 
- JAR_FILE: Flink アプリケーションの jar ファイルを指定します。次のように指定できます。
- file:/// 接頭辞を使用して、クラスタにインストールされている jar ファイル:- file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
- file:///usr/lib/flink/examples/batch/WordCount.jar
 
- Cloud Storage の jar ファイル:
gs://BUCKET/JARFILE
- HDFS の jar ファイル:
hdfs://PATH_TO_JAR
 
- JOB_ARGS: 必要に応じて、2 個のダッシュ( - --)の後にジョブ引数を追加します。
- ジョブを送信すると、ジョブドライバの出力がローカル ターミナルまたは Cloud Shell ターミナルに表示されます。 - Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1) 
REST
このセクションでは、Dataproc の jobs.submit API を使用して、Dataproc Flink クラスタに Flink ジョブを送信する方法について説明します。
リクエストのデータを使用する前に、次のように置き換えます。
- PROJECT_ID: Google Cloud プロジェクト ID
- REGION: クラスタ リージョン
- CLUSTER_NAME: ジョブを送信する Dataproc Flink クラスタの名前を指定します。
HTTP メソッドと URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
リクエストの本文(JSON):
{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}
リクエストを送信するには、次のいずれかのオプションを展開します。
次のような JSON レスポンスが返されます。
{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
- Flink ジョブは、 Google Cloud コンソールの Dataproc の [ジョブ] ページに表示されます。
- Google Cloud コンソールの [ジョブ] ページまたは [ジョブの詳細] ページで [停止] または [削除] をクリックすると、ジョブの停止または削除ができます。
flink CLI を使用して Flink ジョブを実行する
Dataproc Jobs リソースを使用して Flink ジョブを実行する代わりに、flink CLI を使用して Flink クラスタのマスターノードで Flink ジョブを実行できます。
以降のセクションでは、Dataproc Flink クラスタで flink CLI ジョブを実行するさまざまな方法について説明します。
- マスターノードに SSH 接続する: SSH ユーティリティを使用して、クラスタ マスター VM でターミナル ウィンドウを開きます。 
- クラスパスを設定する: Flink クラスタ マスター VM の SSH ターミナル ウィンドウから Hadoop クラスパスを初期化します。 - export HADOOP_CLASSPATH=$(hadoop classpath)
- Flink ジョブを実行する: Flink ジョブをさまざまな YARN のデプロイモードで実行できます。アプリケーション、ジョブごと、セッション モードで行います。 - アプリケーション モード: Flink アプリケーション モードは、Dataproc イメージ バージョン 2.0 以降でサポートされています。このモードでは、YARN Job Manager でジョブの - main()メソッドを実行します。ジョブが完了すると、クラスタがシャットダウンします。- ジョブ送信の例: - flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar- 実行中のジョブを一覧表示します。 - ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY- 実行中のジョブをキャンセルします。 - ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
- ジョブ単位モード: この Flink モードでは、クライアント側でジョブの - main()メソッドを実行します。- ジョブ送信の例: - flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
- セッション モード: 長時間実行される Flink YARN セッションを開始してから、1 つ以上のジョブをセッションに送信します。 - セッションを開始する: Flink セッションは、次のいずれかの方法で開始できます。 - Flink クラスタを作成し、 - gcloud dataproc clusters createコマンドに- --metadata flink-start-yarn-session=trueフラグを追加します(Dataproc Flink クラスタを作成するをご覧ください)。このフラグを有効にすると、クラスタの作成後に Dataproc が- /usr/bin/flink-yarn-daemonを実行し、クラスタで Flink セッションを開始します。- セッションの YARN アプリケーション ID は - /tmp/.yarn-properties-${USER}に保存されます。ID を一覧表示するには、- yarn application -listコマンドを使用します。
- クラスタ マスター VM にプリインストールされている Flink - yarn-session.shスクリプトをカスタム設定で実行します。- カスタム設定の例: - /usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
- Flink - /usr/bin/flink-yarn-daemonラッパー スクリプトをデフォルト設定で実行します。- . /usr/bin/flink-yarn-daemon
 
- セッションにジョブを送信する: 次のコマンドを実行して、Flink ジョブをセッションに送信します。 - flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar- FLINK_MASTER_URL: ジョブが実行される Flink マスター VM の URL(ホストとポートを含む)。URL から http:// prefixを削除します。この URL は、Flink セッションを開始したときにコマンド出力に表示されます。次のコマンドを実行して、この URL をTracking-URLフィールドに一覧表示できます。
 - yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: ジョブが実行される Flink マスター VM の URL(ホストとポートを含む)。URL から 
- セッション内のジョブを一覧表示する: セッション内の Flink ジョブを一覧表示するには、次のいずれかを行います。 - 引数なしで - flink listを実行します。このコマンドは、- /tmp/.yarn-properties-${USER}でセッションの YARN アプリケーション ID を検索します。
- /tmp/.yarn-properties-${USER}または- yarn application -listの出力からセッションの YARN アプリケーション ID を取得し、- <code>flink list -yid YARN_APPLICATION_ID を実行します。
- flink list -m FLINK_MASTER_URLを実行します。
 
- セッションを停止する: セッションを停止するには、 - /tmp/.yarn-properties-${USER}または- yarn application -listの出力からセッションの YARN アプリケーション ID を取得して、次のいずれかのコマンドを実行します。- echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID- yarn application -kill YARN_APPLICATION_ID
 
 
Flink で Apache Beam ジョブを実行する
FlinkRunner を使用して、Dataproc 上で Apache Beam ジョブを実行できます。
次の方法で、Flink で Beam ジョブを実行できます。
- Java Beam ジョブ
- ポータブル Beam ジョブ
Java Beam ジョブ
Beam ジョブを JAR ファイルにパッケージ化します。バンドルされた JAR ファイルに、ジョブの実行に必要な依存関係を指定します。
次の例では、Dataproc クラスタのマスターノードから Java Beam ジョブが実行されます。
- Flink コンポーネントを有効にして Dataproc クラスタを作成します。 - gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform- --optional-components: Flink。
- --image-version: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。
- --region: サポートされている Dataproc リージョン。
- --enable-component-gateway: Flink Job Manager UI へのアクセスを有効にします。
- --scopes: クラスタによる Google Cloud API へのアクセスを有効にします(スコープのベスト プラクティスをご覧ください)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、- cloud-platformスコープがデフォルトで有効になります(このフラグ設定を含める必要はありません)。
 
- SSH ユーティリティを使用して、Flink クラスタ マスターノードでターミナル ウィンドウを開きます。 
- Dataproc クラスタのマスターノードで Flink YARN セッションを開始します。 - . /usr/bin/flink-yarn-daemon- Dataproc クラスタの Flink バージョンをメモしておきます。 - flink --version
- ローカルマシン上で、正規の Beam ワードカウントのサンプルを Java で生成します。 - Dataproc クラスタ上で、Flink バージョンと互換性のある Beam バージョンを選択します。Beam-Flink バージョンの互換性の一覧を示している Flink バージョンの互換性の表をご覧ください。 - 生成された POM ファイルを開きます。タグ - <flink.artifact.name>で指定された Beam Flink ランナーのバージョンを確認します。Flink アーティファクト名の Beam Flink ランナー バージョンが、クラスタの Flink バージョンと一致しない場合は、バージョン番号を更新して一致するようにします。- mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
- ワードカウントの例をパッケージ化します。 - mvn package -Pflink-runner
- パッケージ化された uber JAR ファイル - word-count-beam-bundled-0.1.jar(~135 MB)を Dataproc クラスタのマスターノードにアップロードします。- gcloud storage cpを使用すると、Cloud Storage から Dataproc クラスタへのファイル転送を高速化できます。- ローカル ターミナル上で、Cloud Storage バケットを作成し、uber JAR をアップロードします。 - gcloud storage buckets create BUCKET_NAME- gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
- Dataproc のマスターノード上で、uber JAR をダウンロードします。 - gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
 
- Dataproc クラスタのマスターノード上で Java Beam ジョブを実行します。 - flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
- 結果が Cloud Storage バケットに書き込まれていることを確認します。 - gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
- Flink YARN セッションを停止します。 - yarn application -list- yarn application -kill YARN_APPLICATION_ID
ポータブル Beam ジョブ
Python、Go、その他のサポートされている言語で記述された Beam ジョブを実行するには、Beam の Flink Runnerで説明されているように、FlinkRunner と PortableRunner を使用します(ポータビリティ フレームワークのロードマップもご覧ください)。
次の例では、Dataproc クラスタのマスターノードから Python でポータブル Beam ジョブが実行されます。
- Flink コンポーネントと Docker コンポーネントの両方を有効にして Dataproc クラスタを作成します。 - gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform- 注: - --optional-components: Flink と Docker。
- --image-version: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。
- --region: 使用可能な Dataproc リージョン。
- --enable-component-gateway: Flink Job Manager UI へのアクセスを有効にします。
- --scopes: クラスタによる Google Cloud API へのアクセスを有効にします(スコープのベスト プラクティスをご覧ください)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、- cloud-platformスコープがデフォルトで有効になります(このフラグ設定を含める必要はありません)。
 
- gcloud CLI をローカルで使用するか、Cloud Shell を使用して、Cloud Storage バケットを作成します。サンプルのワードカウント プログラムを実行するときに BUCKET_NAME を指定します。 - gcloud storage buckets create BUCKET_NAME
- クラスタ VM のターミナル ウィンドウで、Flink YARN セッションを開始します。Flink のマスター URL、ジョブが実行される Flink マスターのアドレスに注意してください。サンプルのワードカウント プログラムを実行するときに FLINK_MASTER_URL を指定します。 - . /usr/bin/flink-yarn-daemon- Dataproc クラスタを実行している Flink のバージョンを表示してメモします。サンプルのワードカウント プログラムを実行するときに FLINK_VERSION を指定します。 - flink --version
- クラスタ マスターノードに、ジョブに必要な Python ライブラリをインストールします。 
- クラスタの Flink バージョンと互換性のある Beam バージョンをインストールします。 - python -m pip install apache-beam[gcp]==BEAM_VERSION
- クラスタ マスターノードでワードカウントの例を実行します。 - python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out- 注: - --runner:- FlinkRunner
- --flink_version: FLINK_VERSION、前述。
- --flink_master: FLINK_MASTER_URL、前述。
- --flink_submit_uber_jar: uber JAR を使用して Beam ジョブを実行します。
- --output: BUCKET_NAME、以前に作成。
 
- 結果がバケットに書き込まれていることを確認します。 - gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
- Flink YARN セッションを停止します。 - アプリケーション ID を取得します。
 - yarn application -list1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.- yarn application -kill
Kerberos クラスタで Flink を実行する
Dataproc Flink コンポーネントは、Kerberos クラスタをサポートしています。Flink ジョブを送信して保持したり、Flink クラスタを開始したりするには、有効な Kerberos チケットが必要です。デフォルトでは、Kerberos チケットは 7 日間有効です。
Flink Job Manager UI にアクセスする
Flink Job Manager ウェブ インターフェースは、Flink ジョブまたは Flink セッション クラスタの実行中に使用できます。ウェブ インターフェースを使用するには:
- Dataproc Flink クラスタを作成します。
- クラスタを作成後、 Google Cloud コンソールの [クラスタの詳細] ページの [ウェブ インターフェース] タブでコンポーネント ゲートウェイの YARN ResourceManager リンクをクリックします。
- YARN Resource Manager UI 上で、Flink クラスタ アプリケーションのエントリを特定します。ジョブの完了ステータスに応じて、[ApplicationMaster] リンクか [履歴] リンクが一覧表示されます。  
- 長時間実行ストリーミング ジョブの場合は、[ApplicationManager] リンクをクリックして Flink ダッシュボードを開きます。完了したジョブの場合は、[履歴] リンクをクリックしてジョブの詳細を表示します。