The Pub/Sub to Pub/Sub template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the messages to another Pub/Sub topic. The pipeline also accepts an optional message attribute key and a value that can be used to filter the messages that should be written to the Pub/Sub topic. You can use this template to copy messages from a Pub/Sub subscription to another Pub/Sub topic with an optional message filter.
Pipeline requirements
- The source Pub/Sub subscription must exist prior to execution.
- The source Pub/Sub subscription must be a pull subscription.
- The destination Pub/Sub topic must exist prior to execution.
Template parameters
Required parameters
- inputSubscription : The Pub/Sub subscription to read the input from. (Example: projects/your-project-id/subscriptions/your-subscription-name).
- outputTopic : The Pub/Sub topic to write the output to. (Example: projects/your-project-id/topics/your-topic-name).
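Both required parameters are full resource paths rather than bare names. As a rough illustration, the following Python sketch checks that shape before a job is launched. The function name `check_required_parameters` and the two patterns are hypothetical and not part of the template; the patterns only check the overall `projects/.../subscriptions/...` and `projects/.../topics/...` layout and do not enforce Pub/Sub's naming rules.

```python
import re

# Structural shape only (assumption): any non-empty, slash-free segment is accepted.
SUBSCRIPTION_PATH = re.compile(r"projects/[^/]+/subscriptions/[^/]+")
TOPIC_PATH = re.compile(r"projects/[^/]+/topics/[^/]+")


def check_required_parameters(input_subscription: str, output_topic: str) -> dict:
    """Return the required template parameters, or raise ValueError on a malformed path."""
    if not SUBSCRIPTION_PATH.fullmatch(input_subscription):
        raise ValueError(f"not a subscription path: {input_subscription!r}")
    if not TOPIC_PATH.fullmatch(output_topic):
        raise ValueError(f"not a topic path: {output_topic!r}")
    return {
        "inputSubscription": input_subscription,
        "outputTopic": output_topic,
    }
```

A bare name such as `your-subscription-name` is rejected by this check, which catches a common mistake before the job is submitted.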
Optional parameters
- filterKey : The attribute key to use to filter events. No filters are applied if filterKey is not specified.
- filterValue : The attribute value to use to filter events when a filterKey is provided. By default, a null filterValue is used.
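The filter semantics can be sketched in Python as follows. This is an illustration under stated assumptions, not the template's implementation: the function name `should_forward` is hypothetical, Python's `re.fullmatch` stands in for a complete-expression match on a Java regex (the two regex dialects differ in some corner cases), messages whose attribute matches are assumed to be the ones written to the output topic, and messages that lack the attribute are assumed to be dropped. The default null filterValue case is not modeled.

```python
import re
from typing import Mapping, Optional


def should_forward(
    attributes: Mapping[str, str],
    filter_key: Optional[str],
    filter_value: str,
) -> bool:
    """Decide whether a message with these attributes would be forwarded."""
    if filter_key is None:
        # No filterKey specified: no filter is applied.
        return True
    value = attributes.get(filter_key)
    if value is None:
        # Assumption: a message without the attribute does not pass the filter.
        return False
    # The complete attribute value must match; a substring match is not enough.
    return re.fullmatch(filter_value, value) is not None
```

With `filter_key="type"`, the value `order` matches a message whose `type` attribute is exactly `order` but not one whose attribute is `order-created`; the pattern `order.*` matches both.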
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Pub/Sub to Pub/Sub template.
- In the provided parameter fields, enter your parameter values.
- Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE
Replace the following:
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
- SUBSCRIPTION_NAME: the Pub/Sub subscription name
- TOPIC_NAME: the Pub/Sub topic name
- FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
- FILTER_VALUE: filter attribute value to use if an event filter key is provided. Accepts a valid Java Regex string as an event filter value. In case a regex is provided, the complete expression should match in order for the message to be filtered. Partial matches (such as substring) are not filtered. A null event filter value is used by default.
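When the command is generated by a script, the optional filter parameters can be left out entirely if no filter is wanted. The following Python sketch assembles the same argument list as the command above. The function name `build_gcloud_command` and its argument names are hypothetical. The check for commas is an assumption made for this sketch: `--parameters` takes a comma-separated list, so a value that contains a comma needs the alternate-delimiter syntax described in `gcloud topic escaping`, which the sketch does not implement.

```python
from typing import List, Optional

TEMPLATE_NAME = "Cloud_PubSub_to_Cloud_PubSub"


def build_gcloud_command(
    job_name: str,
    region: str,
    staging_location: str,
    project_id: str,
    subscription: str,
    topic: str,
    version: str = "latest",
    filter_key: Optional[str] = None,
    filter_value: Optional[str] = None,
) -> List[str]:
    """Build the argument list for `gcloud dataflow jobs run` without executing it."""
    parameters = [
        f"inputSubscription=projects/{project_id}/subscriptions/{subscription}",
        f"outputTopic=projects/{project_id}/topics/{topic}",
    ]
    if filter_key is not None:
        parameters.append(f"filterKey={filter_key}")
        if filter_value is not None:
            parameters.append(f"filterValue={filter_value}")
    for entry in parameters:
        if "," in entry:
            # Not handled here: see `gcloud topic escaping` for values with commas.
            raise ValueError(f"parameter contains a comma: {entry!r}")
    return [
        "gcloud", "dataflow", "jobs", "run", job_name,
        "--gcs-location",
        f"gs://dataflow-templates-{region}/{version}/{TEMPLATE_NAME}",
        "--region", region,
        "--staging-location", staging_location,
        "--parameters", ",".join(parameters),
    ]
```

Passing the returned list to `subprocess.run(command, check=True)` would launch the job, assuming the gcloud CLI is installed and authenticated.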
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
- SUBSCRIPTION_NAME: the Pub/Sub subscription name
- TOPIC_NAME: the Pub/Sub topic name
- FILTER_KEY: the attribute key by which events are filtered. No filters are applied if no key is specified.
- FILTER_VALUE: filter attribute value to use if an event filter key is provided. Accepts a valid Java Regex string as an event filter value. In case a regex is provided, the complete expression should match in order for the message to be filtered. Partial matches (such as substring) are not filtered. A null event filter value is used by default.
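The request above can also be assembled programmatically. The following Python sketch builds the URL and JSON body using only the standard library and does not send anything. The function name `build_launch_request` and its argument names are hypothetical; the URL layout and body fields are copied from the request shown above. Sending the request would additionally require an OAuth 2.0 access token in an `Authorization: Bearer` header, which is outside the scope of this sketch.

```python
import json
from typing import Optional, Tuple

TEMPLATE_NAME = "Cloud_PubSub_to_Cloud_PubSub"


def build_launch_request(
    project_id: str,
    location: str,
    job_name: str,
    subscription: str,
    topic: str,
    version: str = "latest",
    filter_key: Optional[str] = None,
    filter_value: Optional[str] = None,
) -> Tuple[str, str]:
    """Return (url, json_body) for the templates:launch POST request."""
    gcs_path = f"gs://dataflow-templates-{location}/{version}/{TEMPLATE_NAME}"
    url = (
        "https://dataflow.googleapis.com/v1b3"
        f"/projects/{project_id}/locations/{location}"
        f"/templates:launch?gcsPath={gcs_path}"
    )
    parameters = {
        "inputSubscription": f"projects/{project_id}/subscriptions/{subscription}",
        "outputTopic": f"projects/{project_id}/topics/{topic}",
    }
    # The filter parameters are optional, so include them only when given.
    if filter_key is not None:
        parameters["filterKey"] = filter_key
        if filter_value is not None:
            parameters["filterValue"] = filter_value
    body = {
        "jobName": job_name,
        "environment": {
            "ipConfiguration": "WORKER_IP_UNSPECIFIED",
            "additionalExperiments": [],
        },
        "parameters": parameters,
    }
    return url, json.dumps(body, indent=2)
```

Because the body is built as a dictionary and serialized with `json.dumps`, a regex filter value that contains quotes or backslashes is escaped correctly in the JSON payload.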
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.