Apache Kafka to Cloud Storage テンプレート

Apache Kafka to Cloud Storage テンプレートは、Google Cloud Managed Service for Apache Kafka からテキストデータを取り込み、レコードを Cloud Storage に出力するストリーミング パイプラインです。

Apache Kafka to BigQuery テンプレートは、セルフマネージドまたは外部の Kafka でも使用できます。

パイプラインの要件

  • 出力先の Cloud Storage バケットが存在する。
  • Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である。
  • Apache Kafka トピックが存在している。

Kafka メッセージ形式

Apache Kafka to Cloud Storage テンプレートでは、Kafka から CONFLUENT_AVRO_WIRE_FORMAT または JSON 形式でメッセージを読み取ります。

出力ファイル形式

出力ファイルの形式は、Kafka 入力メッセージと同じ形式です。たとえば、Kafka メッセージ形式として JSON を選択すると、JSON ファイルが出力 Cloud Storage バケットに書き込まれます。

認証

Apache Kafka to Cloud Storage テンプレートは、Kafka ブローカーに対する SASL / PLAIN 認証をサポートしています。

テンプレートのパラメータ

必須パラメータ

  • readBootstrapServerAndTopic: 入力を読み取る Kafka トピック
  • outputDirectory: 出力ファイルを書き込むパスとファイル名の接頭辞。末尾はスラッシュでなければなりません(例: gs://your-bucket/your-path/)。
  • kafkaReadAuthenticationMode: Kafka クラスタで使用する認証モード。認証なしの場合は NONE、SASL/PLAIN のユーザー名とパスワードの場合は SASL_PLAIN、証明書ベースの認証の場合は TLS を使用します。APPLICATION_DEFAULT_CREDENTIALS は、アプリケーションのデフォルト認証情報を使用して Google Cloud Apache Kafka for BigQuery で認証できるため、Google Cloud Apache Kafka for BigQuery クラスタにのみ使用する必要があります。
  • messageFormat: 読み取る Kafka メッセージの形式。サポートされる値は、AVRO_CONFLUENT_WIRE_FORMAT(Confluent Schema Registry でエンコードされた Avro)、AVRO_BINARY_ENCODING(プレーンなバイナリ Avro)、JSON です。デフォルトは AVRO_CONFLUENT_WIRE_FORMAT です。
  • useBigQueryDLQ : true の場合、失敗したメッセージは追加のエラー情報とともに BigQuery に書き込まれます。デフォルトは false です。

オプション パラメータ

  • windowDuration: Cloud Storage へのデータ書き込みが行われるウィンドウ期間またはサイズ。指定できる形式は、秒が Ns(例: 5s)、分が Nm(例: 12m)、時が Nh(例: 2h)です(例: 5m)。デフォルトは 5m です。
  • outputFilenamePrefix: ウィンドウ処理された各ファイルに配置する接頭辞(例: output-)。デフォルトは output です。
  • numShards: 書き込み時に生成される出力シャードの最大数。シャード数が多いと Cloud Storage への書き込みのスループットが高くなりますが、出力 Cloud Storage ファイルの処理時にシャード全体のデータ集計コストが高くなる可能性があります。デフォルト値は Dataflow によって決定されます。
  • enableCommitOffsets: 処理済みメッセージのオフセットを Kafka に commit します。有効にすると、パイプライン再開時のメッセージの処理のギャップや重複を最小限に抑えることができます。コンシューマー グループ ID を指定する必要があります。デフォルトは false です。
  • consumerGroupId: このパイプラインが属するコンシューマー グループの固有識別子。Kafka へのオフセット commit が有効な場合は必須です。デフォルトは空です。
  • kafkaReadOffset: commit されたオフセットが存在しない場合にメッセージを読み始めるポイント。最も古いメッセージから始まり、最新のメッセージが最後になります。デフォルトは latest です。
  • kafkaReadUsernameSecretId: SASL_PLAIN 認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレットの ID(例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。デフォルトは空です。
  • kafkaReadPasswordSecretId: SASL_PLAIN 認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレットの ID例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
  • kafkaReadKeystoreLocation: Kafka クラスタで認証を行う際に使用する TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス例: gs://your-bucket/keystore.jks。
  • kafkaReadTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
  • kafkaReadTruststorePasswordSecretId: Kafka TLS 認証用に Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
  • kafkaReadKeystorePasswordSecretId: Kafka TLS 認証用に Java KeyStore(JKS)ファイルにアクセスするためのパスワードが含まれる Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
  • kafkaReadKeyPasswordSecretId: Kafka TLS 認証用の Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
  • schemaFormat: Kafka スキーマの形式。SINGLE_SCHEMA_FILE または SCHEMA_REGISTRY として提供できます。SINGLE_SCHEMA_FILE が指定されている場合、すべてのメッセージは avro スキーマ ファイルで言及されているスキーマを持つ必要があります。SCHEMA_REGISTRY が指定されている場合、メッセージは 1 つのスキーマまたは複数のスキーマを持つことができます。デフォルトは SINGLE_SCHEMA_FILE です。
  • confluentAvroSchemaPath: トピック内のすべてのメッセージをデコードするために使用される単一の Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
  • schemaRegistryConnectionUrl: メッセージのデコード用に Avro スキーマを管理するために使用される Confluent Schema Registry のインスタンスの URL。デフォルトは空です。
  • binaryAvroSchemaPath: バイナリでエンコードされた Avro メッセージをデコードするために使用される Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
  • schemaRegistryAuthenticationMode : Schema Registry 認証モード。NONE、TLS、OAUTH のいずれかです。デフォルトは NONE です。
  • schemaRegistryTruststoreLocation : Schema Registry の認証用トラストストアが保存されている SSL 証明書の場所。(例: /your-bucket/truststore.jks)。
  • schemaRegistryTruststorePasswordSecretId : トラストストア内のシークレットへのアクセス パスワードが保存されている Secret Manager の SecretId。(例: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version)。
  • schemaRegistryKeystoreLocation : SSL 証明書と秘密鍵を含むキーストアの場所。(例: /your-bucket/keystore.jks)。
  • schemaRegistryKeystorePasswordSecretId : キーストア ファイルにアクセスするためのパスワードが格納されている Secret Manager の SecretId(例: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version)。
  • schemaRegistryKeyPasswordSecretId : キーストアに保存されているクライアントの秘密鍵にアクセスするために必要なパスワードの SecretId(例: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version)。
  • schemaRegistryOauthClientId : OAUTH モードで Schema Registry クライアントを認証するために使用されるクライアント ID。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必須です。
  • schemaRegistryOauthClientSecretId : OAUTH モードで Schema Registry クライアントの認証に使用するクライアント シークレットを含む Google Cloud Secret Manager のシークレットの ID。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必須です。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
  • schemaRegistryOauthScope : OAUTH モードで Schema Registry クライアントを認証するために使用されるアクセス トークン スコープ。このフィールドは省略可能です。スコープ パラメータを渡さずにリクエストを実行できます。(例: openid)。
  • schemaRegistryOauthTokenEndpointUrl : OAUTH モードで Schema Registry クライアントの認証に使用される OAuth/OIDC ID プロバイダの HTTP(S) ベースの URL。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必須です。
  • outputDeadletterTable : 失敗したメッセージの完全修飾 BigQuery テーブル名。さまざまな理由(スキーマの不一致、JSON の形式の誤りなど)により出力テーブルに到達できなかったメッセージは、このテーブルに書き込まれます。このテーブルはテンプレートによって作成されます。(例: your-project-id:your-dataset.your-table-name)。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Kafka to Cloud Storage template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. (省略可)1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
  8. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • BIGQUERY_TABLE: 実際の Cloud Storage テーブル名
  • KAFKA_TOPICS: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • KAFKA_SERVER_ADDRESSES: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーからアクセス可能なポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • BIGQUERY_TABLE: 実際の Cloud Storage テーブル名
  • KAFKA_TOPICS: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • KAFKA_SERVER_ADDRESSES: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーからアクセス可能なポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。

詳細については、Dataflow で Kafka から Cloud Storage にデータを書き込むをご覧ください。

次のステップ