The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.
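To make the "new file every 5 minutes" behavior concrete, the following sketch groups timestamped messages into fixed 5-minute windows, one list of lines per window. It is an illustration only: it assumes fixed, non-overlapping windows aligned to the epoch, and the names `window_start` and `group_by_window` are hypothetical, not part of the template.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60  # default window length described above


def window_start(timestamp: float, size: int = WINDOW_SECONDS) -> int:
    """Return the start, in epoch seconds, of the window containing timestamp."""
    return int(timestamp // size) * size


def group_by_window(messages):
    """Group (timestamp, text) pairs into one list of lines per window.

    Assumption: each window corresponds to one output file, and each
    message becomes one line of that file.
    """
    files = defaultdict(list)
    for timestamp, text in messages:
        files[window_start(timestamp)].append(text)
    return dict(files)


if __name__ == "__main__":
    sample = [(0, "a"), (299, "b"), (300, "c")]
    print(group_by_window(sample))
```

Messages at 0 s and 299 s land in the same window, while the message at 300 s starts the next one.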
Pipeline requirements
- The Pub/Sub topic or subscription must exist prior to execution.
- The messages published to the topic must be in text format.
- The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file.
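The newline requirement can be enforced on the publishing side. The following is a minimal sketch, not part of the template: the helper name `to_single_line` is hypothetical, and JSON encoding is only one possible way to keep each message on a single line.

```python
import json


def to_single_line(record: dict) -> str:
    """Serialize a record as one line of text with no raw line breaks.

    json.dumps escapes newline characters inside strings as a backslash
    followed by 'n', so the serialized text stays on one line.
    """
    line = json.dumps(record, ensure_ascii=False, separators=(",", ":"))
    # Defensive check: json.dumps escapes \n and \r, so this should not fire.
    if "\n" in line or "\r" in line:
        raise ValueError("serialized record still contains a line break")
    return line


if __name__ == "__main__":
    print(to_single_line({"id": 7, "note": "first line\nsecond line"}))
```

A consumer of the output files can recover the original record from each line with `json.loads`.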
Template parameters
Required parameters
- outputDirectory : The path and filename prefix to write output files to. This value must end in a slash. (Example: gs://your-bucket/your-path/).
Optional parameters
- inputTopic : The Pub/Sub topic to read the input from. The topic name should be in the format projects/<PROJECT_ID>/topics/<TOPIC_NAME>. If this parameter is provided, don't use inputSubscription. (Example: projects/your-project-id/topics/your-topic-name).
- inputSubscription : The Pub/Sub subscription to read the input from. The subscription name uses the format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. If this parameter is provided, don't use inputTopic. (Example: projects/your-project-id/subscriptions/your-subscription-name).
- userTempLocation : The user-provided directory to output temporary files to. Must end with a slash.
- outputFilenamePrefix : The prefix to place on each windowed file. (Example: output-). Defaults to: output.
- outputFilenameSuffix : The suffix to place on each windowed file, typically a file extension such as .txt or .csv. (Example: .txt). Defaults to empty.
- outputShardTemplate : The shard template defines the dynamic portion of each windowed file. By default, the pipeline uses a single shard for output to the file system within each window. This means that all data outputs into a single file per window. The outputShardTemplate defaults to W-P-SS-of-NN, where W is the window date range, P is the pane info, S is the shard number, and N is the number of shards. In case of a single file, the SS-of-NN portion of the outputShardTemplate is 00-of-01.
- numShards : The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. Defaults to: 0.
- windowDuration : The window duration is the interval in which data is written to the output directory. Configure the duration based on the pipeline's throughput. For example, a higher throughput might require smaller window sizes so that the data fits into memory. Defaults to 5m (5 minutes), with a minimum of 1s (1 second). Allowed formats are: [int]s (for seconds, example: 5s), [int]m (for minutes, example: 12m), [int]h (for hours, example: 2h). (Example: 5m).
- yearPattern : Pattern for formatting the year. Must be one or more of 'y' or 'Y'. Case makes no difference in the year. The pattern can be optionally wrapped by characters that aren't either alphanumeric or the directory ('/') character. Defaults to 'YYYY'.
- monthPattern : Pattern for formatting the month. Must be one or more of the 'M' character. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory ('/') character. Defaults to 'MM'.
- dayPattern : Pattern for formatting the day. Must be one or more of 'd' for day of month or 'D' for day of year. Case makes no difference in the year. The pattern can be optionally wrapped by characters that aren't either alphanumeric or the directory ('/') character. Defaults to 'dd'.
- hourPattern : Pattern for formatting the hour. Must be one or more of the 'H' character. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory ('/') character. Defaults to 'HH'.
- minutePattern : Pattern for formatting the minute. Must be one or more of the 'm' character. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory ('/') character. Defaults to 'mm'.
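Several of the rules in the parameter list can be checked before a job is launched. The sketch below is one way to do that under stated assumptions: the function names `window_duration_seconds` and `check_parameters` are hypothetical, and the checks only mirror the rules written above (trailing slashes, not setting both input sources, and the windowDuration formats). The template itself may validate differently.

```python
import re

# Formats listed above: [int]s, [int]m, [int]h.
_DURATION_RE = re.compile(r"(\d+)([smh])")
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def window_duration_seconds(value: str = "5m") -> int:
    """Convert a windowDuration string such as '5m' to seconds."""
    match = _DURATION_RE.fullmatch(value)
    if match is None:
        raise ValueError(f"unsupported windowDuration format: {value!r}")
    seconds = int(match.group(1)) * _UNIT_SECONDS[match.group(2)]
    if seconds < 1:  # documented minimum is 1s
        raise ValueError("windowDuration must be at least 1s")
    return seconds


def check_parameters(params: dict) -> None:
    """Raise ValueError if params breaks a rule stated in the parameter list."""
    if "outputDirectory" not in params:
        raise ValueError("outputDirectory is required")
    for key in ("outputDirectory", "userTempLocation"):
        if key in params and not params[key].endswith("/"):
            raise ValueError(f"{key} must end with a slash")
    if "inputTopic" in params and "inputSubscription" in params:
        raise ValueError("set inputTopic or inputSubscription, not both")
    # Fall back to the documented default when windowDuration is omitted.
    window_duration_seconds(params.get("windowDuration", "5m"))


if __name__ == "__main__":
    check_parameters({
        "outputDirectory": "gs://your-bucket/your-path/",
        "inputTopic": "projects/your-project-id/topics/your-topic-name",
        "windowDuration": "12m",
    })
    print(window_duration_seconds("12m"))
```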
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt
Replace the following:
- JOB_NAME : a unique job name of your choice
- PROJECT_ID : the Google Cloud project ID where you want to run the Dataflow job
- REGION_NAME : the region where you want to deploy your Dataflow job, for example, us-central1
- VERSION : the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- SUBSCRIPTION_NAME : your Pub/Sub subscription name
- BUCKET_NAME : the name of your Cloud Storage bucket
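When the command is generated from a script, it can help to assemble the argument list programmatically so that the comma-separated --parameters value is built in one place. The following sketch only constructs and prints the command; the function name `build_run_command` and the sample values are hypothetical, and the flags are the ones shown in the command above.

```python
import shlex


def build_run_command(job_name, project_id, region, version,
                      subscription, bucket):
    """Return the argument list for `gcloud dataflow flex-template run`."""
    template = (
        f"gs://dataflow-templates-{region}/{version}"
        "/flex/Cloud_PubSub_to_GCS_Text_Flex"
    )
    parameters = {
        "inputSubscription":
            f"projects/{project_id}/subscriptions/{subscription}",
        "outputDirectory": f"gs://{bucket}/output/",
        "outputFilenamePrefix": "output-",
        "outputFilenameSuffix": ".txt",
    }
    # --parameters takes a single comma-separated list of key=value pairs.
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--project={project_id}",
        "--region", region,
        "--template-file-gcs-location", template,
        "--parameters", joined,
    ]


if __name__ == "__main__":
    argv = build_run_command("my-job", "my-project", "us-central1",
                             "latest", "my-subscription", "my-bucket")
    # Print the command for review; pass argv to subprocess.run to execute it.
    print(shlex.join(argv))
```

Passing the arguments as a list avoids shell quoting issues that can arise when the command is built as a single string.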
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
          "outputDirectory": "gs://BUCKET_NAME/output/",
          "outputFilenamePrefix": "output-",
          "outputFilenameSuffix": ".txt"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex"
   }
}
Replace the following:
- PROJECT_ID : the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME : a unique job name of your choice
- LOCATION : the region where you want to deploy your Dataflow job, for example, us-central1
- VERSION : the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- SUBSCRIPTION_NAME : your Pub/Sub subscription name
- BUCKET_NAME : the name of your Cloud Storage bucket
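The request above can also be assembled in code. The sketch below builds the URL and JSON body with the Python standard library and returns an unsent request object. The function name `build_launch_request` is hypothetical, and passing an OAuth 2.0 access token as a Bearer header is an assumption about how the caller authenticates (for example, with a token from `gcloud auth print-access-token`).

```python
import json
import urllib.request


def build_launch_request(project_id, location, job_name, version,
                         subscription, bucket, access_token):
    """Build (but do not send) the flexTemplates:launch POST request."""
    url = (
        "https://dataflow.googleapis.com/v1b3/projects/"
        f"{project_id}/locations/{location}/flexTemplates:launch"
    )
    body = {
        "launch_parameter": {
            "jobName": job_name,
            "parameters": {
                "inputSubscription":
                    f"projects/{project_id}/subscriptions/{subscription}",
                "outputDirectory": f"gs://{bucket}/output/",
                "outputFilenamePrefix": "output-",
                "outputFilenameSuffix": ".txt",
            },
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                "/flex/Cloud_PubSub_to_GCS_Text_Flex"
            ),
        }
    }
    return urllib.request.Request(
        url,
        # json.dumps produces strictly valid JSON (no trailing commas).
        data=json.dumps(body).encode("utf-8"),
        headers={
            # Assumption: the caller supplies a valid OAuth 2.0 access token.
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


if __name__ == "__main__":
    request = build_launch_request(
        "my-project", "us-central1", "my-job", "latest",
        "my-subscription", "my-bucket", "ACCESS_TOKEN")
    # Inspect the request; urllib.request.urlopen(request) would send it.
    print(request.get_method(), request.full_url)
    print(request.data.decode("utf-8"))
```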
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.