Pub/Sub to Avro Files on Cloud Storage template

The Pub/Sub to Avro files on Cloud Storage template is a streaming pipeline that reads data from a Pub/Sub topic and writes Avro files into the specified Cloud Storage bucket.

Pipeline requirements

  • The input Pub/Sub topic must exist prior to pipeline execution.

Template parameters

Parameter Description
inputTopic Pub/Sub topic to subscribe for message consumption. The topic name must be in the format of projects/<project-id>/topics/<topic-name>.
outputDirectory Output directory where output Avro files are archived. Must contain / at the end. For example: gs://example-bucket/example-directory/.
avroTempDirectory Directory for temporary Avro files. Must contain / at the end. For example: gs://example-bucket/example-directory/.
outputFilenamePrefix (Optional) Output filename prefix for the Avro files.
outputFilenameSuffix (Optional) Output filename suffix for the Avro files.
outputShardTemplate (Optional) The shard template of the output file. It is specified as repeating sequences of the letters S or N. For example, SSS-NNN. These are replaced with either the shard number or the total number of shards, respectively. When this parameter is not specified, the default template format is W-P-SS-of-NN.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Pub/Sub to Avro Files on Cloud Storage template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • TOPIC_NAME: the Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILENAME_PREFIX: the preferred output filename prefix
  • FILENAME_SUFFIX: the preferred output filename suffix
  • SHARD_TEMPLATE: the preferred output shard template

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • TOPIC_NAME: the Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILENAME_PREFIX: the preferred output filename prefix
  • FILENAME_SUFFIX: the preferred output filename suffix
  • SHARD_TEMPLATE: the preferred output shard template

What's next