Launch Dataflow pipelines with Cloud Composer

Cloud Composer 1 | Cloud Composer 2

This page describes how to use the DataflowTemplateOperator to launch Dataflow pipelines from Cloud Composer. The Cloud Storage Text to BigQuery pipeline is a batch pipeline that lets you upload text files stored in Cloud Storage, transform them using a JavaScript user-defined function (UDF) that you provide, and output the results to BigQuery.

The user-defined function, input file, and JSON schema are uploaded to a Cloud Storage bucket. A DAG that references these files launches a Dataflow batch pipeline, which applies the user-defined function and the JSON schema file to the input file. Afterwards, this content is uploaded to a BigQuery table.

Overview

  • Before you start the workflow, you create the following entities:

    • An empty BigQuery table, in an empty dataset, with the following columns of information: location, average_temperature, month and, optionally, inches_of_rain, is_current, and latest_measurement.

    • A JSON file that normalizes the data from the .txt file into the correct format for the BigQuery table's schema. The JSON object has an array called BigQuery Schema, where each object contains a column name, the input type, and whether or not it is a required field.

    • A .txt input file that holds the data to be batch-uploaded to the BigQuery table.

    • A user-defined function written in JavaScript that transforms each line of the .txt file into the relevant variables for the table.

    • An Airflow DAG file that points to the location of these files.

  • Next, you upload the .txt file, the .js UDF file, and the .json schema file to a Cloud Storage bucket. You also upload the DAG to your Cloud Composer environment.

  • After the DAG is uploaded, Airflow runs a task from it. This task launches a Dataflow pipeline that applies the user-defined function to the .txt file and formats it according to the JSON schema.

  • Finally, the data is uploaded to the BigQuery table that you created earlier.

Before you begin

  • This guide requires familiarity with JavaScript to write the user-defined function.
  • This guide assumes that you already have a Cloud Composer environment. See Create environment to create one. You can use any version of Cloud Composer with this guide.
  • Enable the Cloud Composer, Dataflow, Cloud Storage, and BigQuery APIs. An example command follows this list.

    Enable the APIs
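
If you prefer the command line, the same APIs can be enabled with gcloud. A minimal sketch, assuming the gcloud CLI is installed and already configured for your project:

gcloud services enable \
    composer.googleapis.com \
    dataflow.googleapis.com \
    storage.googleapis.com \
    bigquery.googleapis.com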

Create an empty BigQuery table with a schema definition

Create a BigQuery table with a schema definition. You will use this schema definition later in this guide. This BigQuery table will hold the results of the batch upload.

To create an empty table with a schema definition:

Console

  1. In the Google Cloud console, go to the BigQuery page:

    Go to BigQuery

  2. In the navigation panel, in the Resources section, expand your project.

  3. In the details panel, click Create dataset.


  4. On the Create dataset page, in the Dataset ID section, name your dataset average_weather. Leave all of the other fields in their default state.


  5. Click Create dataset.

  6. Back in the navigation panel, in the Resources section, expand your project. Then, click the average_weather dataset.

  7. In the details panel, click Create table.


  8. On the Create table page, in the Source section, select Empty table.

  9. On the Create table page, in the Destination section, do the following:

    • For Dataset name, choose the average_weather dataset.


    • In the Table name field, enter the name average_weather.

    • Verify that Table type is set to Native table.

  10. In the Schema section, enter the schema definition. You can use one of the following approaches:

    • Enter the schema information manually by enabling Edit as text and entering the table schema as a JSON array. Enter the following fields:

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Use Add field to manually input the schema:


  11. For Partition and cluster settings, leave the default value, No partitioning.

  12. In the Advanced options section, for Encryption, leave the default value, Google-managed key.

  13. Click Create table.

bq

Use the bq mk command to create an empty dataset and then an empty table in that dataset.

Run the following command to create a dataset of average global weather:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Replace the following:

  • LOCATION: the region where the environment is located.
  • PROJECT_ID: the Project ID.

Run the following command to create an empty table in this dataset with the schema definition:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

After the table is created, you can update the table's expiration time, description, and labels. You can also modify the schema definition.
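
For example, you could add a table description from the command line with bq update. This is an optional sketch; the description text is only illustrative:

bq update \
    --description "Average weather readings per location and month" \
    PROJECT_ID:average_weather.average_weather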

Python

Save the following code as dataflowtemplateoperator_create_dataset_and_table_helper.py and update the variables in it to reflect your project and location. Then run the code with the following command:

python dataflowtemplateoperator_create_dataset_and_table_helper.py


To authenticate to Cloud Composer, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google-cloud-bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Create a Cloud Storage bucket

Create a bucket to hold all of the files needed for the workflow. The DAG that you create later in this guide references the files that you upload to this storage bucket. To create a new storage bucket:

Console

  1. Open Cloud Storage in the Google Cloud console.

    Go to Cloud Storage

  2. Click Create bucket to open the bucket creation form.

    1. Enter your bucket information and click Continue to complete each step:

      • Specify a globally unique Name for your bucket. This guide uses bucketName as an example.

      • Select Region for the location type. Next, select a Location where the bucket data will be stored.

      • Select Standard as the default storage class for your data.

      • Select Uniform access control to access your objects.

    2. Click Done.

gsutil

Use the gsutil mb command:

gsutil mb gs://bucketName/

Replace the following:

  • bucketName: a globally unique name for the bucket that you are creating for this guide.

Code samples

C#

To authenticate to Cloud Composer, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

To authenticate to Cloud Composer, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

To authenticate to Cloud Composer, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

To authenticate to Cloud Composer, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

To authenticate to Cloud Composer, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Create a JSON-formatted BigQuery schema for your output table

Create a JSON-formatted BigQuery schema file that matches the output table you created earlier. Note that the field names, types, and modes must match the ones defined earlier in your BigQuery table schema. This file normalizes the data from your .txt file into a format compatible with your BigQuery schema. Name the file jsonSchema.json. A quick way to verify that it stays in sync with the table is shown after the file.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}
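
To double-check that this file matches the table you created earlier, you can print the table's schema with bq and compare it by eye against the BigQuery Schema array above. This is an optional sketch; replace PROJECT_ID with your project ID:

bq show --schema --format=prettyjson \
    PROJECT_ID:average_weather.average_weather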

Create a JavaScript file to format your data

In this file, you define your UDF (user-defined function), which supplies the logic to transform the lines of text in your input file. Note that this function takes each line of text in your input file as its own argument, so the function runs once for each line of your input file. Name this file transformCSVtoJSON.js.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Create your input file

This file holds the information to upload to your BigQuery table. Copy this file locally and name it inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null
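
For reference, applying transformCSVtoJSON to the first line of this file produces JSON along the following lines (hand-derived rather than produced by running the pipeline; inches_of_rain is omitted because its value is null, and the single quotes around 'July' are preserved because the UDF does not strip them):

{"location":"POINT(40.7128 74.006)","average_temperature":"45","month":"'July'","is_current":"true","latest_measurement":"2020-02-16"}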

Upload your files to your bucket

Upload the following files to the Cloud Storage bucket that you created earlier:

  • The JSON-formatted BigQuery schema (.json)
  • The JavaScript user-defined function (transformCSVtoJSON.js)
  • The input file with the text that you want to process (.txt)

Console

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. In the list of buckets, click your bucket.

  3. On the Objects tab for the bucket, do one of the following:

    • Drag the desired files from your desktop or file manager to the main pane in the Google Cloud console.

    • Click the Upload files button, select the files that you want to upload in the dialog that appears, and then click Open.

gsutil

Run the gsutil cp command:

gsutil cp OBJECT_LOCATION gs://bucketName

Replace the following:

  • bucketName: the name of the bucket that you created earlier in this guide.
  • OBJECT_LOCATION: the local path to your object. For example, Desktop/transformCSVtoJSON.js.

Code samples

Python

To authenticate to Cloud Composer, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

To authenticate to Cloud Composer, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end
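
Before moving on, you can optionally confirm that all three files landed in the bucket by listing its contents (bucketName is the bucket you created earlier). The listing should include jsonSchema.json, transformCSVtoJSON.js, and inputFile.txt:

gsutil ls gs://bucketName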

Configure DataflowTemplateOperator

Before you run the DAG, set the following Airflow variables. An example of setting them from the command line follows the list.

  • project_id: the Project ID.
  • gce_zone: the Compute Engine zone where the Dataflow cluster must be created.
  • bucket_path: the location of the Cloud Storage bucket that you created earlier.
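
You can set these variables in the Airflow UI (Admin > Variables) or with gcloud. The following is a sketch of the gcloud approach for Airflow 2; ENVIRONMENT_NAME and LOCATION are placeholders for your environment's name and region. Repeat the command for gce_zone and bucket_path. With Airflow 1 environments, the Airflow CLI syntax differs (variables -- --set KEY VALUE instead of variables set -- KEY VALUE):

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    variables set -- \
    project_id PROJECT_ID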

Now you will create the DAG that launches the Dataflow workflow, referencing the files you created earlier. Copy this DAG and save it locally as composer-dataflow-dag.py.

Airflow 2



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Airflow 1



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
Learn more about regions and zones here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Upload the DAG to Cloud Storage

Upload your DAG to the /dags folder in your environment's bucket. After the upload has completed successfully, you can see it by clicking the DAGs Folder link on the Cloud Composer Environments page. A command-line alternative is shown below.

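If you prefer the command line, the DAG file can also be uploaded with gcloud. A sketch, assuming ENVIRONMENT_NAME and LOCATION are the name and region of your environment:

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source composer-dataflow-dag.py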

Check the task's status

  1. Go to the Airflow web interface.
  2. On the DAGs page, click the DAG name (such as composer_dataflow_dag).
  3. On the DAG details page, click Graph View.
  4. Check the status:

    • Failed: the task has a red box around it. You can also hold the pointer over the task and look for State: Failed.

    • Success: the task has a green box around it. You can also hold the pointer over the task and look for State: Success.

After a few minutes, you can check the results in Dataflow and BigQuery.

View your job in Dataflow

  1. In the Google Cloud console, go to the Dataflow page.

    Go to Dataflow

  2. Your job is named dataflow_operator_transform_csv_to_bq with a unique ID attached to the end of the name with a hyphen, like so:


  3. Click on the name to see the job details.

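As an alternative to the console, you can also list recent Dataflow jobs from the command line. A sketch; REGION is the region where the job runs:

gcloud dataflow jobs list --region=REGION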

View your results in BigQuery

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. You can submit queries using standard SQL syntax. Use the following query to see the rows that were added to your table:

    SELECT * FROM projectId.average_weather.average_weather
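
If you prefer to run this query with the bq CLI instead of the console, an equivalent command might look like the following (projectId is your project ID):

bq query --use_legacy_sql=false \
    'SELECT * FROM `projectId.average_weather.average_weather`'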