使用“针对每个并行”任务将数据插入到 BigQuery 中

在本教程中,您将创建 Apigee 集成和子集成以处理一系列记录。对于每条记录,主集成会异步调用子集成,即获取每条记录的数据并将其作为行插入 BigQuery 数据集中的表。

在本教程中,您将完成以下任务:

准备工作

  • 确保您有权访问 Apigee Integration。
  • 在 Google Cloud 项目中执行以下操作:

    • 将以下角色授予您要用于创建连接的服务账号:
      • roles/bigquery.dataEditor
      • roles/bigquery.readSessionUser
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor
    • 启用以下服务:
      • secretmanager.googleapis.com (Secret Manager API)
      • connectors.googleapis.com (Connectors API)

      如果之前没有为您的项目启用这些服务,则在“创建连接”页面中创建连接时系统会提示您启用。

设置 BigQuery 连接

首先,创建要用于本教程的 BigQuery 数据集和表。创建数据集和表后,创建 BigQuery 连接。您将在本教程后面的集成中使用此连接。

设置 BigQuery 数据集和表

如需设置 BigQuery 数据集和表,请执行以下步骤:

  1. Cloud 控制台中,选择您的 Google Cloud 项目。
  2. 如需从 Google Cloud 控制台启动 Cloud Shell 会话,请点击 Cloud 控制台中的 “激活 Cloud Shell”图标 激活 Cloud Shell 图标。此操作将在 Google Cloud 控制台底部的窗格中启动会话。
  3. 如需启用 BigQuery API,请在 Cloud Shell 终端中输入以下命令:
    export PROJECT_ID=project_id
    export REGION=region
    gcloud services enable --project "${PROJECT_ID}" \
        bigquery.googleapis.com \
        bigquerystorage.googleapis.com
    在此命令中,进行如下替换:
    • project_id 替换为 Google Cloud 项目的 ID。
    • region 替换为您用于创建 BigQuery 数据集的区域。
  4. 要创建名为 bq_tutorial 的 BigQuery 数据集,请在 Cloud Shell 终端中输入以下命令:
          bq  --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
        
  5. 如需创建名为 tutorial 的 BigQuery 表,请在 Cloud Shell 终端中输入以下命令:
          bq --project_id ${PROJECT_ID} \
            query  \
            --nouse_legacy_sql \
          'create table bq_tutorial.tutorial (
          unique_key STRING NOT NULL,
          created_date STRING,
          closed_date STRING,
          agency STRING,
          agency_name STRING,
          complaint_type STRING,
          descriptor STRING,
          location_type STRING,
          incident_zip STRING,
          incident_address STRING,
          street_name STRING,
          cross_street_1 STRING,
          cross_street_2 STRING,
          intersection_street_1 STRING,
          intersection_street_2 STRING,
          address_type STRING,
          city STRING,
          landmark STRING,
          facility_type STRING,
          status STRING,
          due_date STRING,
          resolution_action_updated_date STRING,
          community_board STRING,
          borough STRING,
          x_coordinate_state_plane STRING,
          y_coordinate_state_plane STRING,
          park_facility_name STRING,
          park_borough STRING,
          school_name STRING,
          school_number STRING,
          school_region STRING,
          school_code STRING,
          school_phone_number STRING,
          school_address STRING,
          school_city STRING,
          school_state STRING,
          school_zip STRING,
          school_not_found STRING,
          school_or_citywide_complaint STRING,
          vehicle_type STRING,
          taxi_company_borough STRING,
          taxi_pick_up_location STRING,
          bridge_highway_name STRING,
          bridge_highway_direction STRING,
          bridge_highway_segment STRING,
          road_ramp STRING,
          garage_lot_name STRING,
          ferry_direction STRING,
          ferry_terminal_name STRING,
          latitude STRING,
          longitude STRING,
          location STRING
          ) '
      
  6. 验证您的 BigQuery 表是否已创建。
    1. Cloud 控制台页面中,点击 导航菜单
    2. 分析部分,点击 BigQuery
    3. 展开项目并确认 bq_tutorial 数据集已列出。
    4. 展开 bq_tutorial 数据集并确认 tutorial 表已列出。
    5. 点击文档表以查看架构。

