Text Files on Cloud Storage to Pub/Sub (Stream) template

This template creates a streaming pipeline that continuously polls for new text files uploaded to Cloud Storage, reads each file line by line, and publishes each line as a string to a Pub/Sub topic. The input can be newline-delimited files that contain JSON records, or CSV files, and the records are published for real-time processing. You can use this template to replay data to Pub/Sub.

The pipeline runs indefinitely and must be terminated manually by canceling the job, not by draining it. The pipeline uses the Watch transform, which is a SplittableDoFn that doesn't support draining.

The polling interval is currently fixed at 10 seconds. This template doesn't set a timestamp on the individual records, so the event time of each record equals its publishing time during execution. If your pipeline relies on an accurate event time for processing, don't use this template.

Pipeline requirements

  • Input files must be in newline-delimited JSON or CSV format. Records that span multiple lines in the source files can cause issues downstream, because each line within the files is published as a message to Pub/Sub.
  • The Pub/Sub topic must exist prior to execution.
  • The pipeline runs indefinitely and needs to be terminated manually.
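
The following Python sketch illustrates the first requirement. It isn't the template's code: split_into_messages and is_valid_json are hypothetical helpers that only model the "one line, one message" behavior described above, and the sample records are made up. It doesn't model how the template handles blank lines or character encodings.

```python
import json


def split_into_messages(file_text: str) -> list[str]:
    """Model the line-by-line behavior: every line becomes one message."""
    return file_text.splitlines()


def is_valid_json(line: str) -> bool:
    """Return True if a single line parses as a complete JSON value."""
    try:
        json.loads(line)
        return True
    except json.JSONDecodeError:
        return False


# Newline-delimited JSON: one complete record per line.
compact = '{"id": 1, "name": "a"}\n{"id": 2, "name": "b"}\n'

# The same kind of record pretty-printed across several lines.
pretty = '{\n  "id": 1,\n  "name": "a"\n}\n'

for label, text in (("compact", compact), ("pretty", pretty)):
    messages = split_into_messages(text)
    valid = sum(is_valid_json(m) for m in messages)
    print(f"{label}: {len(messages)} messages, {valid} parse as JSON")
```

In this sketch the pretty-printed record is split into four fragments, none of which is a complete JSON value on its own, which is the kind of downstream issue that multi-line records can cause.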

Template parameters

Required parameters

  • inputFilePattern : The input file pattern to read from. (Example: gs://bucket-name/files/*.json).
  • outputTopic : The Pub/Sub topic to publish messages to. The name must be in the format projects/<PROJECT_ID>/topics/<TOPIC_NAME>. (Example: projects/your-project-id/topics/your-topic-name).
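
Before you launch a job, it can help to check the outputTopic value locally. The following Python sketch is a loose structural check, not the validation that Dataflow or Pub/Sub performs: parse_topic is a hypothetical helper, and the pattern only confirms the projects/<PROJECT_ID>/topics/<TOPIC_NAME> shape. It doesn't enforce the character rules for project IDs or topic names.

```python
import re

# Assumption: only the overall shape is checked. Each ID must be a
# non-empty segment that contains no slash and no whitespace.
_TOPIC_PATTERN = re.compile(r"projects/([^/\s]+)/topics/([^/\s]+)")


def parse_topic(topic: str) -> tuple[str, str]:
    """Return (project_id, topic_name) or raise ValueError."""
    match = _TOPIC_PATTERN.fullmatch(topic)
    if match is None:
        raise ValueError(
            f"Expected projects/<PROJECT_ID>/topics/<TOPIC_NAME>, got: {topic!r}"
        )
    return match.group(1), match.group(2)


project_id, topic_name = parse_topic(
    "projects/your-project-id/topics/your-topic-name"
)
print(project_id, topic_name)
```

A bare topic name such as your-topic-name fails this check, because the parameter expects the full resource path.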

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  5. In the provided parameter fields, enter your parameter values.
  6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job (for example, us-central1)
  • VERSION: the version of the template that you want to use (for example, latest)
  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • PROJECT_ID: the ID of the Google Cloud project that contains the Pub/Sub topic
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILE_PATTERN: the file pattern glob to read from in the Cloud Storage bucket (for example, path/*.csv)
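
If you launch the job from a script, you can assemble the same command as an argument list. The following Python sketch is one possible approach, not an official client: build_run_command is a hypothetical helper, the flags are the ones shown in the preceding command, and the example values (including latest for the template version) are assumptions for illustration.

```python
import shlex


def build_run_command(
    job_name: str,
    region: str,
    version: str,
    staging_location: str,
    input_file_pattern: str,
    output_topic: str,
) -> list[str]:
    """Build the argument list for the gcloud command shown above."""
    template_path = (
        f"gs://dataflow-templates-{region}/{version}/"
        "Stream_GCS_Text_to_Cloud_PubSub"
    )
    # Template parameters are passed as one comma-separated string.
    parameters = ",".join(
        [
            f"inputFilePattern={input_file_pattern}",
            f"outputTopic={output_topic}",
        ]
    )
    return [
        "gcloud", "dataflow", "jobs", "run", job_name,
        "--gcs-location", template_path,
        "--region", region,
        "--staging-location", staging_location,
        "--parameters", parameters,
    ]


command = build_run_command(
    job_name="replay-job",
    region="us-central1",
    version="latest",
    staging_location="gs://your-bucket/staging",
    input_file_pattern="gs://your-bucket/path/*.csv",
    output_topic="projects/your-project-id/topics/your-topic-name",
)
# Print a shell-quoted form for review. The sketch does not run the command.
print(shlex.join(command))
```

Passing the list to subprocess.run(command, check=True) would run it without a shell, so the * in the file pattern reaches gcloud unexpanded.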

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job (for example, us-central1)
  • VERSION: the version of the template that you want to use (for example, latest)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILE_PATTERN: the file pattern glob to read from in the Cloud Storage bucket (for example, path/*.csv)
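
The following Python sketch shows one way to assemble the URL and JSON body for the request above. build_launch_request is a hypothetical helper and the example values are placeholders. The sketch only builds the request. It doesn't send it or handle authorization, which the projects.templates.launch reference describes.

```python
import json
from urllib.parse import urlencode


def build_launch_request(
    project_id: str,
    location: str,
    version: str,
    job_name: str,
    input_file_pattern: str,
    output_topic: str,
) -> tuple[str, str]:
    """Return (url, json_body) for the templates:launch request."""
    gcs_path = (
        f"gs://dataflow-templates-{location}/{version}/"
        "Stream_GCS_Text_to_Cloud_PubSub"
    )
    # urlencode percent-encodes the gs:// path for use in a query string.
    url = (
        "https://dataflow.googleapis.com/v1b3/"
        f"projects/{project_id}/locations/{location}/templates:launch?"
        + urlencode({"gcsPath": gcs_path})
    )
    body = {
        "jobName": job_name,
        "environment": {
            "ipConfiguration": "WORKER_IP_UNSPECIFIED",
            "additionalExperiments": [],
        },
        "parameters": {
            "inputFilePattern": input_file_pattern,
            "outputTopic": output_topic,
        },
    }
    return url, json.dumps(body, indent=2)


url, body = build_launch_request(
    project_id="your-project-id",
    location="us-central1",
    version="latest",
    job_name="replay-job",
    input_file_pattern="gs://your-bucket/path/*.csv",
    output_topic="projects/your-project-id/topics/your-topic-name",
)
print(url)
print(body)
```

Building the body from a dictionary and serializing it with json.dumps avoids quoting mistakes that are easy to make when the JSON is edited by hand.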

What's next