Working with Data Pipelines

Overview

You can use Dataflow Data Pipelines to create recurrent job schedules, understand where resources are spent over multiple job executions, define and manage data freshness objectives, and drill down into individual pipeline stages to fix and optimize your pipelines.

Data Pipeline features:

  • Create a recurring batch pipeline to run a batch job on a schedule.
  • Create a recurring incremental batch pipeline to run a batch job against the latest version of input data.
  • Use the pipeline summary scorecard to view a pipeline's aggregated capacity usage and resource consumption.
  • View a streaming pipeline's data freshness. This metric, which evolves over time, can be tied to an alert that notifies you when freshness falls below a specified objective.
  • Use pipeline metric graphs to compare batch pipeline jobs and find anomalies.
  • Use the thread-time-per-step graph to see the three stages of a batch pipeline's graph that contribute most to job duration.

Data Pipeline use restrictions:

  • Regional availability: Because Dataflow data pipelines use Cloud Scheduler, which requires an App Engine application, data pipelines are available only in regions where App Engine is available.

  • Quota limits:

    • Maximum number of pipelines per project: 500
    • Maximum number of pipelines per organization: 2500

Types of data pipelines

There are two types of Dataflow data pipelines: streaming and batch. Both types of pipelines run jobs that are defined in Dataflow templates.

Streaming data pipeline
A streaming data pipeline runs a Dataflow streaming job immediately after it is created.
Batch data pipeline
A batch data pipeline runs a Dataflow batch job on a user-defined schedule. The batch pipeline input filename can be parameterized to allow for incremental batch pipeline processing.

Incremental batch pipelines

You can use datetime placeholders to specify an incremental input file format for a batch pipeline.

  • Placeholders for year, month, day, hour, minute, and second can be used, and must follow the strftime() format. Placeholders are preceded by the percent symbol (%).
  • Parameter formatting is not verified during pipeline creation.
    • Example: If you specify "gs://bucket/Y" as the parameterized input file path, it will be evaluated as "gs://bucket/Y", because "Y" without a preceding "%" does not map to a strftime() placeholder.

At each scheduled batch pipeline execution time, the placeholder portion of the input file path is evaluated to the current (or time-shifted) datetime (date values are evaluated using the current date in the time zone of the scheduled job). If the evaluated file path matches the path of an input file, the file will be picked up for processing by the batch pipeline at the scheduled time.

  • Example: A batch pipeline is scheduled to repeat at the start of each hour PST. If you parameterize the input file path as gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, on April 15, 2021, 6PM PST, the input file path will be evaluated to gs://bucket-name/2021-04-15/prefix-18_00.csv.
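
For a quick preview of how a parameterized path evaluates, you can pass the same placeholders to GNU date, which also uses strftime() formatting. This is only an illustrative local check; Dataflow performs the actual evaluation at each scheduled run:

    # Evaluate the placeholders against the current local time (GNU date).
    date +"gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv"
    # At 6:00 PM on April 15, 2021, this prints:
    # gs://bucket-name/2021-04-15/prefix-18_00.csv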

Using time shift parameters

You can use + or - minute or hour time shift parameters, enclosed in curly braces with the format "{[+|-][0-9]+[m|h]}", to match an input file path against a datetime that is shifted before or after the current datetime of the pipeline schedule. The batch pipeline continues to repeat at its scheduled time, but the input file path is evaluated with the specified time offset.

  • Example: A batch pipeline is scheduled to repeat at the start of each hour PST. If you parameterize the input file path as gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, on April 15, 2021, 6PM PST, the input file path will be evaluated to gs://bucket-name/2021-04-15/prefix-16_00.csv.
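
The same informal check works for a shifted path: formatting a timestamp two hours in the past with GNU date mirrors the {-2h} offset (again, purely illustrative; the pipeline applies the shift itself at evaluation time):

    # Evaluate the placeholders against a time two hours before now (GNU date).
    date -d "2 hours ago" +"gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv"
    # At 6:00 PM on April 15, 2021, this prints:
    # gs://bucket-name/2021-04-15/prefix-16_00.csv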

Data pipeline roles

For data pipeline operations to succeed, a user must be granted the necessary IAM roles, as follows:

  1. A user must have the appropriate IAM role to perform pipeline operations.

  2. A user must be able to act as the service account used by Cloud Scheduler and Dataflow by being granted the roles/iam.serviceAccountUser role on that account. If the user does not select a service account for Cloud Scheduler and Dataflow, the default Compute Engine service account is used.
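
For example, if Cloud Scheduler and Dataflow use the default Compute Engine service account, an administrator could grant that role with gcloud as sketched below (PROJECT_NUMBER and USER_EMAIL are placeholders):

    # Allow the user to act as the default Compute Engine service account.
    gcloud iam service-accounts add-iam-policy-binding \
        PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --member="user:USER_EMAIL" \
        --role="roles/iam.serviceAccountUser"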

Creating a data pipeline

You can create a data pipeline in two ways:

  1. Import a job, or
  2. Create a data pipeline

Data pipelines setup page: When you first access the Dataflow pipelines feature in the Cloud Console, a setup page opens.

  1. Enable the listed APIs.
  2. Select a region for the App Engine application that Cloud Scheduler will use to schedule your pipelines.
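
If you prefer the command line, the listed APIs can also be enabled with gcloud. The exact set of services shown on the setup page can vary by project; the service names below are the typical ones for Dataflow and Cloud Scheduler:

    # Enable the core services used by Dataflow data pipelines (adjust as needed).
    gcloud services enable \
        dataflow.googleapis.com \
        cloudscheduler.googleapis.com \
        appengine.googleapis.com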

Import a job

You can import a Dataflow batch or streaming job that is based on a classic or flex template and make it a data pipeline.

  1. Go to the Dataflow Jobs page in the Cloud Console, select a completed job, then on the Job Details page, select "+IMPORT AS PIPELINE".

  2. On the Create pipeline from template page, the "Data pipeline" option is selected. Other parameters are populated with the options of the imported job.

    1. For a batch job, provide a recurrence schedule in the "Schedule your pipeline" section under Template parameters. Providing a service account email address for Cloud Scheduler, which is used to schedule batch runs, is optional. If one is not specified, the default Compute Engine service account is used. Note: the user must be granted the roles/iam.serviceAccountUser role on the service account used by Cloud Scheduler, whether it is user-specified or the default Compute Engine service account (see Data pipeline roles).

Create a data pipeline

  1. Go to the Dataflow Pipelines page in the Cloud Console, then select "+CREATE DATA PIPELINE".

  2. On the Create pipeline from template page under Job management, select "Data pipeline", provide a pipeline name, and fill in the other template selection and parameter fields.

    1. For a batch job, provide a recurrence schedule in the "Schedule your pipeline" section under Template parameters. Providing a service account email address for Cloud Scheduler, which is used to schedule batch runs, is optional. If one is not specified, the default Compute Engine service account is used. Note: the user must be granted the roles/iam.serviceAccountUser role on the service account used by Cloud Scheduler, whether it is user-specified or the default Compute Engine service account (see Data pipeline roles).

Create a batch data pipeline

To create this sample batch data pipeline, you must have access to the following resources in your project:

  • A Cloud Storage bucket to store input, transform, and temporary files
  • A BigQuery dataset to hold the output table

This example pipeline uses the Cloud Storage Text to BigQuery batch pipeline template, which reads files in CSV format from Cloud Storage, runs a transform, then inserts values into your_project_id:your_dataset.three_column_table.
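
If the bucket or dataset does not exist yet, one way to create them is with gsutil and bq; the names below match the placeholders used in the rest of this example:

    # Create the Cloud Storage bucket for input, transform, and temporary files.
    gsutil mb gs://your-bucket
    # Create the BigQuery dataset that will hold the output table.
    bq mk --dataset your_project_id:your_dataset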

  1. Create the following files on your local drive:
    1. A bq_three_column_table.json file that contains the following schema of the destination BigQuery table.
{
  "BigQuery Schema": [
    {
      "name": "col1",
      "type": "STRING"
    },
    {
      "name": "col2",
      "type": "STRING"
    },
    {
      "name": "col3",
      "type": "INT64"
    }
  ]
}
    2. A split_csv_3cols.js JavaScript file, which implements a simple transformation on the input data before insertion into BigQuery.
// Splits each CSV line into three columns and returns a JSON string
// with the field names expected by the BigQuery schema.
function transform(line) {
    var values = line.split(',');
    var obj = new Object();
    obj.col1 = values[0];
    obj.col2 = values[1];
    obj.col3 = values[2];
    var jsonString = JSON.stringify(obj);
    return jsonString;
}
    3. A file01.csv file with several records that will be inserted into the BigQuery table.
    b8e5087a,74,27531
    7a52c051,4a,25846
    672de80f,cd,76981
    111b92bf,2e,104653
    ff658424,f0,149364
    e6c17c75,84,38840
    833f5a69,8f,76892
    d8c833ff,7d,201386
    7d3da7fb,d5,81919
    3836d29b,70,181524
    ca66e6e5,d7,172076
    c8475eb6,03,247282
    558294df,f3,155392
    737b82a8,c7,235523
    82c8f5dc,35,468039
    57ab17f9,5e,480350
    cbcdaf84,bd,354127
    52b55391,eb,423078
    825b8863,62,88160
    26f16d4f,fd,397783
      
  2. Use gsutil to copy the files to folders in a Cloud Storage bucket in your project, as follows:
    1. Copy bq_three_column_table.json and split_csv_3cols.js to gs://your-bucket/text_to_bigquery/
      gsutil cp bq_three_column_table.json gs://your-bucket/text_to_bigquery/
      gsutil cp split_csv_3cols.js gs://your-bucket/text_to_bigquery/
      
    2. Copy file01.csv to gs://your-bucket/inputs/
      gsutil cp file01.csv gs://your-bucket/inputs/
      
  3. Create a "tmp" folder in your-bucket from the Cloud Storage browser. Select your folder name to open the Bucket details page, then click CREATE FOLDER to create a "tmp" folder in your bucket.
  4. Go to the Dataflow Pipelines page, then select "CREATE DATA PIPELINE". Enter or select the following items on the Create pipeline from template page:

    1. Job management:
      1. Select "Data pipeline".
      2. Pipeline name: Enter "text_to_bq_batch_data_pipeline".
      3. Click CONTINUE.
    2. Template selection:
      1. Regional endpoint: Select a Compute Engine region.
      2. Template list: Under "Process Data in Bulk (batch)", select "Text File on Cloud Storage to BigQuery. Description: Batch pipeline. Reads text files stored in Cloud Storage, transforms them using a JavaScript user-defined function (UDF), and outputs the result to BigQuery." Note: Don't select the streaming pipeline with the same name under "Process Data Continuously (stream)".
      3. Click CONTINUE.
    3. Template parameters:
      1. Schedule your pipeline: Select a schedule, such as Hourly at Minute 25, in your Timezone. You can edit the schedule after you submit the pipeline, as explained below.
    4. Required parameters:
      1. JavaScript UDF path in Cloud Storage:
        gs://your-bucket/text_to_bigquery/split_csv_3cols.js
        
      2. JSON path:
        gs://your-bucket/text_to_bigquery/bq_three_column_table.json
        
      3. JavaScript UDF name: "transform"
      4. BigQuery output table (fully qualified table name):
        your_project_id:your_dataset.three_column_table
        
      5. Cloud Storage input path:
        gs://your-bucket/inputs/file*.csv

      6. Temporary BigQuery directory:
        gs://your-bucket/tmp

      7. Temporary location:
        gs://your-bucket/tmp
        
    5. Click SUBMIT.
  5. Confirm pipeline and template information and view current and previous history from the Pipeline details page.
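
After a scheduled run completes, you can spot-check that the rows from file01.csv reached BigQuery, for example with a quick query (the table name matches the output table parameter above):

    # Count the rows loaded into the output table.
    bq query --use_legacy_sql=false \
        'SELECT COUNT(*) AS row_count FROM `your_project_id.your_dataset.three_column_table`'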

Create a sample streaming data pipeline

You can create a sample streaming data pipeline by following the sample batch pipeline instructions, with the following differences:

  • Pipeline schedule. You do not specify a schedule for a streaming data pipeline. The Dataflow streaming job is started immediately.

  • Template Selection: Under "Process Data Continuously (stream)", select "Text File on Cloud Storage to BigQuery. Description: A streaming pipeline that can read text files stored in GCS, perform a transform via a user-defined JavaScript function, and stream the results into BigQuery. This pipeline requires a JavaScript function and a JSON representation of the BigQuery TableSchema."

  • Worker machine type: The pipeline will process the initial set of files matching the gs://your-bucket/inputs/file*.csv pattern and any additional files matching this pattern that you upload to the inputs/ folder. If the size of the CSV files exceeds several GB, to avoid possible out-of-memory errors, select a machine type with more memory than the default n1-standard-4 machine type, such as n1-highmem-8.
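
Because the streaming job keeps watching the input pattern, you can exercise the pipeline by uploading more matching files; file02.csv below is a hypothetical second input file:

    # Upload an additional CSV that matches the watched input pattern.
    gsutil cp file02.csv gs://your-bucket/inputs/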

Investigating pipeline objective violations

Recurring batch pipelines

On the Pipeline Details page in the Cloud Console, use the Pipeline status panel's "Individual job status" and "Thread time per step" graphs for an initial analysis of the health of your pipeline.

Sample investigation:

  1. You have a recurring batch pipeline that runs every hour at 3 minutes past the hour, each job normally runs for approximately 9 minutes, and you have an objective for all jobs to complete in less than 10 minutes.

  2. A job runs for more than 10 minutes. A review of the "job status" and "thread time per step" graphs shows a correlation between the longer execution time and a longer running blue s2 stage.

  3. In the Update/Execution history table, you find the job that ran during the hour of interest, then click through to the Dataflow job details page. On that page you find the longer running stage, and then look in the logs for possible errors to determine the cause of the delay.

Streaming pipelines

Under the PIPELINE INFO tab on the Pipeline Details page in the Cloud Console, use the Pipeline status panel's Data Freshness graph for an initial analysis of the health of your pipeline.

Sample investigation:

  1. You have a streaming pipeline that normally produces an output with a data freshness of 20 seconds.

  2. You set an objective of having a 30 second data freshness guarantee. When you review the data freshness graph, you notice that between 9 and 10 AM, data freshness jumped to almost 40 seconds.

  3. You switch to the PIPELINE METRICS tab, then view the Throughput, CPU Utilization, and Memory Utilization graphs for further analysis.