The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.
Pipeline requirements
- The Pub/Sub topic or subscription must exist prior to execution.
- The messages published to the topic must be in text format.
- The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file.
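For example (a minimal sketch, assuming a topic named my-topic already exists in your project and that the gcloud CLI is installed and authenticated), you can publish a single-line text message that meets these requirements:

gcloud pubsub topics publish my-topic \
    --message="order_id=1234,status=shipped"

Each message published this way becomes one line in the generated output files.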
Template parameters
Parameter | Description |
---|---|
inputTopic | The Pub/Sub topic to read the input from. The topic name should be in the format `projects/<project-id>/topics/<topic-name>`. If this parameter is provided, `inputSubscription` shouldn't be provided. |
inputSubscription | The Pub/Sub subscription to read the input from. The subscription name should be in the format `projects/<project-id>/subscriptions/<subscription-name>`. If this parameter is provided, `inputTopic` shouldn't be provided. |
outputDirectory | The path and filename prefix for writing output files. For example, `gs://bucket-name/path/`. This value must end in a slash. |
outputFilenamePrefix | The prefix to place on each windowed file. For example, `output-`. |
outputFilenameSuffix | The suffix to place on each windowed file, typically a file extension such as `.txt` or `.csv`. |
outputShardTemplate | Defines the dynamic portion of each windowed file. By default, the pipeline uses a single shard for output to the file system within each window, so all data is output into a single file per window. The `outputShardTemplate` defaults to `W-P-SS-of-NN`, where `W` is the window date range, `P` is the pane info, `S` is the shard number, and `N` is the number of shards. In the case of a single file, the `SS-of-NN` portion of the `outputShardTemplate` is `00-of-01`. |
windowDuration | (Optional) The window duration is the interval in which data is written to the output directory. Configure the duration based on the pipeline's throughput. For example, a higher throughput might require smaller window sizes so that the data fits into memory. Defaults to `5m`, with a minimum of `1s`. Allowed formats are: `[int]s` (for seconds, for example `5s`), `[int]m` (for minutes, for example `12m`), `[int]h` (for hours, for example `2h`). |
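Taken together, these parameters determine the name of each windowed output file: the expanded shard template is placed between the prefix and the suffix under the output directory. Using the example values above and the default single-shard template, the object name follows this pattern (W and P are filled in with the concrete window range and pane info at run time; the second line is illustrative only, not verbatim output):

<outputDirectory><outputFilenamePrefix>W-P-SS-of-NN<outputFilenameSuffix>
gs://bucket-name/path/output-W-P-00-of-01.txt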
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
  inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
  outputDirectory=gs://BUCKET_NAME/output/,\
  outputFilenamePrefix=output-,\
  outputFilenameSuffix=.txt
Replace the following:
- JOB_NAME: a unique job name of your choice
- YOUR_PROJECT_ID and PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- SUBSCRIPTION_NAME: your Pub/Sub subscription name
- BUCKET_NAME: the name of your Cloud Storage bucket
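Once the job is running and messages are arriving on the subscription, a quick way to confirm that windowed files are being written (a sketch, assuming the gcloud CLI's storage commands are available) is to list the output prefix after a window closes:

gcloud storage ls gs://BUCKET_NAME/output/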
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
      "outputDirectory": "gs://BUCKET_NAME/output/",
      "outputFilenamePrefix": "output-",
      "outputFilenameSuffix": ".txt"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex"
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- SUBSCRIPTION_NAME: your Pub/Sub subscription name
- BUCKET_NAME: the name of your Cloud Storage bucket
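As a minimal sketch of sending this request (assuming the JSON body above is saved to a local file named request.json, a name chosen here only for illustration, and that gcloud can mint an access token for your account), you could use curl:

curl -X POST \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch" \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json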
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.