Dataproc Flink コンポーネント

Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、追加コンポーネントをインストールできます。このページでは Flink コンポーネントについて説明します。

Dataproc Flink コンポーネントは、Apache Flink を Dataproc クラスタにインストールします。

コンポーネントをインストールする

Dataproc クラスタの作成時にコンポーネントをインストールします。 Dataproc Flink コンポーネントは、Dataproc のイメージ 1.5 以降で作成されたクラスタにインストールできます。

クラスタ イメージのバージョンによって、クラスタにインストールされている Flink コンポーネントのバージョンが決まります(たとえば、最新と以前の 4 つの 2.0.x イメージ リリースのバージョンについては、リストされているApache Flink コンポーネントのバージョンをご覧ください)。Dataproc イメージの各リリースに含まれるコンポーネント バージョンについては、サポートされる Dataproc バージョンをご覧ください。

gcloud コマンド

Flink コンポーネントを含む Dataproc クラスタを作成するには、--optional-componentsフラグを指定した gcloud dataproc clusters create cluster-name コマンドを使用します。

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    ... other flags

注: Flink YARN セッションはかなりの YARN リソースを使用するため、デフォルトでは、Dataproc クラスタは Dataproc の起動時に Flink セッションを開始しません。gcloud dataproc clusters create コマンドに --metadata flink-start-yarn-session=true フラグを追加することで、Flink クラスタの起動時にセッションを開始できます。

REST API

Dataproc API を使用して Flink コンポーネントを指定するには、clusters.create リクエストの一部として SoftwareConfig.Component を使用します。SoftwareConfig.imageVersion フィールドは、クラスタのイメージ バージョンの設定に使用されます。

Console

  1. コンポーネントとコンポーネント ゲートウェイを有効にします。
    • Cloud Console で、Dataproc の [クラスタの作成] ページを開きます。[クラスターを設定] パネルが選択されています。
    • [バージョニング] セクションで、イメージのタイプとバージョンを確認または変更します。
    • [
        コンポーネント] セクションで次の設定を行います。
      • [コンポーネント ゲートウェイ] で [コンポーネント ゲートウェイを有効にする] を選択します(コンポーネント ゲートウェイの URL を表示してアクセスするをご覧ください)。
      • [オプション コンポーネント] で、クラスタにインストールする Flink やその他のオプション コンポーネントを選択します。

Flink を備えた Dataproc クラスタが起動した後、Dataproc クラスタのマスターノードに SSH で接続し、Flink ジョブを実行します。

例:

単一の Flink ジョブを実行します。ジョブを受け入れると、Flink は YARN でそのジョブのジョブ マネージャーとスロットの作成を開始します。完了するまで Flink ジョブは YARN クラスタで実行されます。 ジョブが完了するとジョブ マネージャーがシャットダウンされます。ジョブのログは YARN ログで確認できます。

flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar

例:

長時間実行される Flink YARN セッションを開始してから、ジョブを実行します。

Dataproc クラスタのマスターノードでセッションを開始します。または、gcloud dataproc clusters create --metadata flink-start-yarn-session=true フラグを使用して Flink クラスタを作成するときに、Flink YARN セッションを開始することもできます。

. /usr/bin/flink-yarn-daemon

セッションが正常に開始されたら、FLINK_MASTER_URL のホストとポートをメモします。次のコマンドの JOB_MANAGER_HOSTNAMEREST_API_PORT と、これらのアイテムで置き換えます。ジョブを実行します。

HADOOP_CLASSPATH=`hadoop classpath`

flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar

セッションを停止するには、APPLICATION_IDFlink ジョブ マネージャー UI にある、または yarn application -list の出力からの Flink YARN セッションに関連付けられたアプリケーション ID に置き換えてから、実行します。

yarn application -kill APPLICATION_ID

Apache Beam ジョブの実行

FlinkRunner を使用して、Dataproc 上で Apache Beam ジョブを実行できます。

次の方法で、Flink で Beam ジョブを実行できます。

  1. Java Beam ジョブ
  2. ポータブル Beam ジョブ

Java Beam ジョブ

Beam ジョブを JAR ファイルにパッケージ化します。バンドルされた JAR ファイルに、ジョブの実行に必要な依存関係を指定します。

