Pub/Sub Topic or Subscription to Text Files on Cloud Storage template

The Pub/Sub Topic or Subscription to Text Files on Cloud Storage template is a streaming pipeline that reads records from Pub/Sub and saves them as a series of Cloud Storage files in text format. Use the template as a quick way to save Pub/Sub data for future use. By default, the template generates a new file every 5 minutes.

Pipeline requirements

  • The Pub/Sub topic or subscription must exist prior to execution.
  • The messages published to the topic must be in text format.
  • The messages published to the topic must not contain any newlines, because each Pub/Sub message is saved as a single line in the output file.
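The single-line requirement can be enforced on the publisher side before a message is sent. The following is a minimal sketch, not part of the template: the helper names `to_single_line` and `is_single_line` are hypothetical, and treating carriage returns like newlines is an assumption made for safety.

```python
def is_single_line(message: str) -> bool:
    """Return True if the message contains no line-break characters."""
    # Assumption: carriage returns are treated as line breaks too.
    return "\n" not in message and "\r" not in message


def to_single_line(message: str) -> str:
    """Escape line breaks so the message occupies exactly one output line."""
    # Escape backslashes first so that a literal "\n" sequence in the
    # input stays distinguishable from an escaped line break.
    escaped = message.replace("\\", "\\\\")
    return escaped.replace("\r", "\\r").replace("\n", "\\n")


if __name__ == "__main__":
    payload = "first line\nsecond line"
    print(is_single_line(payload))   # False
    print(to_single_line(payload))   # first line\nsecond line
```

Escaping rather than stripping keeps the original content recoverable by whatever later reads the output files.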

Template parameters

Parameter Description
inputTopic The Pub/Sub topic to read the input from. The topic name should be in the format projects/<project-id>/topics/<topic-name>. If this parameter is provided, inputSubscription shouldn't be provided.
inputSubscription The Pub/Sub subscription to read the input from. The subscription name should be in the format projects/<project-id>/subscriptions/<subscription-name>. If this parameter is provided, inputTopic shouldn't be provided.
outputDirectory The path and filename prefix for writing output files. For example, gs://bucket-name/path/. This value must end in a slash.
outputFilenamePrefix The prefix to place on each windowed file. For example, output-.
outputFilenameSuffix The suffix to place on each windowed file, typically a file extension such as .txt or .csv.
outputShardTemplate The shard template defines the dynamic portion of each windowed file. By default, the pipeline uses a single shard for output to the file system within each window. This means that all data outputs into a single file per window. The outputShardTemplate defaults to W-P-SS-of-NN where W is the window date range, P is the pane info, S is the shard number, and N is the number of shards. In case of a single file, the SS-of-NN portion of the outputShardTemplate is 00-of-01.
windowDuration (Optional) The window duration is the interval in which data is written to the output directory. Configure the duration based on the pipeline's throughput. For example, a higher throughput might require smaller window sizes so that the data fits into memory. Defaults to 5m, with a minimum of 1s. Allowed formats are: [int]s (for seconds, example: 5s), [int]m (for minutes, example: 12m), [int]h (for hours, example: 2h).
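The constraints in the parameter table can be checked locally before a job is launched. The sketch below is illustrative only: `window_seconds` and `validate_parameters` are hypothetical helpers, not part of the template. It assumes that exactly one of inputTopic and inputSubscription must be set, and it uses the plural `subscriptions` path segment that appears in the run commands.

```python
import re

# Resource-name patterns following the formats in the parameter table.
TOPIC_RE = re.compile(r"^projects/[^/]+/topics/[^/]+$")
SUBSCRIPTION_RE = re.compile(r"^projects/[^/]+/subscriptions/[^/]+$")
# windowDuration: an integer followed by s, m, or h.
DURATION_RE = re.compile(r"^([0-9]+)([smh])$")
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def window_seconds(duration: str = "5m") -> int:
    """Convert a windowDuration string such as 5s, 12m, or 2h to seconds."""
    match = DURATION_RE.match(duration)
    if match is None:
        raise ValueError(f"unsupported windowDuration: {duration!r}")
    seconds = int(match.group(1)) * UNIT_SECONDS[match.group(2)]
    if seconds < 1:
        raise ValueError("windowDuration must be at least 1s")
    return seconds


def validate_parameters(params: dict) -> None:
    """Raise ValueError if the parameters break a documented constraint."""
    topic = params.get("inputTopic")
    subscription = params.get("inputSubscription")
    # Assumption: the pipeline needs exactly one input source.
    if bool(topic) == bool(subscription):
        raise ValueError("set exactly one of inputTopic or inputSubscription")
    if topic and not TOPIC_RE.match(topic):
        raise ValueError(f"malformed topic name: {topic!r}")
    if subscription and not SUBSCRIPTION_RE.match(subscription):
        raise ValueError(f"malformed subscription name: {subscription!r}")
    if not params.get("outputDirectory", "").endswith("/"):
        raise ValueError("outputDirectory must end in a slash")
    if "windowDuration" in params:
        window_seconds(params["windowDuration"])
```

For example, `window_seconds("12m")` returns 720, and `validate_parameters` raises `ValueError` when both input parameters are supplied.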

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • BUCKET_NAME: the name of your Cloud Storage bucket
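When the command above is generated from a script, the placeholder substitution can be done programmatically. The following is a rough sketch under stated assumptions: `build_gcloud_command` is a hypothetical helper, it only assembles the argument list and does not run gcloud, and it assumes that no parameter value contains a comma.

```python
import shlex


def build_gcloud_command(job_name, project_id, region, version, parameters):
    """Assemble the argument list for the gcloud command shown above."""
    template_path = (
        f"gs://dataflow-templates-{region}/{version}"
        "/flex/Cloud_PubSub_to_GCS_Text_Flex"
    )
    # Template parameters are passed as one comma-separated KEY=VALUE string.
    # Assumption: no value contains a comma.
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--project={project_id}",
        "--region", region,
        "--template-file-gcs-location", template_path,
        "--parameters", joined,
    ]


if __name__ == "__main__":
    command = build_gcloud_command(
        job_name="JOB_NAME",
        project_id="PROJECT_ID",
        region="REGION_NAME",
        version="VERSION",
        parameters={
            "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "outputDirectory": "gs://BUCKET_NAME/output/",
            "outputFilenamePrefix": "output-",
            "outputFilenameSuffix": ".txt",
        },
    )
    # shlex.join quotes each argument so the line can be pasted into a shell.
    print(shlex.join(command))
```

Returning a list rather than a single string lets the result be passed to `subprocess.run` without a shell, which avoids quoting problems in bucket or subscription names.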

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
      "outputDirectory": "gs://BUCKET_NAME/output/",
      "outputFilenamePrefix": "output-",
      "outputFilenameSuffix": ".txt"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex"
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • BUCKET_NAME: the name of your Cloud Storage bucket
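Building the request body with a JSON serializer instead of by hand avoids syntax slips such as missing or trailing commas. The sketch below is one possible way to do that: `build_launch_request` is a hypothetical helper that returns the URL and body for the POST request above, and it leaves sending the request and authorization to the caller.

```python
import json


def build_launch_request(project_id, location, job_name, version, parameters):
    """Return the URL and JSON body for the flexTemplates:launch request."""
    url = (
        "https://dataflow.googleapis.com/v1b3"
        f"/projects/{project_id}/locations/{location}/flexTemplates:launch"
    )
    body = {
        "launch_parameter": {
            "jobName": job_name,
            "parameters": dict(parameters),
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                "/flex/Cloud_PubSub_to_GCS_Text_Flex"
            ),
        }
    }
    # json.dumps always emits syntactically valid JSON.
    return url, json.dumps(body, indent=2)


if __name__ == "__main__":
    url, body = build_launch_request(
        project_id="PROJECT_ID",
        location="LOCATION",
        job_name="JOB_NAME",
        version="VERSION",
        parameters={
            "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "outputDirectory": "gs://BUCKET_NAME/output/",
            "outputFilenamePrefix": "output-",
            "outputFilenameSuffix": ".txt",
        },
    )
    print("POST", url)
    print(body)
```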

What's next