Pub/Sub Topic to Text Files on Cloud Storage

The Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads records from a Pub/Sub topic and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.

Pipeline requirements

  • The Pub/Sub topic must exist prior to execution.
  • The messages published to the topic must be in text format.
  • The messages published to the topic must not contain any newlines, because each Pub/Sub message is saved as a single line in the output file.
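One way to meet the no-newlines requirement is to flatten each message before it is published. The following is a minimal Python sketch; to_single_line is a hypothetical helper written for illustration, not part of the template or of any Pub/Sub client library.

```python
def to_single_line(message: str, replacement: str = " ") -> str:
    """Collapse every line break in message into replacement.

    Hypothetical helper: call it on the payload before publishing so that
    each Pub/Sub message maps to exactly one line in the output file.
    """
    # splitlines() recognizes \n, \r\n, \r and other Unicode line boundaries.
    return replacement.join(message.splitlines())


flattened = to_single_line("first line\nsecond line\r\nthird line")
print(flattened)
```

If the line breaks carry meaning, pass a visible marker as replacement (for example a literal backslash followed by n) so that a downstream reader can restore them.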

Template parameters

Required parameters

  • outputDirectory : The path and filename prefix for writing output files. For example, gs://bucket-name/path/. This value must end in a slash.
  • outputFilenamePrefix : The prefix to place on each windowed file. For example, output-. Defaults to: output.

Optional parameters

  • inputTopic : The Pub/Sub topic to read the input from. The topic name should be in the format projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • userTempLocation : The user-provided directory to write temporary files to. Must end with a slash.
  • outputFilenameSuffix : The suffix to place on each windowed file. Typically a file extension such as .txt or .csv. Defaults to empty.
  • outputShardTemplate : The shard template defines the dynamic portion of each windowed file name. By default, the pipeline uses a single shard for output to the file system within each window, so all data for a window is written to a single file. The outputShardTemplate defaults to W-P-SS-of-NN, where W is the window date range, P is the pane info, S is the shard number, and N is the number of shards. For a single file, the SS-of-NN portion of the outputShardTemplate is 00-of-01.
  • yearPattern : Pattern for formatting the year. Must be one or more of y or Y. Case makes no difference in the year. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory ('/') character. Defaults to YYYY.
  • monthPattern : Pattern for formatting the month. Must be one or more of the M character. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory ('/') character. Defaults to MM.
  • dayPattern : Pattern for formatting the day. Must be one or more of d for day of month or D for day of year. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory ('/') character. Defaults to dd.
  • hourPattern : Pattern for formatting the hour. Must be one or more of the H character. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory ('/') character. Defaults to HH.
  • minutePattern : Pattern for formatting the minute. Must be one or more of the m character. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory ('/') character. Defaults to mm.
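The constraints in the parameter list above can be checked locally before a job is launched. The following is a minimal Python sketch under stated assumptions: validate_parameters and its regular expressions are hypothetical and only encode one reading of the descriptions above (for example, that dayPattern uses either d or D, not a mix). The template's own validation may differ.

```python
import re

# Hypothetical pre-flight check; the template's own validation may differ.
TOPIC_RE = re.compile(r"projects/[^/]+/topics/[^/]+")

# Optional wrapper characters: anything that is not alphanumeric or '/'.
WRAP = r"[^A-Za-z0-9/]*"

# Core of each date pattern, as read from the parameter descriptions.
DATE_PATTERN_CORES = {
    "yearPattern": r"[yY]+",
    "monthPattern": r"M+",
    "dayPattern": r"(?:d+|D+)",
    "hourPattern": r"H+",
    "minutePattern": r"m+",
}


def validate_parameters(params):
    """Return a list of problems found in params; empty means none found."""
    problems = []

    output_directory = params.get("outputDirectory", "")
    if not output_directory.endswith("/"):
        problems.append("outputDirectory must be set and end in a slash")

    temp_location = params.get("userTempLocation")
    if temp_location is not None and not temp_location.endswith("/"):
        problems.append("userTempLocation must end with a slash")

    topic = params.get("inputTopic")
    if topic is not None and not TOPIC_RE.fullmatch(topic):
        problems.append(
            "inputTopic must look like projects/<PROJECT_ID>/topics/<TOPIC_NAME>"
        )

    for name, core in DATE_PATTERN_CORES.items():
        value = params.get(name)
        if value is not None and not re.fullmatch(WRAP + core + WRAP, value):
            problems.append(f"{name} has an unexpected format: {value!r}")

    return problems


example = {
    "outputDirectory": "gs://bucket-name/path/",
    "inputTopic": "projects/my-project/topics/my-topic",  # placeholder names
    "yearPattern": "YYYY",
    "dayPattern": "{dd}",
}
print(validate_parameters(example))
```

Returning a list rather than raising on the first failure lets a caller report every problem in one pass.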

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to Text Files on Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use, for example latest for the most recent version
  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • PROJECT_ID: the ID of the Google Cloud project that contains the Pub/Sub topic
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
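When the job is launched from a script rather than typed by hand, the same command can be assembled programmatically. The following is a minimal Python sketch; build_gcloud_command is a hypothetical helper, and the job name, project, topic, and bucket values are placeholders chosen for illustration. It only builds and prints the command; running it is left to the caller.

```python
import shlex


def build_gcloud_command(job_name, region, version, staging_location, parameters):
    """Return the gcloud invocation shown above as an argument list."""
    # The --parameters flag takes comma-separated key=value pairs.
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    template = f"gs://dataflow-templates-{region}/{version}/Cloud_PubSub_to_GCS_Text"
    return [
        "gcloud", "dataflow", "jobs", "run", job_name,
        "--gcs-location", template,
        "--region", region,
        "--staging-location", staging_location,
        "--parameters", joined,
    ]


command = build_gcloud_command(
    job_name="pubsub-to-text-example",          # placeholder
    region="us-central1",
    version="latest",                           # assumed version value
    staging_location="gs://your-bucket/staging",
    parameters={
        "inputTopic": "projects/my-project/topics/my-topic",  # placeholder
        "outputDirectory": "gs://your-bucket/output/",
        "outputFilenamePrefix": "output-",
        "outputFilenameSuffix": ".txt",
    },
)

# Print a shell-quoted form for review. To launch the job, pass the list to
# subprocess.run(command, check=True) on a machine where gcloud is installed.
print(shlex.join(command))
```

Passing the arguments as a list avoids shell quoting problems if a parameter value contains spaces or other special characters.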

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt"
   }
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use, for example latest for the most recent version
  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
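The request above can also be built with the Python standard library. The following is a minimal sketch under stated assumptions: build_launch_request is a hypothetical helper, the project, topic, and bucket names are placeholders, and the access token is assumed to be an OAuth 2.0 token obtained separately (for example with gcloud auth print-access-token). The sketch constructs the request without sending it.

```python
import json
import urllib.parse
import urllib.request


def build_launch_request(project_id, location, version, job_name,
                         parameters, access_token):
    """Build (but do not send) the templates:launch POST request."""
    gcs_path = (
        f"gs://dataflow-templates-{location}/{version}/Cloud_PubSub_to_GCS_Text"
    )
    # urlencode percent-encodes the gs:// path for use as a query parameter.
    url = (
        f"https://dataflow.googleapis.com/v1b3/projects/{project_id}"
        f"/locations/{location}/templates:launch?"
        + urllib.parse.urlencode({"gcsPath": gcs_path})
    )
    body = {
        "jobName": job_name,
        "environment": {
            "ipConfiguration": "WORKER_IP_UNSPECIFIED",
            "additionalExperiments": [],
        },
        "parameters": parameters,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_launch_request(
    project_id="my-project",                    # placeholder
    location="us-central1",
    version="latest",                           # assumed version value
    job_name="pubsub-to-text-example",          # placeholder
    parameters={
        "inputTopic": "projects/my-project/topics/my-topic",
        "outputDirectory": "gs://your-bucket/output/",
        "outputFilenamePrefix": "output-",
        "outputFilenameSuffix": ".txt",
    },
    access_token="ACCESS_TOKEN",                # placeholder, not a real token
)

# To send the request: urllib.request.urlopen(request)
print(request.full_url)
```

Serializing the body with json.dumps guarantees syntactically valid JSON, which avoids hand-editing mistakes such as a missing or trailing comma.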

What's next