Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Flink などの追加コンポーネントを有効化できます。このページでは、Apache Flink オプション コンポーネントが有効になっている Dataproc クラスタ(Flink クラスタ)を作成し、クラスタで Flink ジョブを実行する方法について説明します。
Flink クラスタを使用すると、次のことができます。
Google Cloud コンソール、Google Cloud CLI、または Dataproc API から Dataproc
Jobs
リソースを使用して Flink ジョブを実行します。Flink クラスタのマスターノードで実行されている
flink
CLI を使用して Flink ジョブを実行する。
Dataproc Flink クラスタを作成する
Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して、クラスタで Flink コンポーネントが有効になっている Dataproc クラスタを作成できます。
推奨事項: Flink コンポーネントで標準の 1 マスター VM クラスタを使用します。Dataproc 高可用性モードクラスタ(3 つのマスター VM を含む)では、Flink 高可用性モードはサポートされていません。
Console
Google Cloud コンソールを使用して Dataproc Flink クラスタを作成するには、次の操作を行います。
Dataproc の [Compute Engine に Dataproc クラスタを作成する] ページを開きます。
- [クラスターを設定] パネルが選択されています。
- [バージョニング] セクションで、イメージのタイプとバージョンを確認または変更します。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。
- クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。
- Dataproc Jobs API で Flink ジョブを実行するには、イメージ バージョンが [TBD] 以降である必要があります(Dataproc Flink ジョブを実行するをご覧ください)。
- [コンポーネント] セクションで次の設定を行います。
- [コンポーネント ゲートウェイ] で [コンポーネント ゲートウェイを有効にする] を選択します。Flink History Server UI へのコンポーネント ゲートウェイ リンクを有効にするには、コンポーネント ゲートウェイを有効にする必要があります。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースにもアクセスできるようになります。
- [オプション コンポーネント] で、クラスタで有効にする Flink やその他のオプション コンポーネントを選択します。
- [バージョニング] セクションで、イメージのタイプとバージョンを確認または変更します。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。
[クラスタのカスタマイズ(省略可)] パネルをクリックします。
[クラスタのプロパティ] セクションで、クラスタに追加するオプションのクラスタ プロパティごとに [プロパティを追加] をクリックします。
flink
接頭辞付きのプロパティを追加して、クラスタで実行する Flink アプリケーションのデフォルトとして機能する/etc/flink/conf/flink-conf.yaml
に Flink プロパティを構成できます。例:
flink:historyserver.archive.fs.dir
を設定して、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定します(このロケーションは、Flink クラスタで実行されている Flink 履歴サーバーによって使用されます)。flink:taskmanager.numberOfTaskSlots=n
を使用して Flink タスク スロットを設定します。
[カスタム クラスタ メタデータ] セクションで [メタデータを追加] をクリックして、オプションのメタデータを追加します。たとえば、
flink-start-yarn-session
true
を追加して、クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(/usr/bin/flink-yarn-daemon
)を実行し、Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。
Dataproc イメージ バージョン 2.0 以前を使用している場合は、セキュリティを管理する(省略可)パネルをクリックし、プロジェクトへのアクセスで、
Enables the cloud-platform scope for this cluster
を選択します。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform
スコープがデフォルトで有効になります。
- [クラスターを設定] パネルが選択されています。
[作成] をクリックしてクラスタを作成します。
gcloud
gcloud CLI を使用して Dataproc Flink クラスタを作成するには、ターミナル ウィンドウまたは Cloud Shell で次の gcloud dataproc のクラスタ作成 コマンドをローカルに実行します。
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
注:
- CLUSTER_NAME: クラスタの名前を指定します。
- REGION: クラスタが配置される Compute Engine リージョンを指定します。
DATAPROC_IMAGE_VERSION: 必要に応じて、クラスタで使用するイメージ バージョンを指定します。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。
クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。
Dataproc Jobs API で Flink ジョブを実行するには、イメージ バージョンが [TBD] 以降である必要があります(Dataproc Flink ジョブを実行するをご覧ください)。
--optional-components
: クラスタで Flink ジョブと Flink HistoryServer ウェブサービスを実行するには、FLINK
コンポーネントを指定する必要があります。--enable-component-gateway
: Flink 履歴サーバー UI へのコンポーネント ゲートウェイ リンクを有効にするには、コンポーネント ゲートウェイを有効にする必要があります。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースにもアクセスできるようになります。PROPERTIES。必要に応じて、1 つ以上のクラスタ プロパティを指定します。
イメージ バージョン
2.0.67
以降と2.1.15
以降を使用して Dataproc クラスタを作成する場合、--properties
フラグを使用して、クラスタで実行する Flink アプリケーションのデフォルトとして機能する/etc/flink/conf/flink-conf.yaml
に Flink プロパティを構成できます。flink:historyserver.archive.fs.dir
を設定すると、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定できます(このロケーションは、Flink クラスタで実行されている Flink 履歴サーバーによって使用されます)。複数のプロパティの例:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
その他のフラグ:
- クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(
/usr/bin/flink-yarn-daemon
)を実行するには、オプションの--metadata flink-start-yarn-session=true
フラグ を追加して Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。
- クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(
2.0 以前のイメージ バージョンを使用する場合は、
--scopes=https://www.googleapis.com/auth/cloud-platform
フラグを追加して、クラスタによる Google Cloud API へのアクセスを有効にできます(スコープのベスト プラクティスを参照)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform
スコープがデフォルトで有効になります。
API
Dataproc API を使用して Dataproc Flink クラスタを作成するには、次のように clusters.create リクエストを送信します。
注:
SoftwareConfig.Component を
FLINK
に設定します。必要に応じて
SoftwareConfig.imageVersion
を設定して、クラスタで使用するイメージ バージョンを指定できます。クラスタ イメージのバージョンに応じて、クラスタにインストールされる Flink コンポーネントのバージョンが決まります。クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。
Dataproc Jobs API で Flink ジョブを実行するには、イメージ バージョンが [TBD] 以降である必要があります(Dataproc Flink ジョブを実行するをご覧ください)。
EndpointConfig.enableHttpPortAccess を
true
に設定して、Flink History Server UI へのコンポーネント ゲートウェイのリンクを有効にします。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースにもアクセスできるようになります。必要に応じて
SoftwareConfig.properties
を設定して、1 つ以上のクラスタ プロパティを指定できます。- クラスタで実行する Flink アプリケーションのデフォルトとして機能する Flink プロパティを指定できます。たとえば、
flink:historyserver.archive.fs.dir
を設定すると、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定できます(このロケーションは、Flink クラスタで実行されている Flink 履歴サーバーによって使用されます)。
- クラスタで実行する Flink アプリケーションのデフォルトとして機能する Flink プロパティを指定できます。たとえば、
必要に応じて、次の項目を設定できます。
GceClusterConfig.metadata
。たとえば、flink-start-yarn-session
true
を指定して、クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(/usr/bin/flink-yarn-daemon
)を実行し、Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。- 2.0 以前のイメージ バージョンを使ってクラスタによる Google Cloud API へのアクセスを有効にする場合は、GceClusterConfig.serviceAccountScopes を
https://www.googleapis.com/auth/cloud-platform
(cloud-platform
スコープ)に設定します(スコープのベスト プラクティスをご覧ください)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform
スコープがデフォルトで有効になります。
Flink クラスタを作成した後
- コンポーネント ゲートウェイの
Flink History Server
リンクを使用して、Flink クラスタで実行されている Flink History Server を表示します。 - コンポーネント ゲートウェイの
YARN ResourceManager link
を使用して、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースを表示します。 - Dataproc 永続履歴サーバーを作成して、既存の Flink クラスタと削除された Flink クラスタによって書き込まれた Flink ジョブ履歴ファイルを表示します。
Dataproc Jobs
リソースを使用して Flink ジョブを実行する
Flink ジョブは、Google Cloud コンソール、Google Cloud CLI、または Dataproc API から Dataproc Jobs
リソースを使用して実行できます。
Console
コンソールからサンプルの Flink ワードカウント ジョブを送信するには:
ブラウザの Google Cloud コンソールで Dataproc の [ジョブを送信] ページを開きます。
[ジョブを送信] ページのフィールドに入力します。
- クラスタリストから [クラスタ] 名を選択します。
- [ジョブタイプ] を
Flink
に設定します。 - [メインクラスまたは JAR] を
org.apache.flink.examples.java.wordcount.WordCount
に設定します。 - [JAR ファイル] を
file:///usr/lib/flink/examples/batch/WordCount.jar
に設定します。file:///
は、クラスタ上にあるファイルを表します。Dataproc は、Flink クラスタを作成するときにWordCount.jar
をインストールしています。- このフィールドには、Cloud Storage パス(
gs://BUCKET/JARFILE
)または Hadoop 分散ファイル システム(HDFS)パス(hdfs://PATH_TO_JAR
)も指定できます。
[送信] をクリックします。
- ジョブドライバの出力は [ジョブの詳細] ページに表示されます。
- Flink ジョブは、Google Cloud コンソールの Dataproc の [ジョブ] ページに表示されます。
- ジョブを停止または削除するには、[ジョブ] ページまたは [ジョブの詳細] ページで [停止] または [削除] をクリックします。
gcloud
Dataproc Flink クラスタに Flink ジョブを送信するには、ターミナル ウィンドウまたは Cloud Shell で gcloud CLI の gcloud dataproc ジョブの送信 コマンドをローカルに実行します。
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
注:
- CLUSTER_NAME: ジョブを送信する Dataproc Flink クラスタの名前を指定します。
- REGION: クラスタが配置される Compute Engine リージョンを指定します。
- MAIN_CLASS: Flink アプリケーションの
main
クラスを指定します。次に例を示します。org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: Flink アプリケーションの jar ファイルを指定します。以下を指定できます。
file:/// 接頭辞を使用して、クラスタにインストールされている jar ファイル:
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Cloud Storage の jar ファイル:
gs://BUCKET/JARFILE
- HDFS の jar ファイル:
hdfs://PATH_TO_JAR
JOB_ARGS: 必要に応じて、2 個のダッシュ(
--
)の後にジョブ引数を追加します。ジョブを送信すると、ジョブドライバの出力がローカル ターミナルまたは Cloud Shell ターミナルに表示されます。
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
このセクションでは、Dataproc の jobs.submit API を使用して、Dataproc Flink クラスタに Flink ジョブを送信する方法について説明します。
リクエストのデータを使用する前に、次のように置き換えます。
- PROJECT_ID: Google Cloud プロジェクト ID
- REGION: クラスタ リージョン
- CLUSTER_NAME: ジョブを送信する Dataproc Flink クラスタの名前を指定します
HTTP メソッドと URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
リクエストの本文(JSON):
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
リクエストを送信するには、次のいずれかのオプションを展開します。
次のような JSON レスポンスが返されます。
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- Flink ジョブは、Google Cloud コンソールの Dataproc の [ジョブ] ページに表示されます。
- Google Cloud コンソールの [ジョブ] ページまたは [ジョブの詳細] ページで [停止] または [削除] をクリックすると、ジョブの停止または削除ができます。
flink
CLI を使用して Flink ジョブを実行する
Dataproc Jobs
リソースを使用して Flink ジョブを実行する代わりに、flink
CLI を使用して Flink クラスタのマスターノードで Flink ジョブを実行できます。
以降のセクションでは、Dataproc Flink クラスタで flink
CLI ジョブを実行するさまざまな方法について説明します。
マスターノードに SSH 接続する: SSH ユーティリティを使用して、クラスタ マスター VM でターミナル ウィンドウを開きます。
クラスパスを設定する: Flink クラスタ マスター VM の SSH ターミナル ウィンドウから Hadoop クラスパスを初期化します。
export HADOOP_CLASSPATH=$(hadoop classpath)
Flink ジョブを実行する: Flink ジョブをさまざまな YARN のデプロイモードで実行できます。アプリケーション、ジョブごと、セッション モードで行います。
アプリケーション モード: Flink アプリケーション モードは、Dataproc イメージ バージョン 2.0 以降でサポートされています。このモードでは、YARN Job Manager でジョブの
main()
メソッドを実行します。ジョブが完了すると、クラスタがシャットダウンします。ジョブ送信の例:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
実行中のジョブを一覧表示します。
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
実行中のジョブをキャンセルします。
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
ジョブごとモード: この Flink モードでは、クライアント側でジョブの
main()
メソッドを実行します。ジョブ送信の例:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
セッション モード: 長時間実行される Flink YARN セッションを開始してから、1 つ以上のジョブをセッションに送信します。
セッションを開始する: Flink セッションは、次のいずれかの方法で開始できます。
Flink クラスタを作成し、
gcloud dataproc clusters create
コマンドに--metadata flink-start-yarn-session=true
フラグを追加します(Dataproc Flink クラスタを作成するをご覧ください)。このフラグを有効にすると、クラスタの作成後に Dataproc が/usr/bin/flink-yarn-daemon
を実行し、クラスタで Flink セッションを開始します。セッションの YARN アプリケーション ID は
/tmp/.yarn-properties-${USER}
に保存されます。ID を一覧表示するには、yarn application -list
コマンドを使用します。クラスタ マスター VM にプリインストールされている Flink
yarn-session.sh
スクリプトをカスタム設定で実行します。カスタム設定の例:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
Flink
/usr/bin/flink-yarn-daemon
ラッパー スクリプトをデフォルト設定で実行します。. /usr/bin/flink-yarn-daemon
セッションにジョブを送信する: 次のコマンドを実行して、Flink ジョブをセッションに送信します。
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: ジョブが実行される Flink マスター VM の URL(ホストとポートを含む)。URL から
http:// prefix
を削除します。この URL は、Flink セッションを開始したときにコマンド出力に表示されます。次のコマンドを実行して、この URL をTracking-URL
フィールドに一覧表示できます。
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: ジョブが実行される Flink マスター VM の URL(ホストとポートを含む)。URL から
セッション内のジョブを一覧表示する: セッション内の Flink ジョブを一覧表示するには、次のいずれかを行います。
引数なしで
flink list
を実行します。このコマンドは、/tmp/.yarn-properties-${USER}
でセッションの YARN アプリケーション ID を検索します。/tmp/.yarn-properties-${USER}
またはyarn application -list
の出力からセッションの YARN アプリケーション ID を取得し、<code>
flink list -yid YARN_APPLICATION_ID を実行します。flink list -m FLINK_MASTER_URL
を実行します。
セッションを停止する: セッションを停止するには、
/tmp/.yarn-properties-${USER}
またはyarn application -list
の出力からセッションの YARN アプリケーション ID を取得して、次のいずれかのコマンドを実行します。:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Flink で Apache Beam ジョブを実行する
FlinkRunner
を使用して、Dataproc 上で Apache Beam ジョブを実行できます。
次の方法で、Flink で Beam ジョブを実行できます。
- Java Beam ジョブ
- ポータブル Beam ジョブ
Java Beam ジョブ
Beam ジョブを JAR ファイルにパッケージ化します。バンドルされた JAR ファイルに、ジョブの実行に必要な依存関係を指定します。
次の例では、Dataproc クラスタのマスターノードから Java Beam ジョブが実行されます。
Flink コンポーネントを有効にして Dataproc クラスタを作成します。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。--region
: サポートされている Dataproc リージョン。--enable-component-gateway
: Flink ジョブ マネージャー UI へのアクセスを有効にします。--scopes
: クラスタによる Google Cloud APIs へのアクセスを有効にします(スコープのベスト プラクティスをご覧ください)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform
スコープがデフォルトで有効になります(このフラグ設定を含める必要はありません)。
SSH ユーティリティを使用して、Flink クラスタ マスターノードでターミナル ウィンドウを開きます。
Dataproc クラスタのマスターノードで Flink YARN セッションを開始します。
. /usr/bin/flink-yarn-daemon
Dataproc クラスタの Flink バージョンをメモしておきます。
flink --version
ローカルマシン上で、正規の Beam ワードカウントの例を Java に生成します。
Dataproc クラスタ上で、Flink バージョンと互換性のある Beam バージョンを選択します。Beam-Flink バージョンの互換性を一覧表している Flink バージョンの互換性の表をご覧ください。
生成された POM ファイルを開きます。タグ
<flink.artifact.name>
で指定された Beam Flink ランナーのバージョンを確認します。Flink アーティファクト名の Beam Flink ランナー バージョンが、クラスタの Flink バージョンと一致しない場合は、バージョン番号を更新して一致するようにします。mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
ワードカウントの例をパッケージ化します。
mvn package -Pflink-runner
パッケージ化された uber JAR ファイル
word-count-beam-bundled-0.1.jar
(~ 135 MB)を Dataproc クラスタのマスターノードにアップロードします。gcloud storage cp
を使用すると、Cloud Storage から Dataproc クラスタへのファイル転送を高速化できます。ローカル ターミナル上で、Cloud Storage バケットを作成し、uber JAR をアップロードします。
gcloud storage buckets create BUCKET_NAME
gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Dataproc のマスターノード上で、uber JAR をダウンロードします。
gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Dataproc クラスタのマスターノード上で Java Beam ジョブを実行します。
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
結果が Cloud Storage バケットに書き込まれていることを確認します。
gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Flink YARN セッションを停止します。
yarn application -list
yarn application -kill YARN_APPLICATION_ID
ポータブル Beam ジョブ
Python、Go、その他のサポートされている言語で記述された Beam ジョブを実行するには、Beam の Flink Runnerで説明されているように、FlinkRunner
と PortableRunner
を使用します(ポータビリティ フレームワークのロードマップもご覧ください)。
次の例では、Dataproc クラスタのマスターノードから Python でポータブル Beam ジョブが実行されます。
Flink コンポーネントと Docker コンポーネントの両方を有効にして Dataproc クラスタを作成します。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
注:
--optional-components
: Flink と Docker。--image-version
: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。--region
: 使用可能な Dataproc リージョン。--enable-component-gateway
: Flink ジョブ マネージャー UI へのアクセスを有効にします。--scopes
: クラスタによる Google Cloud APIs へのアクセスを有効にします(スコープのベスト プラクティスをご覧ください)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform
スコープがデフォルトで有効になります(このフラグ設定を含める必要はありません)。
gcloud CLI をローカルで使用するか、Cloud Shell を使用して、Cloud Storage バケットを作成します。サンプルのワードカウント プログラムを実行するときに BUCKET_NAME を指定します。
gcloud storage buckets create BUCKET_NAME
クラスタ VM のターミナル ウィンドウで、Flink YARN セッションを開始します。Flink のマスター URL、ジョブが実行される Flink マスターのアドレスに注意してください。サンプルのワードカウント プログラムを実行するときに FLINK_MASTER_URL を指定します。
. /usr/bin/flink-yarn-daemon
Dataproc クラスタを実行している Flink のバージョンを表示してメモします。サンプルのワードカウント プログラムを実行するときに FLINK_VERSION を指定します。
flink --version
クラスタ マスターノードに、ジョブに必要な Python ライブラリをインストールします。
クラスタの Flink バージョンと互換性のある Beam バージョンをインストールします。
python -m pip install apache-beam[gcp]==BEAM_VERSION
クラスタ マスターノードでワードカウントの例を実行します。
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
注:
--runner
:FlinkRunner
--flink_version
: FLINK_VERSION、前述。--flink_master
: FLINK_MASTER_URL、前述。--flink_submit_uber_jar
: uber JAR を使用して Beam ジョブを実行します。--output
: BUCKET_NAME、以前に作成。
結果がバケットに書き込まれていることを確認します。
gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Flink YARN セッションを停止します。
- アプリケーション ID を取得します。
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Kerberos クラスタで Flink を実行する
Dataproc Flink コンポーネントは、Kerberos クラスタをサポートしています。Flink ジョブを送信して保持したり、Flink クラスタを開始したりするには、有効な Kerberos チケットが必要です。デフォルトでは、Kerberos チケットは 7 日間有効です。
Flink ジョブ マネージャー UI にアクセスする
Flink ジョブ マネージャー ウェブ インターフェースは、Flink ジョブまたは Flink セッション クラスタの実行中に使用できます。ウェブ インターフェースを使用するには:
- Dataproc Flink クラスタを作成します。
- クラスタを作成後、Google Cloud コンソールの [クラスタの詳細] ページの [ウェブ インターフェース] タブでコンポーネント ゲートウェイの YARN ResourceManager リンクをクリックします。
- YARN リソース マネージャー UI 上で、Flink クラスタ アプリケーションのエントリを特定します。ジョブの完了ステータスに応じて、[ApplicationMaster] リンクか [履歴] リンクが一覧表示されます。
- 長時間実行ストリーミング ジョブの場合は、[ApplicationManager] リンクをクリックして Flink ダッシュボードを開きます。完了したジョブの場合は、[履歴] リンクをクリックしてジョブの詳細を表示します。