このテンプレートは、Cloud Storage に保存されたテキスト ファイルからレコードを読み取り、Pub/Sub トピックに公開するバッチ パイプラインを作成します。このテンプレートは、JSON レコードまたは Pub/Sub トピックが改行区りで含まれるファイルまたは CSV ファイルのレコードを Pub/Sub トピックに公開し、リアルタイムで処理する場合に使用できます。また、Pub/Sub でデータを再生することもできます。
このテンプレートでは、個々のレコードにタイムスタンプを設定しません。実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。
パイプラインの要件
- 読み込むファイルは、改行区切りの JSON または CSV 形式でなければなりません。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
- パイプラインを実行する前に、Pub/Sub トピックが存在している必要があります。
テンプレートのパラメータ
必須パラメータ
- inputFilePattern: 読み込み元の入力ファイルのパターン(例: gs://bucket-name/files/*.json)。
- outputTopic: 書き込み先の Pub/Sub 入力トピック。名前は
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
の形式にする必要があります(例: projects/your-project-id/topics/your-topic-name)。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Text Files on Cloud Storage to Pub/Sub (Batch) template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Text_to_Cloud_PubSub \ --region REGION_NAME \ --parameters \ inputFilePattern=gs://BUCKET_NAME/files/*.json,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "gs://BUCKET_NAME/files/*.json", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" }, "environment": { "zone": "us-central1-f" } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)TOPIC_NAME
: Pub/Sub トピック名BUCKET_NAME
: Cloud Storage バケットの名前
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。