使用 Cloud Composer 启动 Dataflow 流水线

本页面介绍如何使用 DataflowTemplateOperator 从 Cloud Composer 启动 Dataflow 流水线。Cloud Storage Text to BigQuery 流水线是一个批处理流水线,可让您上传 Cloud Storage 中存储的文本文件,使用您提供的 JavaScript 用户定义函数 (UDF) 进行转换,并将结果输出到 BigQuery。

用户定义的函数、输入文件和 json 架构将上传到 Cloud Storage 存储分区。引用这些文件的 DAG 将启动 Dataflow 批处理流水线,该流水线将用户定义的函数和 json 架构文件应用于我们的输入文件。之后,此内容将上传到 BigQuery 表

  • 在启动工作流之前,我们需要创建以下实体:

    • 来自空数据集的空 BigQuery 表,其中包含以下信息列:locationaverage_temperaturemonth 以及可选的 inches_of_rainis_currentlatest_measurement

    • 一个 JSON 文件,用于将 .txt 文件中的数据标准化为 BigQuery 表架构的正确格式。JSON 对象将具有一个 BigQuery Schema 数组,其中每个对象都包含一个列名称、输入类型以及是否为必填字段。

    • 输入 .txt 文件,用于保存我们要批量上传到 BigQuery 表格的数据。

    • 以 JavaScript 编写的用户定义函数,该函数会将 .txt 文件的每一行转换为表的相关变量。

    • 指向上述文件位置的有向无环图 (DAG) 文件。

  • 接下来,我们将 .txt 文件、.js UDF 文件和 .json 架构文件上传到存储分区。我们还会将 DAG 上传到我们的 Cloud Composer 环境。

  • 上传 DAG 后,将启动 Airflow 任务。该任务将启动一个 Cloud Dataflow 流水线,该流水线将用户定义的函数应用于我们的 .txt 文件,并根据 JSON 架构设置其格式。

  • 最后,数据将上传到我们之前创建的 BigQuery 表格。

费用

本教程使用 Google Cloud 的以下收费组件:

  • Cloud Composer
  • Dataflow
  • Cloud Storage
  • BigQuery

前提条件

  • 确保您已创建 Cloud Composer 环境
  • 完成本页所述操作所需的 Cloud Composer 最低版本为 1.9.0。如需查看映像版本,请查看环境详细信息
  • 本教程要求熟悉 JavaScript 以编写用户定义的函数。
  • 启用 Cloud Composer, Dataflow, Cloud Storage, BigQuery API。

    启用 API

设置环境

创建具有架构定义的空 BigQuery 表

首先,您需要创建一个具有架构定义的 BigQuery 表格。在本教程的稍后部分您将使用此架构定义。此 BigQuery 表格将保存批量上传的结果。

要创建具有架构定义的空表,请执行以下操作:

控制台

  1. 在 Cloud Console 中打开 BigQuery 网页界面。
    转到 BigQuery 网页界面

  2. 在导航面板的资源部分中,展开您的项目。

  3. 在窗口右侧的详细信息面板中,点击创建数据集

点击窗口右侧的“创建数据集”按钮

  1. 在“创建数据集”页面的数据集 ID 部分中,将数据集命名为 average_weather。保留其他所有字段的默认状态。

在数据集 ID 中填入名称 average_weather

  1. 点击创建数据集

  2. 返回导航面板,在资源部分中,展开您的项目。然后,点击 average_weather 数据集。

  3. 在窗口右侧的详细信息面板中,点击创建表

点击窗口右侧的“创建表”

  1. 创建表页面的来源部分,选择空白表

  2. 创建表页面的目标部分,执行以下操作:

    • 对于数据集名称,选择 average_weather 数据集。

      为 average_weather 数据集选择数据集选项

    • 表名称字段中,输入名称 average_weather

    • 确认表类型设置为原生表

  3. 架构部分中,输入架构定义。

    • 通过以下方式,手动输入架构信息:

      • 启用以文本形式修改,并以 JSON 数组格式输入表架构。 在此选项中输入以下字段:

        [
        {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
        },
        {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
        },
        {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
        },
        {
        "name": "inches_of_rain",
        "type": "NUMERIC"
        },
        {
        "name": "is_current",
        "type": "BOOLEAN"
        },
        {
        "name": "latest_measurement",
        "type": "DATE"
        }
        ]
        

      • 使用添加字段手动输入架构。

      点击屏幕底部的添加字段以输入字段

  4. 对于分区和聚簇设置,保留默认值 No partitioning

  5. 对于高级选项部分的加密,保留默认值 Google-managed key。默认情况下,Compute Engine 会对静态存储的客户内容进行加密

  6. 点击创建表

