Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Flink などの追加コンポーネントを有効化できます。このページでは、Apache Flink のオプション コンポーネント(Flink クラスタ)を有効にして Dataproc クラスタを作成し、そのクラスタで Flink ジョブを実行する方法について説明します。
Flink クラスタを使用すると、次のことができます。
Google Cloud コンソール、Google Cloud CLI、または Dataproc API から Dataproc
Jobs
リソースを使用して Flink ジョブを実行する。Flink クラスタのマスターノードで実行されている
flink
CLI を使用して Flink ジョブを実行する。Kerberos クラスタで Flink を実行する
Dataproc Flink クラスタを作成する
Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して、クラスタで Flink コンポーネントが有効になっている Dataproc クラスタを作成できます。
推奨: Flink コンポーネントで標準の 1 マスター VM クラスタを使用します。 Dataproc 高可用性モードクラスタ(3 つのマスター VM を含む)では、Flink 高可用性モードはサポートされていません。
コンソール
Google Cloud コンソールを使用して Dataproc Flink クラスタを作成するには、次の手順を行います。
Dataproc の [Compute Engine で Dataproc クラスタを作成する] ページを開きます。
- [クラスターを設定] パネルが選択されています。
- [バージョニング] セクションで、イメージのタイプとバージョンを確認または変更します。クラスタ イメージのバージョンによって、クラスタにインストールされている Flink コンポーネントのバージョンが決まります。
- クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。
- Dataproc Jobs API を使用して Flink ジョブを実行するには、イメージのバージョンを [TBD] 以降にする必要があります(Dataproc Flink ジョブの実行をご覧ください)。
- [コンポーネント] セクションで次の設定を行います。
- [コンポーネント ゲートウェイ] で [コンポーネント ゲートウェイを有効にする] を選択します。コンポーネント ゲートウェイを有効にして、Flink 履歴サーバー UI に対するコンポーネント ゲートウェイ リンクを有効にする必要があります。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースへのアクセスも有効になります。
- [オプション コンポーネント] で、クラスタで有効にする Flink やその他のオプション コンポーネントを選択します。
- [バージョニング] セクションで、イメージのタイプとバージョンを確認または変更します。クラスタ イメージのバージョンによって、クラスタにインストールされている Flink コンポーネントのバージョンが決まります。
[クラスタのカスタマイズ(省略可)] パネルをクリックします。
[クラスタ プロパティ] セクションで、クラスタに追加するオプションのクラスタ プロパティごとに [プロパティを追加] をクリックします。
flink
接頭辞付きのプロパティを追加して、クラスタで実行する Flink アプリケーションのデフォルトとして機能する/etc/flink/conf/flink-conf.yaml
に Flink プロパティを構成できます。例:
flink:historyserver.archive.fs.dir
を設定して、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定します(この場所は、Flink クラスタで実行されている Flink 履歴サーバーによって使用されます)。flink:taskmanager.numberOfTaskSlots=n
で Flink タスクスロットを設定します。
[カスタム クラスタ メタデータ] セクションで [メタデータを追加] をクリックし、オプションのメタデータを追加します。たとえば、クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(
/usr/bin/flink-yarn-daemon
)を実行するには、flink-start-yarn-session
true
を追加して Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。
Dataproc イメージ バージョン 2.0 以前を使用している場合は、セキュリティを管理する(省略可)パネルをクリックし、プロジェクトへのアクセスで、
Enables the cloud-platform scope for this cluster
を選択します。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform
スコープがデフォルトで有効になります。
- [クラスターを設定] パネルが選択されています。
[作成] をクリックしてクラスタを作成します。
gcloud
gcloud CLI を使用して Dataproc Flink クラスタを作成するには、ターミナル ウィンドウまたは Cloud Shell で次の gcloud dataproc のクラスタ作成 コマンドをローカルに実行します。
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
注:
- CLUSTER_NAME: クラスタの名前を指定します。
- REGION: クラスタが配置される Compute Engine のリージョンを指定します。
DATAPROC_IMAGE_VERSION: 必要に応じて、クラスタで使用するイメージのバージョンを指定します。クラスタ イメージのバージョンによって、クラスタにインストールされている Flink コンポーネントのバージョンが決まります。
クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。
Dataproc Jobs API を使用して Flink ジョブを実行するには、イメージのバージョンを [TBD] 以降にする必要があります(Dataproc Flink ジョブの実行をご覧ください)。
--optional-components
: クラスタで Flink ジョブと Flink HistoryServer ウェブサービスを実行するには、FLINK
コンポーネントを指定する必要があります。--enable-component-gateway
: Flink 履歴サーバー UI へのコンポーネント ゲートウェイ リンクを有効にするには、コンポーネント ゲートウェイを有効にする必要があります。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースへのアクセスも有効になります。PROPERTIES。必要に応じて、1 つ以上のクラスタ プロパティを指定します。
イメージ バージョン
2.0.67
以降と2.1.15
以降を使用して Dataproc クラスタを作成する場合、--properties
フラグを使用して、クラスタで実行する Flink アプリケーションのデフォルトとして機能する/etc/flink/conf/flink-conf.yaml
に Flink プロパティを構成できます。flink:historyserver.archive.fs.dir
を設定すると、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定できます(この場所は、Flink クラスタで実行されている Flink 履歴サーバーによって使用されます)。複数のプロパティの例:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
その他のフラグ:
- クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(
/usr/bin/flink-yarn-daemon
)を実行するには、オプションの--metadata flink-start-yarn-session=true
フラグ を追加して Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。
- クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(
2.0 以前のイメージ バージョンを使用する場合は、
--scopes=https://www.googleapis.com/auth/cloud-platform
フラグを追加して、クラスタによる Google Cloud API へのアクセスを有効にできます(スコープのベスト プラクティスを参照)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform
スコープがデフォルトで有効になります。
API
Dataproc API を使用して Dataproc Flink クラスタを作成するには、次のように clusters.create リクエストを送信します。
注:
SoftwareConfig.Component を
FLINK
に設定します。必要に応じて、
SoftwareConfig.imageVersion
を設定して、クラスタで使用するイメージのバージョンを指定できます。クラスタ イメージのバージョンによって、クラスタにインストールされている Flink コンポーネントのバージョンが決まります。クラスタで Flink コンポーネントを有効にするには、イメージ バージョンが 1.5 以降である必要があります(各 Dataproc イメージ リリースに含まれるコンポーネント バージョンの一覧を表示するには、サポートされている Dataproc バージョンをご覧ください)。
Dataproc Jobs API を使用して Flink ジョブを実行するには、イメージのバージョンを [TBD] 以降にする必要があります(Dataproc Flink ジョブの実行をご覧ください)。
EndpointConfig.enableHttpPortAccess を
true
に設定して、Flink 履歴サーバー UI へのコンポーネント ゲートウェイを有効にするリンクに設定します。コンポーネント ゲートウェイを有効にすると、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースへのアクセスも有効になります。必要に応じて、
SoftwareConfig.properties
を設定し、1 つ以上のクラスタ プロパティを指定できます。- クラスタで実行する Flink アプリケーションのデフォルトとして機能する Flink プロパティを指定できます。たとえば、
flink:historyserver.archive.fs.dir
を設定すると、Flink ジョブ履歴ファイルを書き込む Cloud Storage のロケーションを指定できます(このロケーションは、Flink クラスタで実行されている Flink 履歴サーバーによって使用されます)。
- クラスタで実行する Flink アプリケーションのデフォルトとして機能する Flink プロパティを指定できます。たとえば、
必要に応じて、次の設定もできます。
GceClusterConfig.metadata
。たとえば、flink-start-yarn-session
true
を指定して、クラスタ マスターノードのバックグラウンドで Flink YARN デーモン(/usr/bin/flink-yarn-daemon
)を実行し、Flink YARN セッションを開始します(Flink セッション モードをご覧ください)。- 2.0 以前のイメージ バージョンを使ってクラスタによる Google Cloud API へのアクセスを有効にする場合は、GceClusterConfig.serviceAccountScopes を
https://www.googleapis.com/auth/cloud-platform
(cloud-platform
スコープ)に設定します(スコープのベスト プラクティスをご覧ください)。Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform
スコープがデフォルトで有効になります。
Flink クラスタの作成後
- コンポーネント ゲートウェイの
Flink History Server
リンクを使用して、Flink クラスタで実行されている Flink 履歴サーバーを表示します。 - コンポーネント ゲートウェイの
YARN ResourceManager link
を使用して、Flink クラスタで実行されている Flink ジョブ マネージャー ウェブ インターフェースを表示します。 - Dataproc 永続履歴サーバーを作成して、既存の Flink クラスタと削除された Flink クラスタによって書き込まれた Flink ジョブ履歴ファイルを表示します。
Dataproc Jobs
リソースを使用して Flink ジョブを実行する
Google Cloud コンソール、Google Cloud CLI、または Dataproc API から Dataproc の Jobs
リソースを使用して Flink ジョブを実行できます。
コンソール
コンソールから Flink ワードカウント ジョブのサンプルを送信するには:
ブラウザの Google Cloud コンソールで Dataproc の [ジョブを送信] ページを開きます。
[ジョブを送信] ページのフィールドに入力します。
- クラスタリストから [クラスタ] 名を選択します。
- [ジョブタイプ] を
Flink
に設定します。 - [メインクラスまたは JAR] を
org.apache.flink.examples.java.wordcount.WordCount
に設定します。 - [JAR ファイル] を
file:///usr/lib/flink/examples/batch/WordCount.jar
に設定します。file:///
は、クラスタにあるファイルを示します。Dataproc は Flink クラスタの作成時にWordCount.jar
をインストールしました。- このフィールドには、Cloud Storage パス(
gs://BUCKET/JARFILE
)または Hadoop 分散ファイル システム(HDFS)パス(hdfs://PATH_TO_JAR
)も指定できます。
[送信] をクリックします。
- ジョブドライバ出力が [ジョブの詳細] ページに表示されます。
- Google Cloud コンソールで Dataproc の [ジョブ] ページに Flink ジョブが表示されます。
- ジョブを停止または削除するには、[ジョブ] ページまたは [ジョブの詳細] ページで [停止] または [削除] をクリックします。
gcloud
Dataproc Flink クラスタに Flink ジョブを送信するには、ターミナル ウィンドウまたは Cloud Shell で gcloud CLI の gcloud dataproc ジョブの送信 コマンドをローカルに実行します。
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
注:
- CLUSTER_NAME: ジョブを送信する Dataproc Flink クラスタの名前を指定します。
- REGION: クラスタが配置される Compute Engine のリージョンを指定します。
- MAIN_CLASS: Flink アプリケーションの
main
クラスを指定します。次に例を示します。org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: Flink アプリケーションの jar ファイルを指定します。以下を指定できます。
file:/// 接頭辞を使用して、クラスタにインストールされている jar ファイル:
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Cloud Storage 内の jar ファイル:
gs://BUCKET/JARFILE
- HDFS 内の jar ファイル:
hdfs://PATH_TO_JAR
JOB_ARGS: 必要に応じて、ダブルダッシュ(
--
)の後にジョブ引数を追加します。ジョブを送信すると、ジョブドライバ出力がローカルターミナルまたは Cloud Shell ターミナルに表示されます。
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
このセクションでは、Dataproc jobs.submit API を使用して Dataproc Flink クラスタに Flink ジョブを送信する方法を説明します。
リクエストのデータを使用する前に、次のように置き換えます。
- PROJECT_ID: Google Cloud プロジェクト ID
- REGION: クラスタ リージョン
- CLUSTER_NAME: ジョブを送信する Dataproc Flink クラスタの名前を指定します。
HTTP メソッドと URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
リクエストの本文(JSON):
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
リクエストを送信するには、次のいずれかのオプションを展開します。
次のような JSON レスポンスが返されます。
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- Google Cloud コンソールで Dataproc の [ジョブ] ページに Flink ジョブが表示されます。
- Google Cloud コンソールの [ジョブ] ページまたは [ジョブの詳細] ページで [停止] または [削除] をクリックすると、ジョブの停止または削除ができます。
flink
CLI を使用して Flink ジョブを実行する
Dataproc Jobs
リソースを使用して Flink ジョブを実行する代わりに、flink
CLI を使用して Flink クラスタのマスターノードで Flink ジョブを実行できます。
以降のセクションでは、Dataproc Flink クラスタで flink
CLI ジョブを実行するさまざまな方法について説明します。
マスターノードに SSH で接続: SSH ユーティリティを使用して、クラスタのマスター VM でターミナル ウィンドウを開きます。
クラスパスを設定する: Flink クラスタ マスター VM の SSH ターミナル ウィンドウから Hadoop クラスパスを初期化します。
export HADOOP_CLASSPATH=$(hadoop classpath)
Flink ジョブを実行する: Flink ジョブをさまざまな YARN のデプロイモードで実行できます。アプリケーション、ジョブごと、セッション モードで行います。
アプリケーション モード: Flink アプリケーション モードは、Dataproc イメージ バージョン 2.0 以降でサポートされています。このモードでは、YARN Job Manager でジョブの
main()
メソッドが実行されます。クラスタは、ジョブが終了するとシャットダウンします。ジョブ送信の例:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
実行中のジョブを一覧表示します。
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
実行中のジョブをキャンセルします。
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
ジョブごとモード: この Flink モードでは、クライアント側でジョブの
main()
メソッドを実行します。ジョブ送信の例:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
セッション モード: 長時間実行される Flink YARN セッションを開始してから、1 つ以上のジョブをセッションに送信します。
セッションの開始: 次のいずれかの方法で Flink セッションを開始できます。
Flink クラスタを作成し、
gcloud dataproc clusters create
コマンドに--metadata flink-start-yarn-session=true
フラグを追加します(Dataproc Flink クラスタを作成するをご覧ください)。このフラグを有効にすると、クラスタの作成後に Dataproc が/usr/bin/flink-yarn-daemon
を実行し、クラスタで Flink セッションを開始します。セッションの YARN アプリケーション ID は
/tmp/.yarn-properties-${USER}
に保存されます。ID を一覧表示するには、yarn application -list
コマンドを使用します。カスタム設定を使用して、クラスタ マスター VM にプリインストールされている Flink
yarn-session.sh
スクリプトを実行します。カスタム設定の例:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
デフォルト設定で
/usr/bin/flink-yarn-daemon
ラッパー スクリプトを実行します。. /usr/bin/flink-yarn-daemon
セッションにジョブを送信する: 次のコマンドを実行して、セッションに Flink ジョブを送信します。
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: ジョブが実行される Flink マスター VM の URL(ホストとポートを含む)。URL から
http:// prefix
を削除します。この URL は、Flink セッションを開始するときにコマンド出力に表示されます。次のコマンドを実行して、この URL をTracking-URL
フィールドに一覧表示します。
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: ジョブが実行される Flink マスター VM の URL(ホストとポートを含む)。URL から
セッション内のジョブを一覧表示する: セッション内の Flink ジョブを一覧表示するには、次のいずれかを行います。
引数なしで
flink list
を実行します。このコマンドは、/tmp/.yarn-properties-${USER}
でセッションの YARN アプリケーション ID を検索します。/tmp/.yarn-properties-${USER}
またはyarn application -list
の出力からセッションの YARN アプリケーション ID を取得し、<code>
flink list -yid YARN_APPLICATION_ID を実行します。flink list -m FLINK_MASTER_URL
を実行します。
セッションを停止する: セッションを停止するには、
/tmp/.yarn-properties-${USER}
またはyarn application -list
の出力からセッションの YARN アプリケーション ID を取得して、次のいずれかのコマンドを実行します。:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Flink で Apache Beam ジョブを実行する
FlinkRunner
を使用して、Dataproc 上で Apache Beam ジョブを実行できます。
次の方法で、Flink で Beam ジョブを実行できます。
- Java Beam ジョブ
- ポータブル Beam ジョブ
Java Beam ジョブ
Beam ジョブを JAR ファイルにパッケージ化します。バンドルされた JAR ファイルに、ジョブの実行に必要な依存関係を指定します。
次の例では、Dataproc クラスタのマスターノードから Java Beam ジョブが実行されます。
Flink コンポーネントを有効にして Dataproc クラスタを作成します。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。--region
: サポートされている Dataproc リージョン。--enable-component-gateway
: Flink ジョブ マネージャー UI へのアクセスを有効にします。--scopes
: クラスタによる Google Cloud API へのアクセスを有効にします(スコープのベスト プラクティスを参照)。 Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform
スコープがデフォルトで有効になります(このフラグの設定を指定する必要はありません)。
SSH ユーティリティを使用して、Flink クラスタのマスターノードでターミナル ウィンドウを開きます。
Dataproc クラスタのマスターノード上で Flink YARN セッションを開始します。
. /usr/bin/flink-yarn-daemon
Dataproc クラスタの Flink バージョンをメモしておきます。
flink --version
ローカルマシン上で、正規の Beam ワードカウントの例を Java に生成します。
Dataproc クラスタ上で、Flink バージョンと互換性のある Beam バージョンを選択します。Beam-Flink バージョンの互換性を一覧表している Flink バージョンの互換性の表をご覧ください。
生成された POM ファイルを開きます。タグ
<flink.artifact.name>
で指定された Beam Flink ランナーのバージョンを確認します。Flink アーティファクト名の Beam Flink ランナー バージョンが、クラスタの Flink バージョンと一致しない場合は、バージョン番号を更新して一致するようにします。mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
ワードカウントの例をパッケージ化します。
mvn package -Pflink-runner
パッケージ化された uber JAR ファイル
word-count-beam-bundled-0.1.jar
(~ 135 MB)を Dataproc クラスタのマスターノードにアップロードします。gsutil cp
を使用すると、Cloud Storage から Dataproc クラスタへのファイル転送を高速化できます。ローカル ターミナル上で、Cloud Storage バケットを作成し、uber JAR をアップロードします。
gsutil mb BUCKET_NAME
gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Dataproc のマスターノード上で、uber JAR をダウンロードします。
gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Dataproc クラスタのマスターノード上で Java Beam ジョブを実行します。
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
結果が Cloud Storage バケットに書き込まれていることを確認します。
gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Flink YARN セッションを停止します。
yarn application -list
yarn application -kill YARN_APPLICATION_ID
ポータブル Beam ジョブ
Python、Go、その他のサポートされている言語で記述された Beam ジョブを実行するには、Beam の Flink Runnerで説明されているように、FlinkRunner
と PortableRunner
を使用します(ポータビリティ フレームワークのロードマップもご覧ください)。
次の例では、Dataproc クラスタのマスターノードから Python でポータブル Beam ジョブが実行されます。
Flink コンポーネントと Docker コンポーネントの両方を有効にして Dataproc クラスタを作成します。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
注:
--optional-components
: Flink と Docker。--image-version
: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。--region
: 使用可能な Dataproc リージョン。--enable-component-gateway
: Flink ジョブ マネージャー UI へのアクセスを有効にします。--scopes
: クラスタによる Google Cloud API へのアクセスを有効にします(スコープのベスト プラクティスを参照)。 Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform
スコープがデフォルトで有効になります(このフラグの設定を指定する必要はありません)。
gsutil
をローカルまたは Cloud Shell で使用して、Cloud Storage バケットを作成します。サンプルのワードカウント プログラムの実行時に、BUCKET_NAME を指定します。gsutil mb BUCKET_NAME
クラスタ VM のターミナル ウィンドウで、Flink YARN セッションを開始します。Flink のマスター URL、ジョブが実行される Flink マスターのアドレスに注意してください。サンプルのワードカウント プログラムを実行するときに、FLINK_MASTER_URL を指定します。
. /usr/bin/flink-yarn-daemon
Dataproc クラスタを実行している Flink のバージョンを表示してメモします。サンプルのワードカウント プログラムを実行するときに、FLINK_VERSION を指定します。
flink --version
ジョブに必要な Python ライブラリをクラスタ マスターノードにインストールします。
クラスタに Flink バージョンと互換性のある Beam バージョンをインストールします。
python -m pip install apache-beam[gcp]==BEAM_VERSION
クラスタ マスターノードでワードカウントの例を実行します。
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
注:
--runner
:FlinkRunner
--flink_version
: FLINK_VERSION、前述。--flink_master
: FLINK_MASTER_URL、前述。--flink_submit_uber_jar
: uber JAR を使用して Beam ジョブを実行します。--output
: 先ほど作成した BUCKET_NAME。
結果がバケットに書き込まれていることを確認します。
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Flink YARN セッションを停止します。
- アプリケーション ID を取得します。
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Kerberos クラスタで Flink を実行する
Dataproc Flink コンポーネントは、Kerberos クラスタをサポートしています。Flink ジョブを送信して保持したり、Flink クラスタを開始したりするには、有効な Kerberos チケットが必要です。デフォルトでは、Kerberos チケットは 7 日間有効です。
Flink ジョブ マネージャー UI にアクセスする
Flink ジョブ マネージャー ウェブ インターフェースは、Flink ジョブまたは Flink セッション クラスタの実行中に使用できます。ウェブ インターフェースを使用するには:
- Dataproc Flink クラスタを作成します。
- クラスタを作成後、Google Cloud コンソールの [クラスタの詳細] ページの [ウェブ インターフェース] タブでコンポーネント ゲートウェイの YARN ResourceManager リンクをクリックします。
- YARN リソース マネージャー UI 上で、Flink クラスタ アプリケーションのエントリを特定します。ジョブの完了ステータスに応じて、[ApplicationMaster] リンクか [履歴] リンクが一覧表示されます。
- 長時間実行ストリーミング ジョブの場合は、[ApplicationManager] リンクをクリックして Flink ダッシュボードを開きます。完了したジョブの場合は、[履歴] リンクをクリックしてジョブの詳細を表示します。