Google-provided utility templates

Google provides a set of open-source Dataflow templates. For general information about templates, see Dataflow templates. For a list of all Google-provided templates, see Get started with Google-provided templates.

This guide documents utility templates.

File Format Conversion (Avro, Parquet, CSV)

The File Format Conversion template is a batch pipeline that converts files stored on Cloud Storage from one supported format to another.

The following format conversions are supported:

  • CSV to Avro
  • CSV to Parquet
  • Avro to Parquet
  • Parquet to Avro

Requirements for this pipeline:

  • The output Cloud Storage bucket must exist before running the pipeline.

Template parameters

Parameter Description
inputFileFormat The input file format. Must be one of [csv, avro, parquet].
outputFileFormat The output file format. Must be one of [avro, parquet].
inputFileSpec The Cloud Storage path pattern for input files. For example, gs://bucket-name/path/*.csv
outputBucket The Cloud Storage folder to write output files. This path must end with a slash. For example, gs://bucket-name/output/
schema The Cloud Storage path to the Avro schema file. For example, gs://bucket-name/schema/my-schema.avsc
containsHeaders (Optional) The input CSV files contain a header record (true/false). The default value is false. Only required when reading CSV files.
csvFormat (Optional) The CSV format specification to use for parsing records. The default value is Default. See Apache Commons CSV Format for more details.
delimiter (Optional) The field delimiter used by the input CSV files.
outputFilePrefix (Optional) The output file prefix. The default value is output.
numShards (Optional) The number of output file shards.

Running the File Format Conversion template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Convert file formats template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • INPUT_FORMAT: the file format of the input file; must be one of [csv, avro, parquet]
  • OUTPUT_FORMAT: the file format of the output files; must be one of [avro, parquet]
  • INPUT_FILES: the path pattern for input files
  • OUTPUT_FOLDER: your Cloud Storage folder for output files
  • SCHEMA: the path to the Avro schema file

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileFormat": "INPUT_FORMAT",
          "outputFileFormat": "OUTPUT_FORMAT",
          "inputFileSpec": "INPUT_FILES",
          "schema": "SCHEMA",
          "outputBucket": "OUTPUT_FOLDER"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/File_Format_Conversion",
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • INPUT_FORMAT: the file format of the input file; must be one of [csv, avro, parquet]
  • OUTPUT_FORMAT: the file format of the output files; must be one of [avro, parquet]
  • INPUT_FILES: the path pattern for input files
  • OUTPUT_FOLDER: your Cloud Storage folder for output files
  • SCHEMA: the path to the Avro schema file

Bulk Compress Cloud Storage Files

The Bulk Compress Cloud Storage Files template is a batch pipeline that compresses files on Cloud Storage to a specified location. This template can be useful when you need to compress large batches of files as part of a periodic archival process. The supported compression modes are: BZIP2, DEFLATE, GZIP. Files output to the destination location will follow a naming schema of original filename appended with the compression mode extension. The extensions appended will be one of: .bzip2, .deflate, .gz.

Any errors which occur during the compression process will be output to the failure file in CSV format of filename, error message. If no failures occur wile running the pipeline, the error file will still be created but will contain no error records.

Requirements for this pipeline:

  • The compression must be in one of the following formats: BZIP2, DEFLATE, GZIP.
  • The output directory must exist prior to running the pipeline.

Template parameters

Parameter Description
inputFilePattern The input file pattern to read from. For example, gs://bucket-name/uncompressed/*.txt.
outputDirectory The output location to write to. For example, gs://bucket-name/compressed/.
outputFailureFile The error log output file to use for write failures that occur during the compression process. For example, gs://bucket-name/compressed/failed.csv. If there are no failures, the file is still created but will be empty. The file contents are in CSV format (Filename, Error) and consist of one line for each file that fails compression.
compression The compression algorithm used to compress the matched files. Must be one of: BZIP2, DEFLATE, GZIP

