Dataproc の Flink オプション コンポーネント

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Flink などの追加コンポーネントをインストールできます。このページでは Flink コンポーネントについて説明します。

Dataproc Flink コンポーネントは、Apache Flink を Dataproc クラスタにインストールします。

コンポーネントをインストールする

Dataproc クラスタの作成時にコンポーネントをインストールします。 Dataproc Flink コンポーネントは、Dataproc のイメージ 1.5 以降で作成されたクラスタにインストールできます。

クラスタ イメージのバージョンによって、クラスタにインストールされている Flink コンポーネントのバージョンが決まります。Dataproc イメージの各リリースに含まれるコンポーネント バージョンのリストについては、サポートされている Dataproc バージョンをご覧ください。

推奨: Flink コンポーネントで標準の 1 マスター VM クラスタを使用します。Dataproc 高可用性モードクラスタ(3 つのマスター VM を含む)は、Flink 高可用性モードをサポートしていません。

gcloud コマンド

Flink コンポーネントを含む Dataproc クラスタを作成するには、--optional-componentsフラグを指定した gcloud dataproc clusters create cluster-name コマンドを使用します。

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    --scopes=https://www.googleapis.com/auth/cloud-platform  \
    ... other flags
メモ:
  • --enable-component-gateway フラグは、Flink ジョブ マネージャー UI へのアクセスを有効にします。
  • --scopes=https://www.googleapis.com/auth/cloud-platform フラグを指定すると、プロジェクト内の Cloud Platform サービスへの API アクセスが可能になります。
  • 必要に応じて --metadata flink-start-yarn-session=true を追加して、クラスタのバックグラウンドで Flink YARN デーモン(/usr/bin/flink-yarn-daemon)を実行し、Flink YARN セッションを開始することもできます(Flink セッション モードをご覧ください)。
    gcloud dataproc clusters create cluster-name \
        --optional-components=FLINK \
        --region=region \
        --enable-component-gateway \
        --image-version=DATAPROC_IMAGE_VERSION \
        --scopes=https://www.googleapis.com/auth/cloud-platform  \
        --metadata flink-start-yarn-session=true \
        ... other flags
    

REST API

Dataproc API を使用して Flink コンポーネントを指定するには、clusters.create リクエストの一部として SoftwareConfig.Component を使用します。

Flink ジョブ マネージャー UI への接続を有効にするには、EndpointConfig.enableHttpPortAccesstrue に設定します。

Console

  1. コンポーネントとコンポーネント ゲートウェイを有効にします。
    • Google Cloud コンソールで、Dataproc の [Compute Engine に Dataproc クラスタを作成する] ページを開きます。[クラスターを設定] パネルが選択されています。
    • [バージョニング] セクションで、イメージのタイプとバージョンを確認または変更します。
    • [コンポーネント] セクションで次の設定を行います。
      • [コンポーネント ゲートウェイ] で [コンポーネント ゲートウェイを有効にする] を選択します。
      • [オプション コンポーネント] で、クラスタにインストールする Flink やその他のオプション コンポーネントを選択します。

Flink のプロパティを設定するには:

  1. 初期化アクションを使用して、/etc/flink/conf/flink-conf.yaml のデフォルトの Flink プロパティを更新します。または

  2. Flink ジョブの送信時や Flink セッションの開始時に、コマンドライン フラグを使って Flink プロパティを指定します。

クラスパスを設定する

  1. Flink クラスタ マスター VM の SSH ターミナル ウィンドウから Hadoop クラスパスを初期化します。

    export HADOOP_CLASSPATH=$(hadoop classpath)
    

Flink は、さまざまな YARN のデプロイモード(アプリケーション モード、ジョブ単位モード、セッション モード)で実行できます。

アプリケーション モード

Flink アプリケーション モードは Dataproc イメージ バージョン 2.0 以降でサポートされています。このモードでは、YARN のジョブ マネージャーでジョブの main() メソッドが実行されます。ジョブが終了すると、クラスタはシャットダウンします。

ジョブ送信の例:

flink run-application \
    -t yarn-application \
   -Djobmanager.memory.process.size=1024m \
   -Dtaskmanager.memory.process.size=2048m \
   -Djobmanager.heap.mb=820 \
   -Dtaskmanager.heap.mb=1640 \
   -Dtaskmanager.numberOfTaskSlots=2 \
   -Dparallelism.default=4 \
   /usr/lib/flink/examples/batch/WordCount.jar

実行中のジョブを一覧表示します。

./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

実行中のジョブをキャンセルします。

./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

