Dataproc の Flink オプション コンポーネント

Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Flink などの追加コンポーネントを有効化できます。このページでは、Apache Flink のオプション コンポーネント(Flink クラスタ)を有効にして Dataproc クラスタを作成し、そのクラスタで Flink ジョブを実行する方法について説明します。

Flink クラスタを使用すると、次のことができます。

  1. Google Cloud コンソール、Google Cloud CLI、または Dataproc API から Dataproc Jobs リソースを使用して Flink ジョブを実行する

  2. Flink クラスタのマスターノードで実行されている flink CLI を使用して Flink ジョブを実行する

  3. Flink で Apache Beam ジョブを実行する

  4. Kerberos クラスタで Flink を実行する

Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して、クラスタで Flink コンポーネントが有効になっている Dataproc クラスタを作成できます。

推奨: Flink コンポーネントで標準の 1 マスター VM クラスタを使用します。 Dataproc 高可用性モードクラスタ(3 つのマスター VM を含む)では、Flink 高可用性モードはサポートされていません。

Google Cloud コンソール、Google Cloud CLI、または Dataproc API から Dataproc の Jobs リソースを使用して Flink ジョブを実行できます。

コンソール

コンソールから Flink ワードカウント ジョブのサンプルを送信するには:

  1. ブラウザの Google Cloud コンソールで Dataproc の [ジョブを送信] ページを開きます。

  2. [ジョブを送信] ページのフィールドに入力します。

    1. クラスタリストから [クラスタ] 名を選択します。
    2. [ジョブタイプ] を Flink に設定します。
    3. [メインクラスまたは JAR] を org.apache.flink.examples.java.wordcount.WordCount に設定します。
    4. [JAR ファイル] を file:///usr/lib/flink/examples/batch/WordCount.jar に設定します。
      • file:/// は、クラスタにあるファイルを示します。Dataproc は Flink クラスタの作成時に WordCount.jar をインストールしました。
      • このフィールドには、Cloud Storage パス(gs://BUCKET/JARFILE)または Hadoop 分散ファイル システム(HDFS)パス(hdfs://PATH_TO_JAR)も指定できます。
  3. [送信] をクリックします。

    • ジョブドライバ出力が [ジョブの詳細] ページに表示されます。
    • Google Cloud コンソールで Dataproc の [ジョブ] ページに Flink ジョブが表示されます。
    • ジョブを停止または削除するには、[ジョブ] ページまたは [ジョブの詳細] ページで [停止] または [削除] をクリックします。

gcloud

Dataproc Flink クラスタに Flink ジョブを送信するには、ターミナル ウィンドウまたは Cloud Shell で gcloud CLI の gcloud dataproc ジョブの送信 コマンドをローカルに実行します。

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

注:

  • CLUSTER_NAME: ジョブを送信する Dataproc Flink クラスタの名前を指定します。
  • REGION: クラスタが配置される Compute Engine のリージョンを指定します。
  • MAIN_CLASS: Flink アプリケーションの main クラスを指定します。次に例を示します。
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: Flink アプリケーションの jar ファイルを指定します。以下を指定できます。
    • file:/// 接頭辞を使用して、クラスタにインストールされている jar ファイル:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Cloud Storage 内の jar ファイル: gs://BUCKET/JARFILE
    • HDFS 内の jar ファイル: hdfs://PATH_TO_JAR
  • JOB_ARGS: 必要に応じて、ダブルダッシュ(--)の後にジョブ引数を追加します。

  • ジョブを送信すると、ジョブドライバ出力がローカルターミナルまたは Cloud Shell ターミナルに表示されます。

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)
    

REST

このセクションでは、Dataproc jobs.submit API を使用して Dataproc Flink クラスタに Flink ジョブを送信する方法を説明します。

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_ID: Google Cloud プロジェクト ID
  • REGION: クラスタ リージョン
  • CLUSTER_NAME: ジョブを送信する Dataproc Flink クラスタの名前を指定します。

HTTP メソッドと URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

リクエストの本文(JSON):

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

リクエストを送信するには、次のいずれかのオプションを展開します。

次のような JSON レスポンスが返されます。

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Google Cloud コンソールで Dataproc の [ジョブ] ページに Flink ジョブが表示されます。
  • Google Cloud コンソールの [ジョブ] ページまたは [ジョブの詳細] ページで [停止] または [削除] をクリックすると、ジョブの停止または削除ができます。

Dataproc Jobs リソースを使用して Flink ジョブを実行する代わりに、flink CLI を使用して Flink クラスタのマスターノードで Flink ジョブを実行できます。