Running the Bulk Compress Cloud Storage Files template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Bulk Compress Files on Cloud Storage template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • COMPRESSION: your choice of compression algorithm

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • COMPRESSION: your choice of compression algorithm

Bulk Decompress Cloud Storage Files

The Bulk Decompress Cloud Storage Files template is a batch pipeline that decompresses files on Cloud Storage to a specified location. This functionality is useful when you want to use compressed data to minimize network bandwidth costs during a migration, but would like to maximize analytical processing speed by operating on uncompressed data after migration. The pipeline automatically handles multiple compression modes during a single run and determines the decompression mode to use based on the file extension (.bzip2, .deflate, .gz, .zip).

Requirements for this pipeline:

  • The files to decompress must be in one of the following formats: Bzip2, Deflate, Gzip, Zip.
  • The output directory must exist prior to running the pipeline.

Template parameters

Parameter Description
inputFilePattern The input file pattern to read from. For example, gs://bucket-name/compressed/*.gz.
outputDirectory The output location to write to. For example, gs://bucket-name/decompressed.
outputFailureFile The error log output file to use for write failures that occur during the decompression process. For example, gs://bucket-name/decompressed/failed.csv. If there are no failures, the file is still created but will be empty. The file contents are in CSV format (Filename, Error) and consist of one line for each file that fails decompression.

Running the Bulk Decompress Cloud Storage Files template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Bulk Decompress Files on Cloud Storage template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • OUTPUT_FAILURE_FILE_PATH: your choice of path to the file containing failure information

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • OUTPUT_FAILURE_FILE_PATH: your choice of path to the file containing failure information

Datastore Bulk Delete

The Datastore Bulk Delete template is a pipeline which reads in Entities from Datastore with a given GQL query and then deletes all matching Entities in the selected target project. The pipeline can optionally pass the JSON encoded Datastore Entities to your Javascript UDF, which you can use to filter out Entities by returning null values.

Requirements for this pipeline:

  • Datastore must be set up in the project prior to running the template.
  • If reading and deleting from separate Datastore instances, the Dataflow Controller Service Account must have permission to read from one instance and delete from the other.

Template parameters

Parameter Description
datastoreReadGqlQuery GQL query which specifies which entities to match for deletion. Using a keys-only query may improve performance. For example: "SELECT __key__ FROM MyKind".
datastoreReadProjectId GCP Project ID of the Datastore instance from which you want to read entities (using your GQL query) that are used for matching.
datastoreDeleteProjectId GCP Project ID of the Datastore instance from which to delete matching entities. This can be the same as datastoreReadProjectId if you want to read and delete within the same Datastore instance.
datastoreReadNamespace (Optional) Namespace of requested Entities. Set as "" for default namespace.
datastoreHintNumWorkers (Optional) Hint for the expected number of workers in the Datastore ramp-up throttling step. Default is 500.
javascriptTextTransformGcsPath (Optional) A Cloud Storage path which contains all your JavaScript code. For example, gs://mybucket/mytransforms/*.js. If you don't want to use a UDF leave this field blank.
javascriptTextTransformFunctionName (Optional) Name of the Function to be called. If this function returns a value of undefined or null for a given Datastore Entity, then that Entity will not be deleted. If you have the javascript code of: "function myTransform(inJson) { ...dostuff...}" then your function name is "myTransform". If you don't want to use a UDF leave this field blank.

Running the Datastore Bulk Delete template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Bulk Delete Entities in Datastore template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GQL_QUERY: the query you'll use to match entities for deletion
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: your Datastore instance project ID. This example both reads and deletes from the same Datastore instance.

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GQL_QUERY: the query you'll use to match entities for deletion
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: your Datastore instance project ID. This example both reads and deletes from the same Datastore instance.

Streaming Data Generator to Pub/Sub, BigQuery, and Cloud Storage