创建 BigQuery 连接

接下来,您将创建 BigQuery 连接。借助 BigQuery 连接,您可以在 BigQuery 表中插入、读取、更新和删除行,并在集成中使用生成的输出。创建 BigQuery 连接后,您将在本教程的集成中使用此连接,将行添加到 BigQuery 表中。

如需创建 BigQuery 连接,请完成以下步骤:

  1. Cloud 控制台中,选择您的 Google Cloud 项目。
  2. 打开连接页面
  3. 点击 + 新建以打开创建连接页面。
  4. 配置连接:
    1. 创建连接部分,完成以下操作:
      • 连接器:从可用连接器的下拉列表中选择 BigQuery
      • 连接器版本:从可用版本的下拉列表中选择最新的连接器版本。
      • 连接名称字段中,输入连接实例的名称。 对于本教程,输入 connector-bq-tutorial
      • (可选)添加连接实例的说明
      • 服务账号:选择具有所需角色的服务账号。
      • 项目 ID:输入 BigQuery 数据所在的 Google Cloud 项目的 ID。
      • 数据集 ID:输入要使用的 BigQuery 数据集的 ID。对于本教程,请输入 bq_tutorial
      • (可选)点击 + 添加标签,以键值对的形式添加标签。
      • 点击下一步
    2. 位置:选择将从哪个区域运行连接。连接器支持的区域包括:

        如需查看所有受支持区域的列表,请参阅位置

      • 点击下一步
    3. 身份验证:BigQuery 连接无需进行身份验证配置。点击下一步
    4. 检查:检查连接的配置详细信息。本部分会显示新连接的连接和身份验证详情,以供您检查。
  5. 点击创建

设置子集成

在本教程中,子集成接受主集成向其发送的每条记录,并将其作为一行插入 bq_tutorial 数据集的 tutorial 表中。

创建子集成

如需创建子集成,请完成以下步骤:

  1. Apigee 界面中,选择您的 Apigee 组织
  2. 点击开发 > 集成
  3. 点击创建集成
  4. 创建数据集对话框中,执行以下操作:
    • 输入名称,例如,输入 Process-each-record
    • (可选)输入说明。例如,输入 API 触发器以处理每条记录(子集成)
    • 选择您要在其中创建集成的区域。
  5. 点击创建以打开集成编辑器。

添加 API 触发器

要向集成添加 API 触发器,请执行以下操作:

  1. 在集成设计器中,选择添加任务/触发器 > 触发器以显示可用触发器列表。
  2. API 触发器元素拖动至集成编辑器。

添加数据映射任务

如需在集成中添加数据映射任务,请完成以下步骤:

  1. 在集成设计器中选择添加任务/触发器 > 任务以显示可用任务列表。
  2. 数据映射元素拖动至集成编辑器。

配置 BigQuery 连接

现在,您可以使用先前在子集成中创建的 BigQuery 连接。如需在此集成中配置 BigQuery 连接,请完成以下步骤:

  1. 在集成设计器中选择添加任务/触发器 > 任务以显示可用任务列表。
  2. 连接器元素拖动至集成编辑器。
  3. 点击设计器上的连接器任务元素,以查看任务配置窗格。
  4. 点击右侧面板上的修改图标,然后将标签更新为将行插入 BigQuery
  5. 点击配置任务

    此时将显示配置连接器任务对话框。

  6. 配置连接器任务对话框中,执行以下操作:
    1. 选择您在其中创建了 BigQuery 连接的连接区域。
    2. 选择要使用的 BigQuery 连接。对于本教程,选择 connector-bq-tutorial
    3. 选择连接后,系统会显示类型列。选择实体,然后从可用实体列表中选择教程
    4. 选择类型后,系统将显示操作列。选择创建
    5. 点击完成以完成连接配置并关闭对话框。

连接集成元素