ジョブ単位モード

この Flink モードでは、クライアント側でジョブの main() メソッドを実行します。

ジョブ送信の例:

flink run \
    -m yarn-cluster \
    -p 4 \
    -ys 2 \
    -yjm 1024m \
    -ytm 2048m \
    /usr/lib/flink/examples/batch/WordCount.jar

セッション モード

長時間実行される Flink YARN セッションを開始してから、1 つ以上のジョブをセッションに送信します。

Flink セッションは次のいずれかの方法で開始できます。

  1. Flink クラスタを作成したら、クラスタ マスター VM にプリインストールされている Flink yarn-session.sh スクリプトをカスタム設定で実行します。

    カスタム設定の例:

    /usr/lib/flink/bin/yarn-session.sh \
      -s 1 \
      -jm 1024m \
      -tm 2048m \
      -nm flink-dataproc \
      --detached
     ```
    
  2. Flink クラスタが作成されたら、Flink /usr/bin/flink-yarn-daemon ラッパー スクリプトをデフォルト設定で追加します。

    . /usr/bin/flink-yarn-daemon
    
  3. gcloud dataproc clusters create コマンドに --metadata flink-start-yarn-session=true フラグを追加して、Flink クラスタを作成します。このフラグを有効にすると、クラスタが作成された後、Dataproc が /usr/bin/flink-yarn-daemon を実行して、クラスタで Flink セッションを開始します。

セッションの YARN アプリケーション ID は /tmp/.yarn-properties-${USER} に保存されます。ID は yarn application -list コマンドで一覧表示できます。

セッションにジョブを送信する

Flink セッションを開始すると、コマンド出力に、ジョブが実行される Flink マスター VM の URL(ホストとポートを含む)が一覧表示されます。

Flink マスター URL を表示する別の方法: 次のコマンド出力は、Tracking-URL フィールドに Flink マスター URL を一覧表示します。

yarn application -list -appId=<yarn-app-id> | sed 's#http://##'`

次のコマンドを実行して、Flink ジョブをセッションに送信します。http:// prefix を削除した後に、FLINK_MASTER_URL を Flink マスター URL に置き換えます。

flink run -m FLINK_MASTER_URL /usr/lib/flink/examples/batch/WordCount.jar

セッション内のジョブを一覧表示する

セッション内の Flink ジョブを一覧表示するには、次のいずれかを行います。

  • 引数を指定せずに flink list を実行します。このコマンドは、/tmp/.yarn-properties-${USER} でセッションの YARN アプリケーション ID を検索します。

  • flink list -yid YARN_APPLICATION_ID を実行します。/tmp/.yarn-properties-${USER} または yarn application -list を実行して YARN アプリケーション ID を取得します。

  • flink list -m FLINK_MASTER_URL を実行します。

セッションを停止する

セッションを停止するには、/tmp/.yarn-properties-${USER} または yarn application -list の出力から、セッションの YARN アプリケーション ID を取得し、次のコマンドのいずれかを実行します。

echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID

Apache Beam ジョブを実行する

FlinkRunner を使用して、Dataproc 上で Apache Beam ジョブを実行できます。

次の方法で、Flink で Beam ジョブを実行できます。

  1. Java Beam ジョブ
  2. ポータブル Beam ジョブ

Java Beam ジョブ

Beam ジョブを JAR ファイルにパッケージ化します。バンドルされた JAR ファイルに、ジョブの実行に必要な依存関係を指定します。

次の例では、Dataproc クラスタのマスターノードから Java Beam ジョブが実行されます。

  1. Flink コンポーネントを有効にして Dataproc クラスタを作成します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。
    • --region: サポートされている Dataproc リージョン
    • --enable-component-gateway: Flink ジョブ マネージャー UI へのアクセスを有効にします。
    • --scopes: プロジェクト内の Cloud Platform サービスへの API アクセスを有効にします。
  2. SSH ユーティリティを使用して、FLink クラスタのマスターノードでターミナル ウィンドウを開きます。

  3. Dataproc クラスタのマスターノードで Flink YARN セッションを開始します。

    . /usr/bin/flink-yarn-daemon
    

    Dataproc クラスタの Flink バージョンをメモしておきます。

    flink --version
    
  4. ローカルマシン上で、正規の Beam ワードカウントの例を Java に生成します。

    Dataproc クラスタ上で、Flink バージョンと互換性のある Beam バージョンを選択します。Beam-Flink バージョンの互換性を一覧表している Flink バージョンの互換性の表をご覧ください。

    生成された POM ファイルを開きます。タグ <flink.artifact.name> で指定された Beam Flink ランナーのバージョンを確認します。Flink アーティファクト名の Beam Flink ランナー バージョンが、クラスタの Flink バージョンと一致しない場合は、バージョン番号を更新して一致するようにします。

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. ワードカウントの例をパッケージ化します。

    mvn package -Pflink-runner
    
  6. パッケージ化された uber JAR ファイル word-count-beam-bundled-0.1.jar(~ 135 MB)を Dataproc クラスタのマスターノードにアップロードします。gsutil cp を使用すると、Cloud Storage から Dataproc クラスタへのファイル転送を高速化できます。

    1. ローカル ターミナル上で、Cloud Storage バケットを作成し、uber JAR をアップロードします。

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Dataproc のマスターノード上で、uber JAR をダウンロードします。

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Dataproc クラスタのマスターノード上で Java Beam ジョブを実行します。

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. 結果が Cloud Storage バケットに書き込まれていることを確認します。

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Flink YARN セッションを停止します。

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

