Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Flink などの追加コンポーネントをインストールできます。このページでは Flink コンポーネントについて説明します。
Dataproc Flink コンポーネントは、Apache Flink を Dataproc クラスタにインストールします。
コンポーネントをインストールする
Dataproc クラスタの作成時にコンポーネントをインストールします。 Dataproc Flink コンポーネントは、Dataproc のイメージ 1.5 以降で作成されたクラスタにインストールできます。
クラスタ イメージのバージョンによって、クラスタにインストールされている Flink コンポーネントのバージョンが決まります。Dataproc イメージの各リリースに含まれるコンポーネント バージョンのリストについては、サポートされている Dataproc バージョンをご覧ください。
推奨: Flink コンポーネントで標準の 1 マスター VM クラスタを使用します。Dataproc 高可用性モードクラスタ(3 つのマスター VM を含む)は、Flink 高可用性モードをサポートしていません。
gcloud コマンド
Flink コンポーネントを含む Dataproc クラスタを作成するには、--optional-components
フラグを指定した gcloud dataproc clusters create cluster-name コマンドを使用します。
gcloud dataproc clusters create cluster-name \ --optional-components=FLINK \ --region=region \ --enable-component-gateway \ --image-version=DATAPROC_IMAGE_VERSION \ --scopes=https://www.googleapis.com/auth/cloud-platform \ ... other flagsメモ:
--enable-component-gateway
フラグは、Flink ジョブ マネージャー UI へのアクセスを有効にします。--scopes=https://www.googleapis.com/auth/cloud-platform
フラグを指定すると、プロジェクト内の Cloud Platform サービスへの API アクセスが可能になります。- 必要に応じて
--metadata flink-start-yarn-session=true
を追加して、クラスタのバックグラウンドで Flink YARN デーモン(/usr/bin/flink-yarn-daemon
)を実行し、Flink YARN セッションを開始することもできます(Flink セッション モードをご覧ください)。gcloud dataproc clusters create cluster-name \ --optional-components=FLINK \ --region=region \ --enable-component-gateway \ --image-version=DATAPROC_IMAGE_VERSION \ --scopes=https://www.googleapis.com/auth/cloud-platform \ --metadata flink-start-yarn-session=true \ ... other flags
REST API
Dataproc API を使用して Flink コンポーネントを指定するには、clusters.create リクエストの一部として SoftwareConfig.Component を使用します。
Flink ジョブ マネージャー UI への接続を有効にするには、EndpointConfig.enableHttpPortAccess を true
に設定します。
Console
- コンポーネントとコンポーネント ゲートウェイを有効にします。
- Google Cloud コンソールで、Dataproc の [Compute Engine に Dataproc クラスタを作成する] ページを開きます。[クラスターを設定] パネルが選択されています。
- [バージョニング] セクションで、イメージのタイプとバージョンを確認または変更します。
- [コンポーネント] セクションで次の設定を行います。
- [コンポーネント ゲートウェイ] で [コンポーネント ゲートウェイを有効にする] を選択します。
- [オプション コンポーネント] で、クラスタにインストールする Flink やその他のオプション コンポーネントを選択します。
Flink を構成する
Flink のプロパティを設定するには:
初期化アクションを使用して、
/etc/flink/conf/flink-conf.yaml
のデフォルトの Flink プロパティを更新します。またはFlink ジョブの送信時や Flink セッションの開始時に、コマンドライン フラグを使って Flink プロパティを指定します。
クラスパスを設定する
Flink クラスタ マスター VM の SSH ターミナル ウィンドウから Hadoop クラスパスを初期化します。
export HADOOP_CLASSPATH=$(hadoop classpath)
Flink ジョブを実行する
Flink は、さまざまな YARN のデプロイモード(アプリケーション モード、ジョブ単位モード、セッション モード)で実行できます。
アプリケーション モード
Flink アプリケーション モードは Dataproc イメージ バージョン 2.0 以降でサポートされています。このモードでは、YARN のジョブ マネージャーでジョブの main()
メソッドが実行されます。ジョブが終了すると、クラスタはシャットダウンします。
ジョブ送信の例:
flink run-application \
-t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=2048m \
-Djobmanager.heap.mb=820 \
-Dtaskmanager.heap.mb=1640 \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=4 \
/usr/lib/flink/examples/batch/WordCount.jar
実行中のジョブを一覧表示します。
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
実行中のジョブをキャンセルします。
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
ジョブ単位モード
この Flink モードでは、クライアント側でジョブの main()
メソッドを実行します。
ジョブ送信の例:
flink run \
-m yarn-cluster \
-p 4 \
-ys 2 \
-yjm 1024m \
-ytm 2048m \
/usr/lib/flink/examples/batch/WordCount.jar
セッション モード
長時間実行される Flink YARN セッションを開始してから、1 つ以上のジョブをセッションに送信します。
Flink セッションは次のいずれかの方法で開始できます。
Flink クラスタを作成したら、クラスタ マスター VM にプリインストールされている Flink
yarn-session.sh
スクリプトをカスタム設定で実行します。カスタム設定の例:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached ```
Flink クラスタが作成されたら、Flink
/usr/bin/flink-yarn-daemon
ラッパー スクリプトをデフォルト設定で追加します。. /usr/bin/flink-yarn-daemon
gcloud dataproc clusters create
コマンドに--metadata flink-start-yarn-session=true
フラグを追加して、Flink クラスタを作成します。このフラグを有効にすると、クラスタが作成された後、Dataproc が/usr/bin/flink-yarn-daemon
を実行して、クラスタで Flink セッションを開始します。
セッションの YARN アプリケーション ID は /tmp/.yarn-properties-${USER}
に保存されます。ID は yarn application -list
コマンドで一覧表示できます。
セッションにジョブを送信する
Flink セッションを開始すると、コマンド出力に、ジョブが実行される Flink マスター VM の URL(ホストとポートを含む)が一覧表示されます。
Flink マスター URL を表示する別の方法: 次のコマンド出力は、Tracking-URL
フィールドに Flink マスター URL を一覧表示します。
yarn application -list -appId=<yarn-app-id> | sed 's#http://##'`
次のコマンドを実行して、Flink ジョブをセッションに送信します。http:// prefix
を削除した後に、FLINK_MASTER_URL を Flink マスター URL に置き換えます。
flink run -m FLINK_MASTER_URL /usr/lib/flink/examples/batch/WordCount.jar
セッション内のジョブを一覧表示する
セッション内の Flink ジョブを一覧表示するには、次のいずれかを行います。
引数を指定せずに
flink list
を実行します。このコマンドは、/tmp/.yarn-properties-${USER}
でセッションの YARN アプリケーション ID を検索します。flink list -yid YARN_APPLICATION_ID
を実行します。/tmp/.yarn-properties-${USER}
またはyarn application -list
を実行して YARN アプリケーション ID を取得します。flink list -m FLINK_MASTER_URL
を実行します。
セッションを停止する
セッションを停止するには、/tmp/.yarn-properties-${USER}
または yarn application -list
の出力から、セッションの YARN アプリケーション ID を取得し、次のコマンドのいずれかを実行します。
echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Apache Beam ジョブを実行する
FlinkRunner
を使用して、Dataproc 上で Apache Beam ジョブを実行できます。
次の方法で、Flink で Beam ジョブを実行できます。
- Java Beam ジョブ
- ポータブル Beam ジョブ
Java Beam ジョブ
Beam ジョブを JAR ファイルにパッケージ化します。バンドルされた JAR ファイルに、ジョブの実行に必要な依存関係を指定します。
次の例では、Dataproc クラスタのマスターノードから Java Beam ジョブが実行されます。
Flink コンポーネントを有効にして Dataproc クラスタを作成します。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。--region
: サポートされている Dataproc リージョン。--enable-component-gateway
: Flink ジョブ マネージャー UI へのアクセスを有効にします。--scopes
: プロジェクト内の Cloud Platform サービスへの API アクセスを有効にします。
SSH ユーティリティを使用して、FLink クラスタのマスターノードでターミナル ウィンドウを開きます。
Dataproc クラスタのマスターノードで Flink YARN セッションを開始します。
. /usr/bin/flink-yarn-daemon
Dataproc クラスタの Flink バージョンをメモしておきます。
flink --version
ローカルマシン上で、正規の Beam ワードカウントの例を Java に生成します。
Dataproc クラスタ上で、Flink バージョンと互換性のある Beam バージョンを選択します。Beam-Flink バージョンの互換性を一覧表している Flink バージョンの互換性の表をご覧ください。
生成された POM ファイルを開きます。タグ
<flink.artifact.name>
で指定された Beam Flink ランナーのバージョンを確認します。Flink アーティファクト名の Beam Flink ランナー バージョンが、クラスタの Flink バージョンと一致しない場合は、バージョン番号を更新して一致するようにします。mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
ワードカウントの例をパッケージ化します。
mvn package -Pflink-runner
パッケージ化された uber JAR ファイル
word-count-beam-bundled-0.1.jar
(~ 135 MB)を Dataproc クラスタのマスターノードにアップロードします。gsutil cp
を使用すると、Cloud Storage から Dataproc クラスタへのファイル転送を高速化できます。ローカル ターミナル上で、Cloud Storage バケットを作成し、uber JAR をアップロードします。
gsutil mb BUCKET_NAME
gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Dataproc のマスターノード上で、uber JAR をダウンロードします。
gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Dataproc クラスタのマスターノード上で Java Beam ジョブを実行します。
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
結果が Cloud Storage バケットに書き込まれていることを確認します。
gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Flink YARN セッションを停止します。
yarn application -list
yarn application -kill YARN_APPLICATION_ID
ポータブル Beam ジョブ
Python、Go、その他のサポートされている言語で記述された Beam ジョブを実行するには、Beam の Flink Runnerで説明されているように、FlinkRunner
と PortableRunner
を使用します(ポータビリティ フレームワークのロードマップもご覧ください)。
次の例では、Dataproc クラスタのマスターノードから Python でポータブル Beam ジョブが実行されます。
Flink コンポーネントと Docker コンポーネントの両方を有効にして Dataproc クラスタを作成します。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
注:
--optional-components
: Flink と Docker。--image-version
: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。--region
: 使用可能な Dataproc リージョン。--enable-component-gateway
: Flink ジョブ マネージャー UI へのアクセスを有効にします。--scopes
: プロジェクト内の Cloud Platform サービスへの API アクセスを有効にします。
ローカルまたは Cloud Shell で
gsutil
を使用して Cloud Storage バケットを作成します。サンプルのワードカウント プログラムを実行するときに、BUCKET_NAME を指定します。gsutil mb BUCKET_NAME
クラスタ VM のターミナル ウィンドウで、Flink YARN セッションを開始します。Flink のマスター URL、ジョブが実行される Flink マスターのアドレスに注意してください。サンプルのワードカウント プログラムを実行するときに、FLINK_MASTER_URL を指定します。
. /usr/bin/flink-yarn-daemon
Dataproc クラスタを実行している Flink のバージョンを表示してメモします。サンプルのワードカウント プログラムを実行するときに、FLINK_VERSION を指定します。
flink --version
ジョブに必要な Python ライブラリをクラスタのマスターノードにインストールします。
クラスタの Flink バージョンと互換性のある Beam バージョンをインストールします。
python -m pip install apache-beam[gcp]==BEAM_VERSION
クラスタのマスターノードでワードカウントの例を実行します。
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
注:
--runner
:FlinkRunner
--flink_version
: FLINK_VERSION、前述。--flink_master
: FLINK_MASTER_URL、前述。--flink_submit_uber_jar
: uber JAR を使用して Beam ジョブを実行します。--output
: 先ほど作成した BUCKET_NAME。
結果がバケットに書き込まれていることを確認します。
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Flink YARN セッションを停止します。
- アプリケーション ID を取得します。
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Kerberos クラスタで Flink を実行する
Dataproc Flink コンポーネントは、Kerberos クラスタをサポートしています。Flink ジョブを送信して保持したり、Flink クラスタを開始したりするには、有効な Kerberos チケットが必要です。デフォルトでは、Kerberos チケットは 7 日間有効です。
Flink ジョブ マネージャー UI にアクセスする
Flink ジョブ マネージャー ウェブ インターフェースは、Flink ジョブまたは Flink セッション クラスタの実行中に使用できます。ウェブ インターフェースを使用するには:
- コンポーネント ゲートウェイを有効にして Dataproc クラスタを作成する。
- クラスタを作成後、Google Cloud コンソールの [クラスタの詳細] ページの [ウェブ インターフェース] タブでコンポーネント ゲートウェイの YARN ResourceManager リンクをクリックします。
- YARN リソース マネージャー UI 上で、Flink クラスタ アプリケーションのエントリを特定します。ジョブの完了ステータスに応じて、[ApplicationMaster] リンクか [履歴] リンクが一覧表示されます。
- 長時間実行ストリーミング ジョブの場合は、[ApplicationManager] リンクをクリックして Flink ダッシュボードを開きます。完了したジョブの場合は、[履歴] リンクをクリックしてジョブの詳細を表示します。