接下来,添加边缘连接,将 API 触发器连接到数据映射任务,将数据映射任务连接到连接器任务。边缘连接是集成中的任何两个元素之间的连接。如需详细了解边缘和边缘条件,请参阅边缘

如需添加边缘连接,请完成以下步骤:

  1. 点击 API 触发器元素底部的分支控制点。将边缘连接拖放到数据映射元素顶部的联接控制点。
  2. 点击数据映射元素底部的分支控制点。在连接器元素顶部的联接控制点处拖放边缘连接。

配置数据映射任务

如需配置数据映射任务,请完成以下步骤:

  1. 在集成编辑器中,点击数据映射任务以查看任务配置窗格。
  2. 点击打开数据映射编辑器
  3. 数据映射编辑器中,点击添加以添加新变量。
  4. 创建变量对话框中,输入以下信息:
    • 名称:输入 record
    • 数据类型:选择 JSON
    • 架构:选择从示例 JSON 载荷推断。输入以下示例 JSON 载荷:
                  {
                    "unique_key":"304271",
                    "created_date":"02/06/2007 12:00:00 AM",
                    "closed_date":"03/01/2007 12:00:00 AM",
                    "agency":"TLC",
                    "agency_name":"Taxi and Limousine Commission",
                    "complaint_type":"Taxi Complaint",
                    "descriptor":"Driver Complaint",
                    "location_type":"Street",
                    "incident_zip":"10001",
                    "incident_address":"",
                    "street_name":"",
                    "cross_street_1":"",
                    "cross_street_2":"",
                    "intersection_street_1":"WEST 29 STREET",
                    "intersection_street_2":"7 AVENUE",
                    "address_type":"INTERSECTION",
                    "city":"NEW YORK",
                    "landmark":"",
                    "facility_type":"N/A",
                    "status":"Closed",
                    "due_date":"02/28/2007 12:00:00 AM",
                    "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                    "community_board":"05 MANHATTAN",
                    "borough":"MANHATTAN",
                    "x_coordinate_state_plane":"986215",
                    "y_coordinate_state_plane":"211740",
                    "park_facility_name":"",
                    "park_borough":"MANHATTAN",
                    "school_name":"",
                    "school_number":"",
                    "school_region":"",
                    "school_code":"",
                    "school_phone_number":"",
                    "school_address":"",
                    "school_city":"",
                    "school_state":"",
                    "school_zip":"",
                    "school_not_found":"",
                    "school_or_citywide_complaint":"",
                    "vehicle_type":"",
                    "taxi_company_borough":"",
                    "taxi_pick_up_location":"Other",
                    "bridge_highway_name":"",
                    "bridge_highway_direction":"",
                    "road_ramp":"",
                    "bridge_highway_segment":"",
                    "garage_lot_name":"",
                    "ferry_direction":"",
                    "ferry_terminal_name":"",
                    "latitude":"40.74785373937869",
                    "longitude":"-73.99290823133913",
                    "location":"(40.74785373937869, -73.99290823133913)"
                  }
                
  5. 点击创建
  6. 创建变量后,在数据映射编辑器中,完成以下步骤:
    • 将新的 record 变量拖动到输入列。
    • connectorInputPayload 变量拖动到输出列。
  7. 关闭数据映射编辑器以返回到集成编辑器。

发布子集成

如需发布子集成,请在集成编辑器中点击发布

设置主集成

在本部分中,您将设置主集成,即使用针对每个并行任务处理每条记录。然后,主集成会为每条记录调用一次子集成。

创建主集成

要创建主集成,请完成以下步骤:

  1. Apigee 界面中,选择您的 Apigee 组织
  2. 点击开发 > 集成
  3. 点击创建集成
  4. 创建数据集对话框中,执行以下操作:
    • 输入名称,例如,输入 process-records
    • (可选)输入说明。例如,输入 API 触发器以处理记录(主集成)
    • 选择您要在其中创建集成的区域。
  5. 点击创建以打开集成编辑器。

添加 API 触发器

要向集成添加 API 触发器,请执行以下操作:

  1. 在集成设计器中,选择添加任务/触发器 > 触发器以显示可用触发器列表。
  2. API 触发器元素拖动至集成编辑器。

