Text Files on Cloud Storage to Pub/Sub(Stream)テンプレート

このテンプレートは、Cloud Storage にアップロードされた新しいテキスト ファイルを継続的にポーリングし、各ファイルを行単位で読み取り、Pub/Sub トピックに文字列を公開するストリーミング パイプラインを作成します。このテンプレートは、JSON レコードを含む改行区切りのファイルや CSV ファイルのレコードをリアルタイムで処理するために Pub/Sub トピックに公開します。また、Pub/Sub でデータを再生することもできます。

パイプラインは無期限に実行され、「ドレイン」ではなく「キャンセル」によって手動で終了させる必要があります。これは「Watch」変換を使用しているためで、この変換は「SplittableDoFn」であり、ドレインはサポートしていません。

現在、ポーリング間隔は固定されており、10 秒に設定されています。このテンプレートでは、個々のレコードにタイムスタンプを設定しません。このため、実行中はイベント時間と公開時間が同じになります。パイプラインの処理が正確なイベント時間に依存している場合は、このパイプラインを使用しないでください。

パイプラインの要件

  • 入力ファイルは、改行区切りの JSON または CSV 形式である必要があります。ソースファイル内に複数行にわたるレコードがあると、ファイル内の各行がメッセージとして Pub/Sub に公開されるため、ダウンストリームで問題が発生する可能性があります。
  • 実行前に Pub/Sub トピックが存在している必要があります。
  • このパイプラインは無期限で実行されるため、手動で終了する必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。たとえば、gs://bucket-name/files/*.jsongs://bucket-name/path/*.csv です。
outputTopic 書き込み先の Pub/Sub 入力トピック。名前は projects/<project-id>/topics/<topic-name> の形式にします。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Text Files on Cloud Storage to Pub/Sub (Stream) template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. 省略可: 1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
  8. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILE_PATTERN: Cloud Storage バケットから読み取るファイル パターン glob(例: path/*.csv

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • STAGING_LOCATION: ローカル ファイルをステージングする場所(例: gs://your-bucket/staging
  • TOPIC_NAME: Pub/Sub トピック名
  • BUCKET_NAME: Cloud Storage バケットの名前
  • FILE_PATTERN: Cloud Storage バケットから読み取るファイル パターン glob(例: path/*.csv

次のステップ