This template creates a batch pipeline that reads records from text files stored in Cloud Storage and publishes them to a Pub/Sub topic. The template can be used to publish records in a newline-delimited file containing JSON records or CSV file to a Pub/Sub topic for real-time processing. You can use this template to replay data to Pub/Sub.
This template does not set any timestamp on the individual records. The event time is equal to the publishing time during execution. If your pipeline relies on an accurate event time for processing, you must not use this pipeline.
Pipeline requirements
- The files to read need to be in newline-delimited JSON or CSV format. Records spanning multiple lines in the source files might cause issues downstream because each line within the files will be published as a message to Pub/Sub.
- The Pub/Sub topic must exist before running the pipeline.
Template parameters
Required parameters
- inputFilePattern : The input file pattern to read from. (Example: gs://bucket-name/files/*.json).
- outputTopic : The Pub/Sub input topic to write to. The name must be in the format
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
. (Example: projects/your-project-id/topics/your-topic-name).
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default
region is
us-central1
.For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Text Files on Cloud Storage to Pub/Sub (Batch) template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Text_to_Cloud_PubSub \ --region REGION_NAME \ --parameters \ inputFilePattern=gs://BUCKET_NAME/files/*.json,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
Replace the following:
PROJECT_ID
: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME
: a unique job name of your choiceVERSION
: the version of the template that you want to useYou can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
REGION_NAME
: the region where you want to deploy your Dataflow job—for example,us-central1
TOPIC_NAME
: your Pub/Sub topic nameBUCKET_NAME
: the name of your Cloud Storage bucket
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "gs://BUCKET_NAME/files/*.json", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" }, "environment": { "zone": "us-central1-f" } }
Replace the following:
PROJECT_ID
: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME
: a unique job name of your choiceVERSION
: the version of the template that you want to useYou can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
LOCATION
: the region where you want to deploy your Dataflow job—for example,us-central1
TOPIC_NAME
: your Pub/Sub topic nameBUCKET_NAME
: the name of your Cloud Storage bucket
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.