添加一个“针对每个并行”任务

如需在集成中添加“针对每个并行”任务,请完成以下步骤:

  1. 在集成设计器中选择添加任务/触发器 > 任务以显示可用任务列表。
  2. 针对每个并行元素拖动到集成编辑器中。

连接集成元素

接下来,添加一个边缘连接,将 API 触发器连接到“针对每个并行”任务。

如需添加边缘连接,请点击 API 触发器元素底部的创建分支控制点。将 Edge 连接拖放到“针对每个并行”任务元素顶部的联接控制点。

配置“针对每个并行”任务

要配置“针对每个并行”任务,请完成以下步骤:

  1. 在集成编辑器中,点击针对每个并行任务,以查看任务配置窗格。
  2. 数组选择 > 迭代列表下,点击添加新变量以添加新变量。
  3. 创建变量对话框中,输入以下详细信息:
    • 名称:输入 records
    • 数据类型:选择 JSON
    • 架构:选择从示例 JSON 载荷推断。输入以下示例 JSON 载荷:
                    [{
                      "unique_key":"304271",
                      "created_date":"02/06/2007 12:00:00 AM",
                      "closed_date":"03/01/2007 12:00:00 AM",
                      "agency":"TLC",
                      "agency_name":"Taxi and Limousine Commission",
                      "complaint_type":"Taxi Complaint",
                      "descriptor":"Driver Complaint",
                      "location_type":"Street",
                      "incident_zip":"10001",
                      "incident_address":"",
                      "street_name":"",
                      "cross_street_1":"",
                      "cross_street_2":"",
                      "intersection_street_1":"WEST 29 STREET",
                      "intersection_street_2":"7 AVENUE",
                      "address_type":"INTERSECTION",
                      "city":"NEW YORK",
                      "landmark":"",
                      "facility_type":"N/A",
                      "status":"Closed",
                      "due_date":"02/28/2007 12:00:00 AM",
                      "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                      "community_board":"05 MANHATTAN",
                      "borough":"MANHATTAN",
                      "x_coordinate_state_plane":"986215",
                      "y_coordinate_state_plane":"211740",
                      "park_facility_name":"",
                      "park_borough":"MANHATTAN",
                      "school_name":"",
                      "school_number":"",
                      "school_region":"",
                      "school_code":"",
                      "school_phone_number":"",
                      "school_address":"",
                      "school_city":"",
                      "school_state":"",
                      "school_zip":"",
                      "school_not_found":"",
                      "school_or_citywide_complaint":"",
                      "vehicle_type":"",
                      "taxi_company_borough":"",
                      "taxi_pick_up_location":"Other",
                      "bridge_highway_name":"",
                      "bridge_highway_direction":"",
                      "road_ramp":"",
                      "bridge_highway_segment":"",
                      "garage_lot_name":"",
                      "ferry_direction":"",
                      "ferry_terminal_name":"",
                      "latitude":"40.74785373937869",
                      "longitude":"-73.99290823133913",
                      "location":"(40.74785373937869, -73.99290823133913)"
                    }]
                  
  4. 点击创建
  5. 子集成详情部分中,输入以下信息:
    • API 触发器 ID:在子集成中选择 API 触发器元素。例如,选择 Process-each-record_API_1
    • 执行策略:选择 ASYNC
    • 选择 Run a single integration
  6. On each execution 部分的 Where to map individual array elements 中,输入子集成的数据映射任务中的变量名称。在这种情况下,请输入记录。 系统仅会列出已发布集成的子集成变量。如果变量未列出,请刷新页面,因为在子集成发布后,变量需要一些时间才会显示。

发布主要集成

如需发布主要集成,请在集成编辑器中点击发布

创建 Apigee API 代理