ポータブル Beam ジョブ

Python、Go、その他のサポートされている言語で記述された Beam ジョブを実行するには、Beam の Flink Runnerで説明されているように、FlinkRunnerPortableRunner を使用します(ポータビリティ フレームワークのロードマップもご覧ください)。

次の例では、Dataproc クラスタのマスターノードから Python でポータブル Beam ジョブが実行されます。

  1. Flink コンポーネントと Docker コンポーネントの両方を有効にして Dataproc クラスタを作成します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    注:

    • --optional-components: Flink と Docker。
    • --image-version: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。
    • --region: 使用可能な Dataproc リージョン
    • --enable-component-gateway: Flink ジョブ マネージャー UI へのアクセスを有効にします。
    • --scopes: プロジェクト内の Cloud Platform サービスへの API アクセスを有効にします。
  2. ローカルまたは Cloud Shellgsutil を使用して Cloud Storage バケットを作成します。サンプルのワードカウント プログラムを実行するときに、BUCKET_NAME を指定します。

    gsutil mb BUCKET_NAME
    
  3. クラスタ VM のターミナル ウィンドウで、Flink YARN セッションを開始します。Flink のマスター URL、ジョブが実行される Flink マスターのアドレスに注意してください。サンプルのワードカウント プログラムを実行するときに、FLINK_MASTER_URL を指定します。

    . /usr/bin/flink-yarn-daemon
    

    Dataproc クラスタを実行している Flink のバージョンを表示してメモします。サンプルのワードカウント プログラムを実行するときに、FLINK_VERSION を指定します。

    flink --version
    
  4. ジョブに必要な Python ライブラリをクラスタのマスターノードにインストールします。

  5. クラスタの Flink バージョンと互換性のある Beam バージョンをインストールします。

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. クラスタのマスターノードでワードカウントの例を実行します。

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    注:

    • --runner: FlinkRunner
    • --flink_version: FLINK_VERSION、前述。
    • --flink_master: FLINK_MASTER_URL、前述。
    • --flink_submit_uber_jar: uber JAR を使用して Beam ジョブを実行します。
    • --output: 先ほど作成した BUCKET_NAME
  7. 結果がバケットに書き込まれていることを確認します。

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Flink YARN セッションを停止します。

    1. アプリケーション ID を取得します。
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Dataproc Flink コンポーネントは、Kerberos クラスタをサポートしています。Flink ジョブを送信して保持したり、Flink クラスタを開始したりするには、有効な Kerberos チケットが必要です。デフォルトでは、Kerberos チケットは 7 日間有効です。

Flink ジョブ マネージャー ウェブ インターフェースは、Flink ジョブまたは Flink セッション クラスタの実行中に使用できます。ウェブ インターフェースを使用するには:

  1. コンポーネント ゲートウェイを有効にして Dataproc クラスタを作成する
  2. クラスタを作成後、Google Cloud コンソールの [クラスタの詳細] ページの [ウェブ インターフェース] タブでコンポーネント ゲートウェイYARN ResourceManager リンクをクリックします。
  3. YARN リソース マネージャー UI 上で、Flink クラスタ アプリケーションのエントリを特定します。ジョブの完了ステータスに応じて、[ApplicationMaster] リンクか [履歴] リンクが一覧表示されます。
  4. 長時間実行ストリーミング ジョブの場合は、[ApplicationManager] リンクをクリックして Flink ダッシュボードを開きます。完了したジョブの場合は、[履歴] リンクをクリックしてジョブの詳細を表示します。