Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。
パイプラインの要件
- 入力 Pub/Sub サブスクリプションが存在していること。
- Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
- 未処理の Pub/Sub トピックが存在していること。
- 出力 BigQuery データセットが存在していること。
テンプレートのパラメータ
必須パラメータ
- schemaPath: Avro スキーマ ファイルがある Cloud Storage の場所。例:
gs://path/to/my/schema.avsc
- inputSubscription: 読み取り元の Pub/Sub 入力サブスクリプション(例: projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>)。
- outputTableSpec: 出力を書き込む BigQuery 出力テーブルの場所。たとえば、
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
です。指定されたcreateDisposition
によっては、ユーザーが指定した Avro スキーマを使用して出力テーブルが自動的に作成される場合があります。 - outputTopic: 未処理レコードに使用する Pub/Sub トピック(例: projects/<PROJECT_ID>/topics/<TOPIC_NAME>)。
オプション パラメータ
- useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを
false
に設定します。このパラメータは、useStorageWriteApi
がtrue
の場合にのみ適用されます。デフォルト値はfalse
です。 - writeDisposition : BigQuery WriteDisposition(https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload)の値。例:
WRITE_APPEND
、WRITE_EMPTY
、WRITE_TRUNCATE
。デフォルトはWRITE_APPEND
です。 - createDisposition: BigQuery CreateDisposition(https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload)。たとえば、
CREATE_IF_NEEDED
やCREATE_NEVER
です。デフォルトはCREATE_IF_NEEDED
です。 - useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は
false
です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。 - numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。
useStorageWriteApi
がtrue
であり、useStorageWriteApiAtLeastOnce
がfalse
の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。 - storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。
useStorageWriteApi
がtrue
であり、useStorageWriteApiAtLeastOnce
がfalse
の場合に、このパラメータを設定する必要があります。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub Avro to BigQuery template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
SCHEMA_PATH
: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名DEADLETTER_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
SCHEMA_PATH
: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名DEADLETTER_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。