Apache Kafka to Cloud Storage テンプレート

Apache Kafka to Cloud Storage テンプレートは、Apache Kafka for BigQuery からテキストデータを取り込み、レコードを Cloud Storage に出力するストリーミング パイプラインです。

Apache Kafka to BigQuery テンプレートは、セルフマネージドまたは外部の Kafka でも使用できます。

パイプラインの要件

  • 出力先の Cloud Storage バケットが存在する。
  • Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である。
  • Apache Kafka トピックが存在している。

Kafka メッセージ形式

Apache Kafka to Cloud Storage テンプレートでは、Kafka から CONFLUENT_AVRO_WIRE_FORMAT または JSON 形式でメッセージを読み取ります。

出力ファイル形式

出力ファイルの形式は、Kafka 入力メッセージと同じ形式です。たとえば、Kafka メッセージ形式として JSON を選択すると、JSON ファイルが出力 Cloud Storage バケットに書き込まれます。

認証

Apache Kafka to Cloud Storage テンプレートは、Kafka ブローカーに対する SASL / PLAIN 認証をサポートしています。

テンプレートのパラメータ

必須パラメータ

  • readBootstrapServerAndTopic: 入力を読み取る Kafka トピック
  • kafkaReadAuthenticationMode: Kafka クラスタで使用する認証モード。認証なしの場合は NONE を、SASL / PLAIN のユーザー名とパスワードの場合は SASL_PLAIN を指定します。Apache Kafka for BigQuery は SASL_PLAIN 認証モードのみをサポートします。デフォルト値は SASL_PLAIN です。
  • outputDirectory: 出力ファイルを書き込むパスとファイル名の接頭辞。末尾はスラッシュでなければなりません例: gs://your-bucket/your-path/。
  • messageFormat: 読み取る Kafka メッセージの形式。サポートされる値は、AVRO_CONFLUENT_WIRE_FORMAT(Confluent Schema Registry でエンコードされた Avro)、AVRO_BINARY_ENCODING(プレーンなバイナリ Avro)、JSON です。デフォルトは AVRO_CONFLUENT_WIRE_FORMAT です。

オプション パラメータ

  • windowDuration: Cloud Storage へのデータ書き込みが行われるウィンドウ期間またはサイズ。指定できる形式は、秒が Ns(例: 5s)、分が Nm(例: 12m)、時が Nh(例: 2h)です(例: 5m)。デフォルトは 5m です。
  • outputFilenamePrefix: ウィンドウ処理された各ファイルに配置する接頭辞。(例: output-)。デフォルトは output です。
  • numShards: 書き込み時に生成される出力シャードの最大数。シャード数が多いと Cloud Storage への書き込みのスループットが高くなりますが、出力 Cloud Storage ファイルの処理時にシャード全体のデータ集計コストが高くなる可能性があります。デフォルト値は Dataflow によって決定されます。
  • enableCommitOffsets: 処理済みメッセージのオフセットを Kafka に commit します。有効にすると、パイプライン再開時のメッセージの処理のギャップや重複を最小限に抑えることができます。コンシューマー グループ ID を指定する必要があります。デフォルトは false です。
  • consumerGroupId: このパイプラインが属するコンシューマー グループの固有識別子。Kafka へのオフセット commit が有効な場合は必須です。デフォルトは空です。
  • kafkaReadOffset: commit されたオフセットが存在しない場合にメッセージを読み始めるポイント。最も古いメッセージから始まり、最新のメッセージが最後になります。デフォルトは latest です。
  • kafkaReadUsernameSecretId: SASL_PLAIN 認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレットの ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
  • kafkaReadPasswordSecretId: SASL_PLAIN 認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレットの ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
  • schemaFormat: Kafka スキーマの形式。SINGLE_SCHEMA_FILE または SCHEMA_REGISTRY として提供できます。SINGLE_SCHEMA_FILE が指定されている場合、すべてのメッセージは avro スキーマ ファイルで言及されているスキーマを持つ必要があります。SCHEMA_REGISTRY が指定されている場合、メッセージは 1 つのスキーマまたは複数のスキーマを持つことができます。デフォルトは SINGLE_SCHEMA_FILE です。
  • confluentAvroSchemaPath: トピック内のすべてのメッセージをデコードするために使用される単一の Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
  • schemaRegistryConnectionUrl: メッセージのデコード用に Avro スキーマを管理するために使用される Confluent Schema Registry のインスタンスの URL。デフォルトは空です。
  • binaryAvroSchemaPath: バイナリでエンコードされた Avro メッセージをデコードするために使用される Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Kafka to Cloud Storage template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. 省略可: 1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
  8. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • BIGQUERY_TABLE: 実際の Cloud Storage テーブル名
  • KAFKA_TOPICS: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数のコードが myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • KAFKA_SERVER_ADDRESSES: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーからアクセス可能なポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • BIGQUERY_TABLE: 実際の Cloud Storage テーブル名
  • KAFKA_TOPICS: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数のコードが myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • KAFKA_SERVER_ADDRESSES: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーからアクセス可能なポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。

詳細については、Dataflow で Kafka から Cloud Storage にデータを書き込むをご覧ください。

次のステップ