Dataflow と Managed Service for Apache Kafka を使用する

このページでは、Dataflow パイプラインで Google Cloud Managed Service for Apache Kafka をソースまたはシンクとして使用する方法について説明します。

次のいずれかの方法を使用できます。

要件

  • プロジェクトで Cloud Storage、Dataflow、Managed Service for Apache Kafka の各 API を有効にします。API を有効にするを確認するか、または次の Google Cloud CLI コマンドを実行します。

    gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
    
  • Dataflow ワーカー サービス アカウントには、Identity and Access Management(IAM)のマネージド Kafka クライアント(roles/managedkafka.client)ロールが必要です。

  • Dataflow ワーカー VM には、Kafka ブートストラップ サーバーへのネットワーク アクセスが必要です。詳細については、Managed Service for Apache Kafka ネットワーキングを構成するをご覧ください。

ブートストラップ サーバーのアドレスを取得する

Managed Service for Apache Kafka クラスタに接続するパイプラインを実行するには、まずクラスタのブートストラップ サーバー アドレスを取得します。このアドレスは、パイプラインを構成するときに必要になります。

Google Cloud コンソールまたは Google Cloud CLI を以下で説明するように使用できます。

コンソール

  1. Google Cloud コンソールで、[クラスタ] ページに移動します。

    [クラスタ] に移動

  2. クラスタ名をクリックします。

  3. [構成] タブをクリックします。

  4. [ブートストラップ URL] からブートストラップ サーバーのアドレスをコピーします。

gcloud

managed-kafka clusters describe コマンドを使用します。

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=LOCATION \
  --format="value(bootstrapAddress)"

次のように置き換えます。

  • CLUSTER_ID: クラスタの ID または名前
  • LOCATION: クラスタのロケーション

詳細については、Managed Service for Apache Kafka クラスタを表示するをご覧ください。

Managed Service for Apache Kafka と Dataflow テンプレートを使用する

Google は、Apache Kafka からデータを読み取る Dataflow テンプレートをいくつか提供しています。

これらのテンプレートは Managed Service for Apache Kafka で使用できます。ユースケースに一致するテンプレートがある場合は、カスタム パイプライン コードを作成するのではなく、そのテンプレートを使用することを検討してください。

コンソール

  1. [Dataflow] > [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. [テンプレートからジョブを作成] をクリックします。

  3. [ジョブ名] に、ジョブの名前を入力します。

  4. [Dataflow テンプレート] プルダウン メニューから、実行するテンプレートを選択します。

  5. [Kafka ブートストラップ サーバー] ボックスに、ブートストラップ サーバーのアドレスを入力します。

  6. [Kafka トピック] ボックスに、トピックの名前を入力します。

  7. [Kafka authentication mode] で [APPLICATION_DEFAULT_CREDENTIALS] を選択します。

  8. [Kafka message format] で、Apache Kafka メッセージの形式を選択します。

  9. 必要に応じて他のパラメータを入力します。サポートされているパラメータは、各テンプレートのドキュメントに記載されています。

  10. ジョブを実行します

gcloud

gcloud dataflow jobs run コマンドを使用します。

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://TEMPLATE_FILE \
  --region REGION_NAME \
  --parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...

次のように置き換えます。

  • JOB_NAME: ジョブの名前
  • TEMPLATE_FILE: Cloud Storage 内のテンプレート ファイルの場所
  • REGION_NAME: ジョブをデプロイするリージョン
  • PROJECT_NAME: Google Cloud プロジェクトの名前
  • LOCATION: クラスタのロケーション
  • CLUSTER_ID: クラスタの ID または名前
  • TOPIC: Kafka トピックの名前

Managed Service for Apache Kafka と Beam パイプラインを使用する

このセクションでは、Apache Beam SDK を使用して、Managed Service for Apache Kafka に接続する Dataflow パイプラインを作成して実行する方法について説明します。

ほとんどのシナリオでは、マネージド I/O 変換を Kafka のソースまたはシンクとして使用します。より高度なパフォーマンスの調整が必要な場合は、KafkaIO コネクタの使用を検討してください。マネージド I/O を使用するメリットの詳細については、Dataflow マネージド I/O をご覧ください。

要件

  • Kafka クライアント バージョン 3.6.0 以降。

  • Apache Beam SDK バージョン 2.61.0 以降。

  • Dataflow ジョブを開始するマシンには、Apache Kafka ブートストラップ サーバーへのネットワーク アクセスが必要です。たとえば、クラスタに到達可能な VPC にアクセスできる Compute Engine インスタンスからジョブを開始します。

  • ジョブを作成するプリンシパルには、次の IAM ロールが必要です。

    • マネージド Kafka クライアント(roles/managedkafka.client)。Apache Kafka クラスタにアクセスするために必要です。
    • サービス アカウント ユーザー(roles/iam.serviceAccountUser)。Dataflow ワーカー サービス アカウントとして機能するために必要です。
    • ストレージ管理者(roles/storage.admin)。Cloud Storage にジョブファイルをアップロードするために必要です。
    • Dataflow 管理者(roles/dataflow.admin)。ジョブを作成するために必要です。

    Compute Engine インスタンスからジョブを開始する場合は、VM に接続されているサービス アカウントにこれらのロールを付与できます。詳細については、ユーザー管理のサービス アカウントを使用する VM を作成するをご覧ください。

    ジョブの作成時に、サービス アカウントの権限借用でアプリケーションのデフォルト認証情報(ADC)を使用することもできます。

マネージド I/O を構成する

パイプラインで Apache Kafka 向けマネージド I/O を使用する場合は、Managed Service for Apache Kafka で認証するために次の構成オプションを設定します。

  • "security.protocol": "SASL_SSL"
  • "sasl.mechanism": "OAUTHBEARER"
  • "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
  • "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"

次の例は、Managed Service for Apache Kafka 向けにマネージド I/O を構成する方法を示しています。

Java

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
      .put("bootstrap_servers", options.getBootstrapServer())
      .put("topic", options.getTopic())
      .put("data_format", "RAW")
      // Set the following fields to authenticate with Application Default
      // Credentials (ADC):
      .put("security.protocol", "SASL_SSL")
      .put("sasl.mechanism", "OAUTHBEARER")
      .put("sasl.login.callback.handler.class",
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
      .put("sasl.jaas.config",   "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
      .build();

Python

pipeline
| beam.managed.Read(
    beam.managed.KAFKA,
    config={
      "bootstrap_servers": options.bootstrap_server,
      "topic": options.topic,
      "data_format": "RAW",
      # Set the following fields to authenticate with Application Default
      # Credentials (ADC):
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "OAUTHBEARER",
      "sasl.login.callback.handler.class":
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
      "sasl.jaas.config":
          "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    }
)

KafkaIO コネクタを構成する

次の例は、Managed Service for Apache Kafka 向けに KafkaIO コネクタを構成する方法を示しています。

Java

String bootstap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(bootstap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerSerializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials())

// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());

Python

WriteToKafka(
  producer_config={
    "bootstrap.servers": options.bootstrap_servers,
    "security.protocol": 'SASL_SSL',
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
    "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
  },
  topic=options.topic,
  key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
  value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)

次のステップ