Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。
パイプラインの要件
- 入力 Pub/Sub サブスクリプションが存在していること。
- Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
- 未処理の Pub/Sub トピックが存在していること。
- 出力 BigQuery データセットが存在していること。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
schemaPath |
Avro スキーマ ファイルがある Cloud Storage の場所。例: gs://path/to/my/schema.avsc 。 |
inputSubscription |
読み取り元の Pub/Sub 入力サブスクリプション。例: projects/<project>/subscriptions/<subscription> |
outputTopic |
未処理レコードに使用する Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name> |
outputTableSpec |
BigQuery 出力テーブルの場所。例: <my-project>:<my-dataset>.<my-table> 。createDisposition の指定によっては、ユーザー指定の Avro スキーマを使用して出力テーブルが自動的に作成されます。 |
writeDisposition |
省略可: BigQuery の WriteDisposition。例: WRITE_APPEND 、WRITE_EMPTY 、または WRITE_TRUNCATE 。デフォルト: WRITE_APPEND |
createDisposition |
省略可: BigQuery の CreateDisposition。例: CREATE_IF_NEEDED 、CREATE_NEVER 。デフォルト: CREATE_IF_NEEDED |
useStorageWriteApi |
省略可: true の場合、パイプラインは BigQuery Storage Write API を使用します。デフォルト値は false です。詳細については、Storage Write API の使用をご覧ください。 |
useStorageWriteApiAtLeastOnce |
省略可: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクスを使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApi が true の場合にのみ適用されます。デフォルト値は false です。 |
numStorageWriteApiStreams |
省略可: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApi が true で、useStorageWriteApiAtLeastOnce が false の場合に、このパラメータを設定する必要があります。 |
storageWriteApiTriggeringFrequencySec |
省略可: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApi が true で、useStorageWriteApiAtLeastOnce が false の場合に、このパラメータを設定する必要があります。 |
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub Avro to BigQuery template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
SCHEMA_PATH
: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名DEADLETTER_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
SCHEMA_PATH
: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE
: BigQuery 出力テーブル名DEADLETTER_TOPIC
: 未処理のキューに使用する Pub/Sub トピック
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。