Process data using templates

Dataplex provides templates, powered by Dataflow, to perform common data processing tasks like data ingestion, processing, and managing the data lifecycle. This guide describes how to configure and run data processing templates.

Before you begin

Dataplex templates are powered by Dataflow. Before you use templates, enable the Dataflow APIs.

Notes

  • All templates support common Dataflow pipeline options.

  • Dataplex uses data pipelines to schedule the tasks defined by the templates.

  • The Dataplex page in the Google Cloud console shows only the tasks that you schedule through Dataplex.

Template: Convert raw data to curated data

The Dataplex file format conversion template converts data in a Dataplex Cloud Storage asset, or in a list of Dataplex entities, from CSV or JSON format to Parquet or Avro format in another Dataplex asset. The conversion preserves the partition layout, and the template can also compress the output files.

Template parameters

Parameter Description
inputAssetOrEntitiesList The Dataplex asset or Dataplex entities that contain the input files. This parameter should follow the format: projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> or projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity2-name>...
outputFileFormat The output file format in Cloud Storage. This parameter should follow the format: PARQUET or AVRO.
outputAsset The name of the Dataplex asset that contains the Cloud Storage bucket where output files will be stored. This parameter should follow the format: projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>. You can find the outputAsset in the Google Cloud console, in the Dataplex asset Details tab.
outputFileCompression Optional: The output file compression. The default value for this parameter is SNAPPY. Supported values are UNCOMPRESSED, SNAPPY, GZIP, and BZIP2. BZIP2 is not supported for PARQUET files.
writeDisposition Optional: Specifies the action that occurs if a destination file already exists. The default value for this parameter is SKIP, which signals to process only those files that don't exist in the destination directory. Other values for the parameter can be OVERWRITE (overwrite any existing files) or FAIL (don't process anything and produce an error if at least one destination file already exists).
updateDataplexMetadata Optional: Whether to update Dataplex metadata for the newly created entities. The default value for this parameter is false. If enabled, the pipeline automatically copies the schema from the source to the destination Dataplex entities, and the automated Dataplex Discovery won't run for them. Use this flag if the schema of the source (raw) data is managed by Dataplex.
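The entity-list form of inputAssetOrEntitiesList is long and easy to mistype. The following is a minimal sketch of how the comma-separated list could be assembled in a shell script. The helper names entity_path and entities_list are hypothetical, the sample project, lake, zone, and entity names are placeholders, and the sketch assumes that all entities live in the same zone.

```shell
#!/bin/sh
# Hypothetical helper: prints the full resource path of one Dataplex entity.
entity_path() {
  # $1=project, $2=location, $3=lake, $4=zone, $5=entity
  printf 'projects/%s/locations/%s/lakes/%s/zones/%s/entities/%s' "$1" "$2" "$3" "$4" "$5"
}

# Hypothetical helper: joins the paths of several entities in one zone with commas.
entities_list() {
  project=$1 location=$2 lake=$3 zone=$4
  shift 4
  list=""
  for entity in "$@"; do
    path=$(entity_path "$project" "$location" "$lake" "$zone" "$entity")
    # Add a comma separator only when the list already has an entry.
    list="${list:+$list,}$path"
  done
  printf '%s\n' "$list"
}

# Placeholder values, not real resources.
entities_list my-project us-central1 my-lake raw-zone orders customers
```

The printed value can be used wherever INPUT_ASSET_OR_ENTITIES_LIST is expected.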

Run the template

Console

  1. In the Google Cloud console, go to the Dataplex page:

    Go to Dataplex

  2. Navigate to the Process view.

  3. Click Create task.

  4. Under Convert to Curated Formats, click Create task.

  5. Choose a Dataplex lake.

  6. Provide a task name.

  7. Choose a region for task execution.

  8. Fill in the required parameters.

  9. Click Continue.

gcloud

Replace the following:

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
INPUT_ASSET_OR_ENTITIES_LIST: your Dataplex input asset, or a comma-separated list of Dataplex entities, that contains the input files
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage (PARQUET or AVRO)
OUTPUT_ASSET: your Dataplex output asset ID

In your shell or terminal, run the template:

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \
--parameters \
inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\
outputFileFormat=OUTPUT_FILE_FORMAT,\
outputAsset=OUTPUT_ASSET
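The command above sets only the required parameters. As a sketch of how the optional parameters could be added, the following script builds the --parameters value with outputFileCompression and writeDisposition, rejects the unsupported BZIP2 and PARQUET combination, and prints the resulting command instead of running it. The helper name conversion_params is hypothetical. The ^~^ prefix relies on the alternate-delimiter syntax described in gcloud topic escaping, which is assumed here so that a comma-separated entity list can be passed as one value.

```shell
#!/bin/sh
# Hypothetical helper: builds the value for --parameters, including optional settings.
conversion_params() {
  # $1=input asset or entities list, $2=output format, $3=output asset,
  # $4=compression (defaults to SNAPPY), $5=write disposition (defaults to SKIP)
  input=$1 format=$2 output=$3 compression=${4:-SNAPPY} disposition=${5:-SKIP}

  # BZIP2 is not supported for PARQUET output, so reject it before launching.
  if [ "$format" = "PARQUET" ] && [ "$compression" = "BZIP2" ]; then
    echo "BZIP2 compression is not supported for PARQUET files" >&2
    return 1
  fi

  # A leading ^~^ asks gcloud to split key-value pairs on "~" instead of ",".
  printf '^~^inputAssetOrEntitiesList=%s~outputFileFormat=%s~outputAsset=%s~outputFileCompression=%s~writeDisposition=%s\n' \
    "$input" "$format" "$output" "$compression" "$disposition"
}

params=$(conversion_params "INPUT_ASSET_OR_ENTITIES_LIST" AVRO "OUTPUT_ASSET" GZIP OVERWRITE) || exit 1

# Print the command for review; remove "echo" to run it.
echo gcloud beta dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --region=REGION_NAME \
  --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \
  --parameters "$params"
```

Checking the format and compression combination locally avoids launching a job that the template would reject.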

REST API

Replace the following:

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
INPUT_ASSET_OR_ENTITIES_LIST: your Dataplex input asset, or a comma-separated list of Dataplex entities, that contains the input files
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage (PARQUET or AVRO)
OUTPUT_ASSET: your Dataplex output asset ID

Submit an HTTP POST request:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST",
      "outputFileFormat": "OUTPUT_FILE_FORMAT",
      "outputAsset": "OUTPUT_ASSET"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview"
  }
}
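One possible way to submit this request from a shell is sketched below. The helper names launch_body and launch_conversion are hypothetical. The sketch assumes that curl and an authenticated gcloud CLI are available, and that the values contain no double quotes or backslashes, because they are inserted into the JSON verbatim. Only launch_body is called here, so the script prints the request body without sending anything.

```shell
#!/bin/sh
# Hypothetical helper: prints the JSON request body for the conversion template.
# Values are inserted verbatim and must not contain double quotes or backslashes.
launch_body() {
  # $1=job name, $2=region, $3=input asset or entities list,
  # $4=output file format, $5=output asset
  cat <<EOF
{
  "launch_parameter": {
    "jobName": "$1",
    "parameters": {
      "inputAssetOrEntitiesList": "$3",
      "outputFileFormat": "$4",
      "outputAsset": "$5"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-$2/latest/flex/Dataplex_File_Format_Conversion_Preview"
  }
}
EOF
}

# Hypothetical helper: sends the request with curl.
# Assumes the gcloud CLI is installed and authenticated.
launch_conversion() {
  # $1=project ID; the remaining arguments are passed to launch_body.
  project=$1 region=$3
  shift
  curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d "$(launch_body "$@")" \
    "https://dataflow.googleapis.com/v1b3/projects/$project/locations/$region/flexTemplates:launch"
}

# Print the body for review before sending it.
launch_body my-job us-central1 "INPUT_ASSET_OR_ENTITIES_LIST" PARQUET "OUTPUT_ASSET"
```

To send the request, call launch_conversion with the project ID followed by the same arguments.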

Template: Tier data from a BigQuery asset to a Cloud Storage asset

The Dataplex BigQuery to Cloud Storage template copies data from a Dataplex BigQuery asset to a Dataplex Cloud Storage asset in a Dataplex-compatible layout and format. You can specify a BigQuery dataset or a list of BigQuery tables to be copied. For additional flexibility, the template allows for copying data older than a specified modification date and allows for optionally deleting data from BigQuery after a successful copy.

When copying partitioned tables from BigQuery to Cloud Storage:

  • The template creates Hive-style partitions in the Cloud Storage bucket. BigQuery doesn't allow a Hive-style partition key to have the same name as an existing column. Use the enforceSamePartitionKey option to either create a new partition key or keep the same partition key and rename the existing column.
  • Dataplex Discovery registers the partition type as string when creating a BigQuery table (and a table in Dataproc Metastore). This may affect your existing partition filters.

A single template run can transform a limited number of tables and partitions, approximately 300. The exact limit depends on the length of the table names and other factors.

Template parameters

Parameter Description
sourceBigQueryDataset The BigQuery dataset to tier data from. This parameter should contain either a Dataplex asset name in the format projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> or a BigQuery dataset ID in the format projects/<name>/datasets/<dataset-id>.
destinationStorageBucketAssetName The Dataplex asset name for the Cloud Storage bucket to tier data to. This parameter should follow the format projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>.
tables Optional: A comma-separated list of BigQuery tables to tier. If no list is provided, all tables will be tiered. Tables should be specified by their name only (no project/dataset prefix) and are case-sensitive.
exportDataModifiedBeforeDateTime Optional: Moves only data older than this date (and optional time). For partitioned BigQuery tables, the template moves partitions last modified before this date/time. For non-partitioned tables, it moves the table if it was last modified before this date/time. If not specified, all tables and partitions are moved. The date/time is parsed in the default time zone; the optional suffixes Z and +HH:mm are also supported. This parameter should follow the format YYYY-MM-DD, YYYY-MM-DDTHH:mm:ss, or YYYY-MM-DDTHH:mm:ss+03:00. Relative date/time is also supported, and should follow the format -PnDTnHnMn.nS (must start with -P, which indicates time in the past).
fileFormat Optional: The output file format in Cloud Storage. The default value for this parameter is PARQUET. Another value for the parameter can be AVRO.
fileCompression Optional: The output file compression. The default value for this parameter is SNAPPY. Supported values are UNCOMPRESSED, SNAPPY, GZIP, and BZIP2. BZIP2 is not supported for PARQUET files.
deleteSourceData Optional: Whether to delete source data from BigQuery after a successful export. Values can be either true or false. The default value for this parameter is false.
partitionIdRegExp Optional: Process partitions with partition ID matching this regular expression only. If no value is provided, this parameter defaults to process all.
writeDisposition Optional: Specifies the action that occurs if a destination file already exists, meaning one or more tables/partitions have already been pre-tiered. The default value for this parameter is SKIP, which signals to process only those tables/partitions that were not already pre-tiered. Other values for the parameter can be OVERWRITE (overwrite any existing files) or FAIL (don't process anything and produce an error if at least one destination file already exists).
enforceSamePartitionKey

Optional: Whether to enforce the same partition key. Due to a BigQuery limitation, it's not possible for the partition key (in the file path) in a partitioned external table to have the same name as one of the columns in the file. If this parameter is true (which is the default value), the partition key of the target file is set to the original partition column name and the column in the file is renamed. If false, the partition key is renamed.

For example, if the original table is partitioned on a column named TS and enforceSamePartitionKey=true, then the destination file path is gs://<bucket>/TS=<partition ID>/<file> and the column is renamed to TS_pkey in the file. This allows existing queries to be executed against the same partitions in either the old table or the new one.

If enforceSamePartitionKey=false, then the destination file path is gs://<bucket>/TS_pid=<partition ID>/<file>, but the column name is kept as TS in the file.

updateDataplexMetadata

Optional: Whether to update Dataplex metadata for the newly created entities. The default value for this parameter is false.

If enabled, the pipeline will automatically copy the schema from source to the destination Dataplex entities, and the automated Dataplex Discovery won't run for them. Use this flag if you are managing the schema of the source BigQuery tables.
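The effect of enforceSamePartitionKey on naming, as described for the TS example above, can be summarized in a small sketch. The helper name partition_layout is hypothetical, and the _pkey and _pid suffixes are taken from that example only; the sketch illustrates the naming rule and does not call the template.

```shell
#!/bin/sh
# Hypothetical helper: prints the partition key used in the file path and the
# column name kept in the file for a given enforceSamePartitionKey setting.
partition_layout() {
  # $1=original partition column, $2=enforceSamePartitionKey (true or false)
  if [ "$2" = "true" ]; then
    # The path keeps the original name; the column in the file is renamed.
    printf 'path_key=%s column=%s_pkey\n' "$1" "$1"
  else
    # The path key is renamed; the column in the file keeps its name.
    printf 'path_key=%s_pid column=%s\n' "$1" "$1"
  fi
}

partition_layout TS true   # prints: path_key=TS column=TS_pkey
partition_layout TS false  # prints: path_key=TS_pid column=TS
```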

Run the template

Console

  1. In the Google Cloud console, go to the Dataplex page:

    Go to Dataplex

  2. Navigate to the Process view.

  3. Click Create task.

  4. Under Tier from BQ to GCS Assets, click Create task.

  5. Choose a Dataplex lake.

  6. Provide a task name.

  7. Choose a region for task execution.

  8. Fill in the required parameters.

  9. Click Continue.

gcloud

Replace the following:

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for the destination Cloud Storage bucket

In your shell or terminal, run the template:

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \
--parameters \
sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\
destinationStorageBucketAssetName=DESTINATION_ASSET_NAME
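The tables parameter is itself a comma-separated list, which collides with the commas that gcloud uses to separate --parameters entries. The sketch below builds a parameter value that also carries tables and exportDataModifiedBeforeDateTime. The helper name tiering_params is hypothetical, and the ^~^ prefix relies on the alternate-delimiter syntax from gcloud topic escaping, which is an assumption about your gcloud setup. The sample cutoff -P90D is an illustrative value in the relative -PnDTnHnMn.nS format.

```shell
#!/bin/sh
# Hypothetical helper: builds the --parameters value for the BigQuery tiering template.
tiering_params() {
  # $1=source dataset ID or asset name, $2=destination asset name,
  # $3=comma-separated table names (optional), $4=cutoff date/time (optional)
  params="^~^sourceBigQueryDataset=$1~destinationStorageBucketAssetName=$2"

  # Append optional parameters only when a value is given,
  # so that the template defaults apply otherwise.
  if [ -n "${3:-}" ]; then
    params="$params~tables=$3"
  fi
  if [ -n "${4:-}" ]; then
    params="$params~exportDataModifiedBeforeDateTime=$4"
  fi
  printf '%s\n' "$params"
}

# Placeholder table names and a relative cutoff.
tiering_params "SOURCE_ASSET_NAME_OR_DATASET_ID" "DESTINATION_ASSET_NAME" "orders,customers" "-P90D"
```

The printed value can be passed as the argument to --parameters in the command above.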

REST API

Replace the following:

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for the destination Cloud Storage bucket

Submit an HTTP POST request:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID",
      "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview"
  }
}

Schedule other Google Cloud-provided or custom Dataflow templates

With Dataplex, you can schedule and monitor any Google Cloud-provided Dataflow template, or your own custom Dataflow template, in the Google Cloud console.

Schedule

Console

  1. In the Google Cloud console, go to the Dataplex page:

    Go to Dataplex

  2. Navigate to the Process view.

  3. Click Create task.

  4. Under Author a Dataflow pipeline, click Create Dataflow pipeline.

  5. Choose a Dataplex lake.

  6. Provide a task name.

  7. Choose a region for task execution.

  8. Choose a Dataflow template.

  9. Fill in the required parameters.

  10. Click Continue.

Monitor

Console

  1. In the Google Cloud console, go to the Dataplex page:

    Go to Dataplex

  2. Navigate to the Process view.

  3. Click Dataflow pipelines.

  4. Filter by lake or pipeline name.