The Streaming Data Generator template is used to generate either an unlimited or fixed number of synthetic records or messages based on user provided schema at the specified rate. Compatible destinations include Pub/Sub topics, BigQuery tables, and Cloud Storage buckets.

Following are a set of few possible use cases:

  • Simulate large-scale real-time event publishing to a Pub/Sub topic to measure and determine the number and size of consumers required to process published events.
  • Generate synthetic data to a BigQuery table or a Cloud Storage bucket to evaluate performance benchmarks or serve as a proof of concept.

Supported sinks and encoding formats

The following table describes which sinks and encoding formats are supported by this template:
JSON Avro Parquet
Pub/Sub Yes Yes No
BigQuery Yes No No
Cloud Storage Yes Yes Yes

The JSON Data Generator library used by the pipeline allows various faker functions to be used for each schema field. For more information on the faker functions and schema format, see the json-data-generator documentation.

Requirements for this pipeline:

  • Create a message schema file and store this file in a Cloud Storage location.
  • The output target must exist prior to execution. The target must be a Pub/Sub topic, a BigQuery table, or a Cloud Storage bucket depending on sink type.
  • If the output encoding is Avro or Parquet, then create an Avro schema file and store it in a Cloud Storage location.

Template parameters

Parameter Description
schemaLocation Location of the schema file. For example: gs://mybucket/filename.json.
qps Number of messages to be published per second. For example: 100.
sinkType (Optional) Output sink Type. Possible values are PUBSUB, BIGQUERY, GCS. Default is PUBSUB.
outputType (Optional) Output encoding Type. Possible values are JSON, AVRO, PARQUET. Default is JSON.
avroSchemaLocation (Optional) Location of AVRO Schema file. Mandatory when outputType is AVRO or PARQUET. For example: gs://mybucket/filename.avsc.
topic (Optional) Name of the Pub/Sub topic to which the pipeline should publish data. Mandatory when sinkType is Pub/Sub. For example: projects/my-project-ID/topics/my-topic-ID.
outputTableSpec (Optional) Name of the output BigQuery table. Mandatory when sinkType is BigQuery. For example: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Optional) BigQuery Write Disposition. Possible values are WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE. Default is WRITE_APPEND.
outputDeadletterTable (Optional) Name of the output BigQuery table to hold failed records. If not provided, pipeline creates table during execution with name {output_table_name}_error_records. For example: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Optional) Path of the output Cloud Storage location. Mandatory when sinkType is Cloud Storage. For example: gs://mybucket/pathprefix/.
outputFilenamePrefix (Optional) The filename prefix of the output files written to Cloud Storage. Default is output-.
windowDuration (Optional) Window interval at which output is written to Cloud Storage. Default is 1m (in other words, 1 minute).
numShards (Optional) Maximum number of output shards. Mandatory when sinkType is Cloud Storage and should be set to 1 or higher number.
messagesLimit (Optional) Maximum number of output messages. Default is 0 indicating unlimited.
autoscalingAlgorithm (Optional) Algorithm used for autoscaling the workers. Possible values are THROUGHPUT_BASED to enable autoscaling or NONE to disable.
maxNumWorkers (Optional) Maximum number of worker machines. For example: 10.

Running the Streaming Data Generator template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Streaming Data Generator template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • SCHEMA_LOCATION: the path to schema file in Cloud Storage. For example: gs://mybucket/filename.json.
  • QPS: the number of messages to be published per second
  • PUBSUB_TOPIC: the output Pub/Sub topic. For example: projects/my-project-ID/topics/my-topic-ID.

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Replace the following:

  • PROJECT_ID: the Cloud project ID where you want to run the Dataflow job
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • SCHEMA_LOCATION: the path to schema file in Cloud Storage. For example: gs://mybucket/filename.json.
  • QPS: the number of messages to be published per second
  • PUBSUB_TOPIC: the output Pub/Sub topic. For example: projects/my-project-ID/topics/my-topic-ID.