Apache Kafka to Cloud Storage テンプレートは、Apache Kafka for BigQuery からテキストデータを取り込み、レコードを Cloud Storage に出力するストリーミング パイプラインです。
Apache Kafka to BigQuery テンプレートは、セルフマネージドまたは外部の Kafka でも使用できます。
パイプラインの要件
- 出力先の Cloud Storage バケットが存在する。
- Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である。
- Apache Kafka トピックが存在している。
Kafka メッセージ形式
Apache Kafka to Cloud Storage テンプレートでは、Kafka から CONFLUENT_AVRO_WIRE_FORMAT
または JSON
形式でメッセージを読み取ります。
出力ファイル形式
出力ファイルの形式は、Kafka 入力メッセージと同じ形式です。たとえば、Kafka メッセージ形式として JSON を選択すると、JSON ファイルが出力 Cloud Storage バケットに書き込まれます。
認証
Apache Kafka to Cloud Storage テンプレートは、Kafka ブローカーに対する SASL / PLAIN 認証をサポートしています。
テンプレートのパラメータ
必須パラメータ
- readBootstrapServerAndTopic: 入力を読み取る Kafka トピック
- kafkaReadAuthenticationMode: Kafka クラスタで使用する認証モード。認証なしの場合は NONE を、SASL / PLAIN のユーザー名とパスワードの場合は SASL_PLAIN を指定します。Apache Kafka for BigQuery は SASL_PLAIN 認証モードのみをサポートします。デフォルト値は SASL_PLAIN です。
- outputDirectory: 出力ファイルを書き込むパスとファイル名の接頭辞。末尾はスラッシュでなければなりません例: gs://your-bucket/your-path/。
- messageFormat: 読み取る Kafka メッセージの形式。サポートされる値は、AVRO_CONFLUENT_WIRE_FORMAT(Confluent Schema Registry でエンコードされた Avro)、AVRO_BINARY_ENCODING(プレーンなバイナリ Avro)、JSON です。デフォルトは AVRO_CONFLUENT_WIRE_FORMAT です。
オプション パラメータ
- windowDuration: Cloud Storage へのデータ書き込みが行われるウィンドウ期間またはサイズ。指定できる形式は、秒が Ns(例: 5s)、分が Nm(例: 12m)、時が Nh(例: 2h)です(例: 5m)。デフォルトは 5m です。
- outputFilenamePrefix: ウィンドウ処理された各ファイルに配置する接頭辞。(例: output-)。デフォルトは output です。
- numShards: 書き込み時に生成される出力シャードの最大数。シャード数が多いと Cloud Storage への書き込みのスループットが高くなりますが、出力 Cloud Storage ファイルの処理時にシャード全体のデータ集計コストが高くなる可能性があります。デフォルト値は Dataflow によって決定されます。
- enableCommitOffsets: 処理済みメッセージのオフセットを Kafka に commit します。有効にすると、パイプライン再開時のメッセージの処理のギャップや重複を最小限に抑えることができます。コンシューマー グループ ID を指定する必要があります。デフォルトは false です。
- consumerGroupId: このパイプラインが属するコンシューマー グループの固有識別子。Kafka へのオフセット commit が有効な場合は必須です。デフォルトは空です。
- kafkaReadOffset: commit されたオフセットが存在しない場合にメッセージを読み始めるポイント。最も古いメッセージから始まり、最新のメッセージが最後になります。デフォルトは latest です。
- kafkaReadUsernameSecretId: SASL_PLAIN 認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレットの ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
- kafkaReadPasswordSecretId: SASL_PLAIN 認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレットの ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
- schemaFormat: Kafka スキーマの形式。SINGLE_SCHEMA_FILE または SCHEMA_REGISTRY として提供できます。SINGLE_SCHEMA_FILE が指定されている場合、すべてのメッセージは avro スキーマ ファイルで言及されているスキーマを持つ必要があります。SCHEMA_REGISTRY が指定されている場合、メッセージは 1 つのスキーマまたは複数のスキーマを持つことができます。デフォルトは SINGLE_SCHEMA_FILE です。
- confluentAvroSchemaPath: トピック内のすべてのメッセージをデコードするために使用される単一の Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
- schemaRegistryConnectionUrl: メッセージのデコード用に Avro スキーマを管理するために使用される Confluent Schema Registry のインスタンスの URL。デフォルトは空です。
- binaryAvroSchemaPath: バイナリでエンコードされた Avro メッセージをデコードするために使用される Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Kafka to Cloud Storage template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- 省略可: 1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
BIGQUERY_TABLE
: 実際の Cloud Storage テーブル名KAFKA_TOPICS
: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping
をご覧ください。PATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI(例:gs://my-bucket/my-udfs/my_file.js
)JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数のコードが
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。KAFKA_SERVER_ADDRESSES
: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーからアクセス可能なポート番号を付加する必要があります。例:35.70.252.199:9092
。複数のアドレスを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping
をご覧ください。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
BIGQUERY_TABLE
: 実際の Cloud Storage テーブル名KAFKA_TOPICS
: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping
をご覧ください。PATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI(例:gs://my-bucket/my-udfs/my_file.js
)JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数のコードが
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。KAFKA_SERVER_ADDRESSES
: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーからアクセス可能なポート番号を付加する必要があります。例:35.70.252.199:9092
。複数のアドレスを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping
をご覧ください。
詳細については、Dataflow で Kafka から Cloud Storage にデータを書き込むをご覧ください。
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。