以降のセクションでは、Dataproc Flink クラスタで flink CLI ジョブを実行するさまざまな方法について説明します。

  1. マスターノードに SSH で接続: SSH ユーティリティを使用して、クラスタのマスター VM でターミナル ウィンドウを開きます。

  2. クラスパスを設定する: Flink クラスタ マスター VM の SSH ターミナル ウィンドウから Hadoop クラスパスを初期化します。

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Flink ジョブを実行する: Flink ジョブをさまざまな YARN のデプロイモードで実行できます。アプリケーション、ジョブごと、セッション モードで行います。

    1. アプリケーション モード: Flink アプリケーション モードは、Dataproc イメージ バージョン 2.0 以降でサポートされています。このモードでは、YARN Job Manager でジョブの main() メソッドが実行されます。クラスタは、ジョブが終了するとシャットダウンします。

      ジョブ送信の例:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      実行中のジョブを一覧表示します。

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      実行中のジョブをキャンセルします。

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. ジョブごとモード: この Flink モードでは、クライアント側でジョブの main() メソッドを実行します。

      ジョブ送信の例:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. セッション モード: 長時間実行される Flink YARN セッションを開始してから、1 つ以上のジョブをセッションに送信します。

      1. セッションの開始: 次のいずれかの方法で Flink セッションを開始できます。

        1. Flink クラスタを作成し、gcloud dataproc clusters create コマンドに --metadata flink-start-yarn-session=true フラグを追加します(Dataproc Flink クラスタを作成するをご覧ください)。このフラグを有効にすると、クラスタの作成後に Dataproc が /usr/bin/flink-yarn-daemon を実行し、クラスタで Flink セッションを開始します。

          セッションの YARN アプリケーション ID は /tmp/.yarn-properties-${USER} に保存されます。ID を一覧表示するには、yarn application -list コマンドを使用します。

        2. カスタム設定を使用して、クラスタ マスター VM にプリインストールされている Flink yarn-session.sh スクリプトを実行します。

          カスタム設定の例:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. デフォルト設定で /usr/bin/flink-yarn-daemon ラッパー スクリプトを実行します。

          . /usr/bin/flink-yarn-daemon
          
      2. セッションにジョブを送信する: 次のコマンドを実行して、セッションに Flink ジョブを送信します。

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: ジョブが実行される Flink マスター VM の URL(ホストとポートを含む)。URL から http:// prefix を削除します。この URL は、Flink セッションを開始するときにコマンド出力に表示されます。次のコマンドを実行して、この URL を Tracking-URL フィールドに一覧表示します。
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. セッション内のジョブを一覧表示する: セッション内の Flink ジョブを一覧表示するには、次のいずれかを行います。

        • 引数なしで flink list を実行します。このコマンドは、/tmp/.yarn-properties-${USER} でセッションの YARN アプリケーション ID を検索します。

        • /tmp/.yarn-properties-${USER} または yarn application -list の出力からセッションの YARN アプリケーション ID を取得し、<code> flink list -yid YARN_APPLICATION_ID を実行します。

        • flink list -m FLINK_MASTER_URL を実行します。

      4. セッションを停止する: セッションを停止するには、/tmp/.yarn-properties-${USER} または yarn application -list の出力からセッションの YARN アプリケーション ID を取得して、次のいずれかのコマンドを実行します。:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

FlinkRunner を使用して、Dataproc 上で Apache Beam ジョブを実行できます。

次の方法で、Flink で Beam ジョブを実行できます。

  1. Java Beam ジョブ
  2. ポータブル Beam ジョブ

Java Beam ジョブ

Beam ジョブを JAR ファイルにパッケージ化します。バンドルされた JAR ファイルに、ジョブの実行に必要な依存関係を指定します。

次の例では、Dataproc クラスタのマスターノードから Java Beam ジョブが実行されます。

  1. Flink コンポーネントを有効にして Dataproc クラスタを作成します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。
    • --region: サポートされている Dataproc リージョン
    • --enable-component-gateway: Flink ジョブ マネージャー UI へのアクセスを有効にします。
    • --scopes: クラスタによる Google Cloud API へのアクセスを有効にします(スコープのベスト プラクティスを参照)。 Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform スコープがデフォルトで有効になります(このフラグの設定を指定する必要はありません)。
  2. SSH ユーティリティを使用して、Flink クラスタのマスターノードでターミナル ウィンドウを開きます。

  3. Dataproc クラスタのマスターノード上で Flink YARN セッションを開始します。

    . /usr/bin/flink-yarn-daemon
    

    Dataproc クラスタの Flink バージョンをメモしておきます。

    flink --version
    
  4. ローカルマシン上で、正規の Beam ワードカウントの例を Java に生成します。

    Dataproc クラスタ上で、Flink バージョンと互換性のある Beam バージョンを選択します。Beam-Flink バージョンの互換性を一覧表している Flink バージョンの互換性の表をご覧ください。

    生成された POM ファイルを開きます。タグ <flink.artifact.name> で指定された Beam Flink ランナーのバージョンを確認します。Flink アーティファクト名の Beam Flink ランナー バージョンが、クラスタの Flink バージョンと一致しない場合は、バージョン番号を更新して一致するようにします。

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. ワードカウントの例をパッケージ化します。

    mvn package -Pflink-runner
    
  6. パッケージ化された uber JAR ファイル word-count-beam-bundled-0.1.jar(~ 135 MB)を Dataproc クラスタのマスターノードにアップロードします。gsutil cp を使用すると、Cloud Storage から Dataproc クラスタへのファイル転送を高速化できます。

    1. ローカル ターミナル上で、Cloud Storage バケットを作成し、uber JAR をアップロードします。

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Dataproc のマスターノード上で、uber JAR をダウンロードします。

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Dataproc クラスタのマスターノード上で Java Beam ジョブを実行します。

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. 結果が Cloud Storage バケットに書き込まれていることを確認します。

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Flink YARN セッションを停止します。

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