要触发集成,请创建一个 Apigee API 代理,并将您的集成作为目标。为此,请完成以下步骤:

  1. 在 Google Cloud 项目中创建服务账号,并为其分配所需的角色 Apigee Integration Invoker。如需详细了解如何分配 IAM 角色,请参阅 IAM 角色和权限
  2. 登录 Apigee 界面
  3. 在导航栏中,依次选择开发 > API 代理
  4. 点击新建
  5. 创建代理向导中,点击集成目标
  6. 代理详情页面上,输入以下信息:
    • 名称:输入 process-records
    • 基本路径:输入 /v1/process-records
    • 集成区域:选择您用于创建集成的区域。
    • 集成目标:选择您创建的主要集成。在本教程中,选择 process-records
    • 触发器:选择所需的 API 触发器。在本教程中,选择 process-records_API_1
    • 端点类型:选择同步
  7. 点击下一步,然后在通用政策页面上再次点击下一步
  8. 摘要页面上,点击创建
  9. 创建代理时,点击修改代理
  10. 点击开发标签页。
  11. 在“政策”下,使用以下政策更新设置集成请求政策:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <SetIntegrationRequest continueOnError="false" enabled="true" name="set-integration-request">
      <DisplayName>Set Integration Request</DisplayName>
      <ProjectId>project_id</ProjectId>
      <IntegrationName>process-records</IntegrationName>
      <IntegrationRegion>region</IntegrationRegion>
      <ApiTrigger>api_trigger/process-records_API_1</ApiTrigger>
      <Parameters>
        <Parameter name="records" type="json" ref="request.content"/>
      </Parameters>
    </SetIntegrationRequest>
    您需要进行如下替换:
    • project_id 替换为 Google Cloud 项目的 ID。
    • region 替换为您创建集成的区域。
  12. 保存更改。
  13. 点击部署
  14. 修订版本中,选择更新后的修订版本。
  15. 环境中,选择要用于部署集成的环境。
  16. 部署 API 代理时,请提供您之前创建服务账号时使用的并具有所需角色的电子邮件地址。
  17. 点击部署

测试您的集成

如需测试集成,请完成以下步骤:

  1. 将示例数据下载到 Cloud Shell:
    1. 如需从 Google Cloud 控制台启动 Cloud Shell 会话,请点击 Cloud 控制台中的“激活 Cloud Shell”图标 激活 Cloud Shell 图标。此操作将在 Google Cloud 控制台底部的窗格中启动会话。
    2. 在 Cloud Shell 终端中输入以下命令:
      wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
              
    3. 要验证是否已下载示例数据,请在 Cloud Shell 终端中输入以下命令:
      ls -la bq-sample-dataset.json
      下载的文件在 Cloud Shell 终端中列出。
  2. 如需在 Apigee 代理中启用调试功能,请完成以下步骤:
    1. 切换回您刚刚在 Apigee 界面中创建的 API 代理。
    2. 点击调试标签页。
    3. 点击启动调试会话,然后输入以下信息:
      1. 选择要运行调试会话的环境
      2. (可选)从过滤条件下拉列表中,选择一个过滤条件以应用于您要创建的调试会话中的所有事务。默认值为 None (All transactions),其中包含调试数据中的所有事务。
      3. 点击启动
  3. 如需开始测试,请在 Cloud Shell 终端中输入以下命令:
    export APIGEE_DOMAIN=<your-Apigee-domain>
    export SAMPLE_DOCS=$(jq  $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json)
    
    curl -X POST https://$APIGEE_DOMAIN/v1/process-records \
      -H 'Content-Type: application/json' \
      -d "$SAMPLE_DOCS"
          
    此测试从样本数据集中选择三个随机条目,并将它们传递给主集成。主集成会将每个条目传递到子集成,然后这种集成会将数据作为行添加到 BigQuery 表中。
  4. 如需验证您的 BigQuery 表现在是否包含这些记录,请执行以下步骤:
    1. Cloud 控制台页面中,点击 导航菜单
    2. 分析部分,点击 BigQuery
    3. 展开您的项目,然后点击 bq_tutorial 数据集。
    4. 展开 bq_tutorial 数据集,然后点击 tutorial 表。
    5. 点击表探索器标签页以查看插入的记录。

后续步骤

尝试构建与其他连接器的集成。如需查看所有支持的连接器列表,请参阅连接器参考文档