CLI

使用带 --location 标志的 bq mk 命令创建空数据集。将 PROJECT_ID 替换为您的项目 ID,将 LOCATION 替换为您的首选位置。我们建议您选择 Composer 环境所在的区域,以最大限度地缩短延迟时间。

复制以下命令以创建平均全球天气数据集:

bq --location=LOCATION mk \
--dataset \
PROJECT_ID:average_weather

要使用我们的架构定义在此数据集中创建一个空表,请在以下命令中将 PROJECT_ID 替换为您的项目 ID,然后在终端中输入该表:

bq mk \
--table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

创建表后,您可以更新表的到期时间、说明和标签,也可以修改架构定义

Python

在运行示例之前,请务必运行以下命令以在环境中安装库:

pip install google.cloud.bigquery

将此代码保存为 dataflowtemplateoperator_create_dataset_and_table_helper.py 并更新其中的变量以反映您的项目和位置,然后使用以下命令运行该代码:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

在试用此示例之前,请按照《Compute Engine 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 Compute Engine Python API 参考文档


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = 'your-project'  # Your GCP Project
location = 'US'  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = 'average_weather'

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

创建存储分区

接下来,您需要创建一个存储分区以保存工作流所需的所有文件。您今后创建的 DAG 将引用您已上传到此存储分区的文件。要创建新的存储分区,请执行以下操作:

控制台

  1. 在 Cloud Console 中打开 Cloud Storage。

    打开 Cloud Storage

  2. 点击创建存储分区,打开存储分区创建表单。

  3. 输入您的存储分区信息,然后点击继续以完成各个步骤:

    • 为您的存储分区指定一个全局唯一的名称(在本教程的其余部分,该名称将被引用为 bucketName)。

    • 选择区域作为位置类型。接下来,选择一个要用于永久存储该存储分区数据的位置

    • 选择标准作为数据的默认存储类别。

    • 选择统一访问权限控制以访问您的对象。

  4. 点击完成

gsutil

  1. 使用 gsutil mb 命令:
    gsutil mb gs://bucketName/
    

代码示例

Go

在试用此示例之前,请按照《Compute Engine 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 Compute Engine Go API 参考文档

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %v", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Python

在试用此示例之前,请按照《Compute Engine 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 Compute Engine Python API 参考文档

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print("Bucket {} created".format(bucket.name))

为输出表创建 JSON 格式的 BigQuery 架构

创建与您之前创建的输出表匹配的 JSON 格式的 BigQuery 架构文件。请注意,字段名称、类型和模式必须与之前在 BigQuery 表架构中的定义相匹配。此文件会将 .txt 文件中的数据标准化为与 BigQuery 架构兼容的格式。将此文件命名为 jsonSchema.json

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

创建 JavaScript(.js) 文件以设置数据格式

在此文件中,您将定义 UDF(用户定义的函数),该函数提供转换输入文件中文本行的逻辑。请注意,此函数会将输入文件中的每一行文本作为其参数,因此该函数将针对输入文件的每一行运行一次。将此文件命名为 transformCSVtoJSON.js

Node.js

在试用此示例之前,请按照《Compute Engine 快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Compute Engine Node.js API 参考文档


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  const weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  var jsonString = JSON.stringify(weatherInCity);
  return jsonString;
}

创建输入文件

此文件将保存您要上传到 BigQuery 表格的信息。在本地复制此文件并将其命名为 inputFile.txt

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

将文件上传到存储分区并创建暂存文件夹

将以下文件上传到您之前创建的存储分区:

  • JSON 格式的 BigQuery 架构 (.json)
  • JavaScript 用户定义的函数 (transformCSVtoJSON.js)
  • 您要处理的文本的输入文件 (.txt)

控制台

  1. 在 Google Cloud Console 中打开 Cloud Storage 浏览器。
    打开 Cloud Storage 浏览器
  2. 在存储分区列表中,点击存储分区 bucketName

  3. 在存储分区的对象标签中,执行以下任一操作:

    • 将所需文件从桌面或文件管理器拖放到 Cloud Console 的主窗格中。

    • 点击上传文件按钮,在出现的对话框中选择要上传的文件,然后点击打开

gsutil

使用 [gsutil cp] 命令:

gsutil cp [OBJECT_LOCATION] gs://bucketName

