Dataproc Flink コンポーネント

Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、追加コンポーネントをインストールできます。このページでは Flink コンポーネントについて説明します。

Dataproc Flink コンポーネントは、Apache Flink を Dataproc クラスタにインストールします。

コンポーネントをインストールする

Dataproc クラスタの作成時にコンポーネントをインストールします。 Dataproc Flink コンポーネントは、Dataproc のイメージ 1.5 以降で作成されたクラスタにインストールできます。

Dataproc イメージの各リリースに含まれるコンポーネント バージョンについては、サポートされる Dataproc バージョンをご覧ください。

gcloud コマンド

Flink コンポーネントを含む Dataproc クラスタを作成するには、--optional-componentsフラグを指定した gcloud dataproc clusters create cluster-name コマンドを使用します。

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --image-version=1.5 \
    --enable-component-gateway \
    ... other flags

注: Flink YARN セッションはかなりの YARN リソースを使用するため、デフォルトでは、Dataproc クラスタは Dataproc の起動時に Flink セッションを開始しません。gcloud dataproc clusters create コマンドに --metadata flink-start-yarn-session=true フラグを追加することで、Flink クラスタの起動時にセッションを開始できます。

REST API

Dataproc API を使用して Flink コンポーネントを指定するには、clusters.create リクエストの一部として SoftwareConfig.Component を使用します。

Console

  1. コンポーネントとコンポーネント ゲートウェイを有効にします。
    • Cloud Console で、Dataproc の [クラスタの作成] ページを開きます。[クラスターを設定] パネルが選択されています。
    • [
        コンポーネント] セクションで次の設定を行います。
      • [オプション コンポーネント] で、クラスタにインストールする Flink やその他のオプション コンポーネントを選択します。
      • [コンポーネント ゲートウェイ] で [コンポーネント ゲートウェイを有効にする] を選択します(コンポーネント ゲートウェイの URL を表示してアクセスするをご覧ください)。

Flink を使用する Dataproc クラスタが起動したら、SSH を Dataproc マスターノードに接続してから、Flink ジョブを実行します。

例:

単一の Flink ジョブを実行します。ジョブを受け入れると、Flink は YARN でそのジョブのジョブ マネージャーとスロットの作成を開始します。完了するまで Flink ジョブは YARN クラスタで実行されます。 ジョブが完了するとジョブ マネージャーがシャットダウンされます。ジョブのログは YARN ログで確認できます。

flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar

例:

長時間実行される Flink YARN セッションを開始してから、ジョブを実行します。

セッションを開始します。注: または、gcloud dataproc clusters create --metadata flink-start-yarn-session=true フラグを使用して Flink クラスタを作成するときに、Flink YARN セッションを開始することもできます。

. /usr/bin/flink-yarn-daemon

ジョブを実行します。

HADOOP_CLASSPATH=`hadoop classpath` \
    flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar

Apache Beam ジョブの実行

Dataproc では、FlinkRunner を使用して Apache Beam ジョブを実行できます。

Dataproc マスターノードに SSH 接続したら、次の 2 つの方法で Flink で Beam ジョブを実行できます。

  1. Java Beam ジョブ
  2. ポータブルな Beam ジョブ。

Java Beam ジョブ

Beam ジョブを jar ファイルにパッケージ化してから、ジョブを実行します。

mvn package -Pflink-runner
bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar \
    --runner=FlinkRunner \
    --other-parameters

ポータブル ビーム ジョブ

Python、Go などのサポート対象言語で記述された Beam ジョブを実行するには、

  1. PortableRunner を使用します(Portability Framework ロードマップをご覧ください)。

  2. Dataproc クラスタは、各クラスタノードに Docker をインストールする Docker コンポーネントを有効にして作成する必要があります。Docker コンポーネントをクラスタに追加するには、Flink コンポーネントと Docker コンポーネントの両方をインストールしてクラスタを作成します。

    gcloud:

    gcloud dataproc clusters create cluster-name \
        --optional-components=FLINK,DOCKER \
        --region=region \
        --image-version=1.5 \
        --enable-component-gateway \
        ... other flags
    

  3. Beam で必要な Python またはその他のライブラリ(apache_beamapache_beam[gcp] など)をインストールします。Flink マスター URL を渡すことも、URL を省略して 1 つのジョブを実行することもできます。

    Python の例:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_master=localhost:8081",
        "--environment_type=DOCKER"
    ])
    with beam.Pipeline(options=options) as p:
    

Dataproc Flink コンポーネントは、Kerberos クラスタをサポートしています。Flink ジョブを送信して保持したり、Flink クラスタを開始したりするには、有効な Kerberos チケットが必要です。デフォルトでは、チケットは 7 日間有効です。

Flink ジョブ マネージャー ウェブ インターフェースは、Flink ジョブまたは Flink セッション クラスタの実行中に使用できます。Flink ジョブ マネージャー UI は、YARN の Flink アプリケーションのアプリケーション マスターから開くことができます。

UI アクセスを有効にして使用するには:

  1. コンポーネント ゲートウェイを有効にして Dataproc クラスタを作成します
  2. クラスタを作成後、Google Cloud Console の [クラスタの詳細] ページの [ウェブ インターフェース] タブでコンポーネント ゲートウェイの YARN ResourceManager リンクをクリックします。
  3. YARN リソース マネージャー ページで、Flink クラスタ アプリケーションのエントリを見つけてアプリケーション マスターのリンクをクリックします。
  4. Flink のダッシュボードが開きます。