ポータブル Beam ジョブ

Python、Go、その他のサポートされている言語で記述された Beam ジョブを実行するには、Beam の Flink Runnerで説明されているように、FlinkRunnerPortableRunner を使用します(ポータビリティ フレームワークのロードマップもご覧ください)。

次の例では、Dataproc クラスタのマスターノードから Python でポータブル Beam ジョブが実行されます。

  1. Flink コンポーネントと Docker コンポーネントの両方を有効にして Dataproc クラスタを作成します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    注:

    • --optional-components: Flink と Docker。
    • --image-version: クラスタにインストールされている Flink のバージョンを決定するクラスタのイメージ バージョン(たとえば、最新と以前の 4 つの 2.0.x イメージ リリース バージョンについては、Apache Flink コンポーネントのバージョンをご覧ください)。
    • --region: 使用可能な Dataproc リージョン
    • --enable-component-gateway: Flink ジョブ マネージャー UI へのアクセスを有効にします。
    • --scopes: クラスタによる Google Cloud API へのアクセスを有効にします(スコープのベスト プラクティスを参照)。 Dataproc イメージ バージョン 2.1 以降を使用するクラスタを作成すると、cloud-platform スコープがデフォルトで有効になります(このフラグの設定を指定する必要はありません)。
  2. gsutil をローカルまたは Cloud Shell で使用して、Cloud Storage バケットを作成します。サンプルのワードカウント プログラムの実行時に、BUCKET_NAME を指定します。

    gsutil mb BUCKET_NAME
    
  3. クラスタ VM のターミナル ウィンドウで、Flink YARN セッションを開始します。Flink のマスター URL、ジョブが実行される Flink マスターのアドレスに注意してください。サンプルのワードカウント プログラムを実行するときに、FLINK_MASTER_URL を指定します。

    . /usr/bin/flink-yarn-daemon
    

    Dataproc クラスタを実行している Flink のバージョンを表示してメモします。サンプルのワードカウント プログラムを実行するときに、FLINK_VERSION を指定します。

    flink --version
    
  4. ジョブに必要な Python ライブラリをクラスタ マスターノードにインストールします。

  5. クラスタに Flink バージョンと互換性のある Beam バージョンをインストールします。

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. クラスタ マスターノードでワードカウントの例を実行します。

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    注:

    • --runner: FlinkRunner
    • --flink_version: FLINK_VERSION、前述。
    • --flink_master: FLINK_MASTER_URL、前述。
    • --flink_submit_uber_jar: uber JAR を使用して Beam ジョブを実行します。
    • --output: 先ほど作成した BUCKET_NAME
  7. 結果がバケットに書き込まれていることを確認します。

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Flink YARN セッションを停止します。

    1. アプリケーション ID を取得します。
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Dataproc Flink コンポーネントは、Kerberos クラスタをサポートしています。Flink ジョブを送信して保持したり、Flink クラスタを開始したりするには、有効な Kerberos チケットが必要です。デフォルトでは、Kerberos チケットは 7 日間有効です。

Flink ジョブ マネージャー ウェブ インターフェースは、Flink ジョブまたは Flink セッション クラスタの実行中に使用できます。ウェブ インターフェースを使用するには:

  1. Dataproc Flink クラスタを作成します
  2. クラスタを作成後、Google Cloud コンソールの [クラスタの詳細] ページの [ウェブ インターフェース] タブでコンポーネント ゲートウェイYARN ResourceManager リンクをクリックします。
  3. YARN リソース マネージャー UI 上で、Flink クラスタ アプリケーションのエントリを特定します。ジョブの完了ステータスに応じて、[ApplicationMaster] リンクか [履歴] リンクが一覧表示されます。
  4. 長時間実行ストリーミング ジョブの場合は、[ApplicationManager] リンクをクリックして Flink ダッシュボードを開きます。完了したジョブの場合は、[履歴] リンクをクリックしてジョブの詳細を表示します。