Pub/Sub Topic or Subscription to Text Files on Cloud Storage テンプレート

Pub/Sub Topic or Subscription to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。

パイプラインの要件

  • 実行前に Pub/Sub トピックまたはサブスクリプションが存在している必要があります。
  • トピックに公開するメッセージは、テキスト形式となる必要があります。
  • トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。

テンプレートのパラメータ

必須パラメータ

  • outputDirectory: 出力ファイルを書き込むパスとファイル名の接頭辞。この値はスラッシュで終わる必要があります(例: gs://your-bucket/your-path)。

オプション パラメータ

  • inputTopic: 読み取り元の Pub/Sub トピック。トピック名は projects/<PROJECT_ID>/topics/<TOPIC_NAME> の形式にします。このパラメータを指定する場合は、inputSubscription を使用しないでください(例: projects/your-project-id/topics/your-topic-name)。
  • inputSubscription: 入力の読み取り元になる Pub/Sub サブスクリプション。サブスクリプション名の形式は projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME> です。このパラメータを指定する場合は、inputTopic を使用しないでください(例: projects/your-project-id/subscriptions/your-subscription-name)。
  • userTempLocation: 一時ファイルを出力するユーザー指定のディレクトリ。末尾はスラッシュにする必要があります。
  • outputFilenamePrefix: ウィンドウ処理された各ファイルに配置する接頭辞(例: output-)。デフォルトは output です。
  • outputFilenameSuffix: ウィンドウ処理されたファイルの接尾辞。通常は、.txt.csv などのファイル拡張子です(例: .txt)。デフォルトは空です。
  • outputShardTemplate: シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに出力されます。outputShardTemplate のデフォルトは W-P-SS-of-NN です。ここで、W はウィンドウ期間、P はペイン情報、S はシャード番号、N はシャード数です。ファイルが 1 つの場合、outputShardTemplateSS-of-NN 部分は 00-of-01 になります。
  • numShards: 書き込み時に生成される出力シャードの最大数。シャード数が多いと Cloud Storage への書き込みのスループットが高くなりますが、出力 Cloud Storage ファイルの処理時にシャード全体のデータ集計コストが高くなる可能性があります。デフォルト値は 0 です。
  • windowDuration: ウィンドウ期間は、出力ディレクトリにデータが書き込まれる間隔です。パイプラインのスループットに基づいて期間を構成します。たとえば、スループットを向上させるには、データがメモリに収まるようにウィンドウ サイズを小さくする必要があります。デフォルトは 5m(5 分)、最小は 1s(1 秒)です。使用できる形式は、[int]s(秒に使用、例: 5s)、[int]m(分に使用、例: 12m)、[int]h(時間に使用、例: 2h)です(例: 5m)。
  • yearPattern: 年のフォーマット パターン。「y」または「Y」を 1 つ以上指定する必要があります。「年」の値に大文字小文字の区別はありません。必要に応じて、パターンを英数字以外の文字またはディレクトリ文字(/)で囲みます。デフォルトは「YYYY」です。
  • monthPattern: 月のフォーマット パターン。「M」の文字を 1 つ以上指定する必要があります。必要に応じて、パターンを英数字以外の文字またはディレクトリ文字(/)で囲みます。デフォルトは「MM」です。
  • dayPattern: 日付のフォーマット パターン。月の日付の場合は「d」、年の日付の場合は「D」の文字を 1 つ以上指定する必要があります。「年」の値に大文字小文字の区別はありません。必要に応じて、パターンを英数字以外の文字またはディレクトリ文字(/)で囲みます。デフォルトは「dd」です。
  • hourPattern: 時間のフォーマット パターン。「H」の文字を 1 つ以上指定する必要があります。必要に応じて、パターンを英数字以外の文字またはディレクトリ文字(/)で囲みます。デフォルトは「HH」です。
  • minutePattern: 分のフォーマット パターン。「m」の文字を 1 つ以上指定する必要があります。必要に応じて、パターンを英数字以外の文字またはディレクトリ文字(/)で囲みます。デフォルトは「mm」です。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • BUCKET_NAME: Cloud Storage バケットの名前

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名
  • BUCKET_NAME: Cloud Storage バケットの名前

次のステップ