次の例では、Dataproc クラスタのマスターノードから Java Beam ジョブが実行されます。

  1. Flink コンポーネントを有効にして Dataproc クラスタを作成します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。
    • --region: サポートされている Dataproc リージョン
    • --enable-component-gateway: Flink ジョブ マネージャー UI へのアクセスを有効にします。
    • --scopes: 同じプロジェクト内の GCP サービスへの API アクセスを有効にします。
  2. Dataproc クラスタのマスターノードに SSH で接続します。

  3. Dataproc クラスタのマスターノード上で Flink YARN セッションを開始します。

    . /usr/bin/flink-yarn-daemon
    

    Dataproc クラスタの Flink バージョンをメモしておきます。

    flink --version
    
  4. ローカルマシン上で、正規の Beam ワードカウントの例を Java に生成します。

    Dataproc クラスタ上で、Flink バージョンと互換性のある Beam バージョンを選択します。Beam-Flink バージョンの互換性を一覧表している Flink バージョンの互換性の表をご覧ください。

    生成された POM ファイルを開きます。タグ <flink.artifact.name> で指定された Beam Flink ランナーのバージョンを確認します。Flink アーティファクト名の Beam Flink ランナーのバージョンがクラスタの Flink バージョンと一致しない場合は、一致するようにバージョン番号を更新します。

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. ワードカウントの例をパッケージ化します。

    mvn package -Pflink-runner
    
  6. パッケージ化された Uber JAR ファイル word-count-beam-bundled-0.1.jar(~135 MB)を Dataproc クラスタのマスターノードにアップロードします。gsutil cp を使用すると、Cloud Storage から Dataproc クラスタへのファイル転送を高速化できます。

    1. ローカル ターミナル上で、Cloud Storage バケットを作成し、uber JAR をアップロードします。

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Dataproc のマスターノード上で、uber JAR をダウンロードします。

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Dataproc クラスタのマスターノード上で Java Beam ジョブを実行します。

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. 結果が Cloud Storage バケットに書き込まれていることを確認します。

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Flink YARN セッションを停止します。

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

ポータブル Beam ジョブ

Python、Go、その他のサポートされている言語で記述された Beam ジョブを実行するには、Beam の Flink Runnerで説明されているように、FlinkRunnerPortableRunner を使用します(ポータビリティ フレームワークのロードマップもご覧ください)。

次の例では、Dataproc クラスタのマスターノードから Python でポータブル Beam ジョブが実行されます。

  1. Flink コンポーネントと Docker コンポーネントの両方を有効にした Dataproc クラスタを作成します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink と Docker。
    • --image-version: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。
    • --region: サポートされている Dataproc リージョン
    • --enable-component-gateway: Flink ジョブ マネージャー UI へのアクセスを有効にします。
    • --scopes: 同じプロジェクト内の GCP サービスへの API アクセスを有効にします。
  2. Dataproc クラスタのマスターノードに SSH で接続します。

  3. Cloud Storage バケットを作成する。

    gsutil mb BUCKET_NAME
    
  4. Dataproc クラスタのマスターノード上で Flink YARN セッションを開始し、そのセッションが開始したら Flink マスター URL を保存します。

    . /usr/bin/flink-yarn-daemon
    

    Dataproc クラスタの Flink バージョンをメモしておきます。

    flink --version
    
  5. Dataproc クラスタのマスターノード上で、ジョブに必要な Python ライブラリをインストールします。

    Dataproc クラスタ上で、Flink バージョンと互換性のある Beam バージョンを選択します。Beam-Flink バージョンの互換性を一覧表している Flink バージョンの互換性の表をご覧ください。

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Dataproc クラスタのマスターノード上で、ワードカウントの例を実行します。

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    
    • --runner(必須): FlinkRunner
    • --flink_version(必須): Flink のバージョン。
    • --flink_master(必須): ジョブが実行される Flink マスターのアドレス。
    • --flink_submit_uber_jar(必須): uber JAR を使用して Beam ジョブを実行します。
    • --output(必須): 出力が書き込まれる場所。
  7. 結果がバケットに書き込まれたことを確認します。

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Flink YARN セッションを停止します。

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Dataproc Flink コンポーネントは、Kerberos クラスタをサポートしています。Flink ジョブを送信して保持したり、Flink クラスタを開始したりするには、有効な Kerberos チケットが必要です。デフォルトでは、チケットは 7 日間有効です。

Flink ジョブ マネージャー ウェブ インターフェースは、Flink ジョブまたは Flink セッション クラスタの実行中に使用できます。YARN 内の Flink アプリケーションのアプリケーション マスターから Flink ジョブ マネージャー UI を開くことができます。

UI アクセスを有効にして使用するには:

  1. コンポーネント ゲートウェイを有効にして Dataproc クラスタを作成します
  2. クラスタを作成後、Google Cloud Console の [クラスタの詳細] ページの [ウェブ インターフェース] タブでコンポーネント ゲートウェイYARN ResourceManager リンクをクリックします。
  3. YARN リソース マネージャー UI 上で、Flink クラスタ アプリケーションのエントリを特定します。ジョブの完了ステータスに応じて、[ApplicationMaster] リンクか [履歴] リンクが一覧表示されます。
  4. 長時間実行ストリーミング ジョブの場合は、[ApplicationManager] リンクをクリックして Flink ダッシュボードを開きます。完了したジョブの場合は、[履歴] リンクをクリックしてジョブの詳細を表示します。