其中:

  • [OBJECT_LOCATION] 是对象的本地路径。例如 Desktop/dog.png

  • [bucketName] 是您之前创建的全局唯一的存储分区名称。

如果成功,则响应类似如下示例:

Operation completed over 1 objects/58.8 KiB.

Python

Python

在试用此示例之前,请按照《Compute Engine 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 Compute Engine Python API 参考文档

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # bucket_name = "your-bucket-name"
    # source_file_name = "local/path/to/file"
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(
        "File {} uploaded to {}.".format(
            source_file_name, destination_blob_name
        )
    )

DataflowTemplateOperator 配置

在运行示例之前,请务必设置适当的环境变量。您可以使用 gcloud 或 Airflow 界面执行此操作:

gcloud

输入以下命令:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set project_id PROJECT_ID

其中:

  • ENVIRONMENT 是 Cloud Composer 环境的名称
  • LOCATION 是 Cloud Composer 环境所在的区域
  • PROJECT_ID 是您的 Google Cloud 项目 ID。
gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set gce_region GCE_REGION

其中:

  • GCE_REGION 是您的 Compute Engine 区域的区域。
gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set gce_zone GCE_ZONE

其中:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set bucket_path BUCKET_PATH

其中:

  • BUCKET_PATH 是您之前创建的 Cloud Storage 存储分区的位置。

Airflow 界面

  1. 在工具栏中,点击 Admin > Variables

  2. 点击 Create

  3. 请输入以下信息:

    • Key:project_id
    • Val:PROJECT_ID 这是您的 Google Cloud 项目 ID
  4. 点击 Save and Add Another 选择左下角的 save and add another option 选项

  5. 请输入以下信息:

    • Key:bucket_path
    • 值:BUCKET_PATH 这是您的 Cloud Storage 存储分区的位置(例如,gs://my-bucket)。
  6. 点击 Save and Add Another

  7. 请输入以下信息:

    • Key:gce_region
    • 值:GCE_REGION 这是您的 Compute Engine 区域。
  8. 点击 Save and Add Another

  9. 请输入以下信息:

  10. 点击保存

现在,您将引用之前创建的文件来创建启动 Dataflow 工作流的 DAG。复制此 DAG 并在本地将其保存为 composer-dataflow-dag.py

Python

在试用此示例之前,请按照《Compute Engine 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 Compute Engine Python API 参考文档



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/concepts.html#variables
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
* gce_region - Google Compute Engine region where Cloud Dataflow cluster should be
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_zone = models.Variable.get("gce_zone")
gce_region = models.Variable.get("gce_region")

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your region
        "region": gce_region,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "temp_location": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

将 DAG 上传到 Cloud Storage

将您的 DAG 上传到您的环境文件夹。上传成功完成后,您应该可以在 Cloud Composer 环境页面上点击 DAG 文件夹链接来查看上传的 DAG。

您环境中的 DAG 文件夹存放您的 DAG

查看任务的状态

  1. 转到 Airflow 网页界面
  2. 在 DAG 页面上,点击 DAG 名称(例如 composerDataflowDAG)。
  3. 在 DAG 详细信息页面上,点击 Graph View
  4. 检查状态:

    • 失败:任务被红色框圈起。您还可以将指针悬停在任务上,然后查看 State: Failed任务被红色框圈起,表示任务失败

几分钟后,您应该会在 Dataflow 和 BigQuery 中看到结果。

在 Dataflow 中查看作业

  1. 转到 Dataflow 网页界面。 转到 Dataflow 网页界面

  2. 您的作业将命名为 dataflow_operator_transform_csv_to_bq,并在名称末尾附加一个唯一 ID,以连字符连接,例如:数据流作业具有唯一 ID

  3. 点击名称可查看作业详情。详细了解 Dataflow 作业详情。 查看以下所有作业详情

在 BigQuery 中查看结果

  1. 转到 BigQuery 网页界面。 转到 BigQuery 网页界面

  2. 您可以使用标准 SQL 提交查询。使用以下查询可查看已添加到表中的行:

    SELECT * FROM projectId.average_weather
    

清理

为避免您的 Google Cloud Platform 帐号产生费用,您可以删除本教程中使用的资源:

  1. 删除 Cloud Composer 环境
  2. 删除 Cloud Composer 环境的 Cloud Storage 存储分区。删除 Cloud Composer 环境不会删除其存储分区。
  3. 停止 Dataflow 作业
  4. 删除 BigQuery 表BigQuery 数据集