See the supported connectors for Application Integration.

Insert data into BigQuery using the For Each Parallel task

In this tutorial, you create a main integration and a sub-integration in Application Integration to process a series of records. For each record, the main integration asynchronously invokes the sub-integration, which takes the record's data and inserts it as a row in a table in a BigQuery dataset.

In this tutorial, you complete the following tasks:

  • Set up a BigQuery connection
  • Set up a sub-integration
  • Set up the main integration
  • Test your integration

Before you begin

  • Ensure that you have access to Application Integration.
  • Do the following in your Google Cloud project:

    • Grant the following roles to the service account that you want to use to create the connection:
      • roles/bigquery.dataEditor
      • roles/bigquery.readSessionUser
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor
    • Enable the following services:
      • secretmanager.googleapis.com (Secret Manager API)
      • connectors.googleapis.com (Connectors API)

      If these services have not been enabled for your project before, you are prompted to enable them when you create the connection on the Create Connection page. You can also enable the services and grant the roles from the command line, as shown in the sketch after this list.
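
If you prefer the command line, the following is a minimal sketch of enabling the services and granting the roles with gcloud. The values project_id and service_account_email are placeholders for your project ID and the service account that you plan to use for the connection.

    # Sketch: enable the required APIs and grant the roles from Cloud Shell.
    # Replace project_id and service_account_email with your own values.
    export PROJECT_ID=project_id
    export SA_EMAIL=service_account_email

    gcloud services enable --project "${PROJECT_ID}" \
        secretmanager.googleapis.com \
        connectors.googleapis.com

    for role in roles/bigquery.dataEditor roles/bigquery.readSessionUser \
        roles/secretmanager.viewer roles/secretmanager.secretAccessor; do
      gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
          --member="serviceAccount:${SA_EMAIL}" \
          --role="${role}"
    done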

Set up a BigQuery connection

First, create the BigQuery dataset and table to use for this tutorial. After you create the dataset and table, create a BigQuery connection. You use this connection in an integration later in this tutorial.

Set up a BigQuery dataset and table

To set up the BigQuery dataset and table, do the following steps:

  1. In the Google Cloud console, select your Google Cloud project.
  2. To launch a Cloud Shell session from the Google Cloud console, click the Activate Cloud Shell icon in the Cloud console. This launches a session in the bottom pane of the Google Cloud console.
  3. To enable the BigQuery APIs, enter the following commands in your Cloud Shell terminal:
    export PROJECT_ID=project_id
    export REGION=region
    gcloud services enable --project "${PROJECT_ID}" \
        bigquery.googleapis.com \
        bigquerystorage.googleapis.com
    In these commands, replace the following:
    • project_id with the ID of your Google Cloud project.
    • region with the region that you want to use to create the BigQuery dataset.
  4. To create a BigQuery dataset with the name bq_tutorial, enter the following command in your Cloud Shell terminal:
          bq --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
  5. To create a BigQuery table with the name tutorial, enter the following command in your Cloud Shell terminal:
          bq --project_id ${PROJECT_ID} \
            query  \
            --nouse_legacy_sql \
          'create table bq_tutorial.tutorial (
          unique_key STRING NOT NULL,
          created_date STRING,
          closed_date STRING,
          agency STRING,
          agency_name STRING,
          complaint_type STRING,
          descriptor STRING,
          location_type STRING,
          incident_zip STRING,
          incident_address STRING,
          street_name STRING,
          cross_street_1 STRING,
          cross_street_2 STRING,
          intersection_street_1 STRING,
          intersection_street_2 STRING,
          address_type STRING,
          city STRING,
          landmark STRING,
          facility_type STRING,
          status STRING,
          due_date STRING,
          resolution_action_updated_date STRING,
          community_board STRING,
          borough STRING,
          x_coordinate_state_plane STRING,
          y_coordinate_state_plane STRING,
          park_facility_name STRING,
          park_borough STRING,
          school_name STRING,
          school_number STRING,
          school_region STRING,
          school_code STRING,
          school_phone_number STRING,
          school_address STRING,
          school_city STRING,
          school_state STRING,
          school_zip STRING,
          school_not_found STRING,
          school_or_citywide_complaint STRING,
          vehicle_type STRING,
          taxi_company_borough STRING,
          taxi_pick_up_location STRING,
          bridge_highway_name STRING,
          bridge_highway_direction STRING,
          bridge_highway_segment STRING,
          road_ramp STRING,
          garage_lot_name STRING,
          ferry_direction STRING,
          ferry_terminal_name STRING,
          latitude STRING,
          longitude STRING,
          location STRING
          ) '
      
  6. Verify that your BigQuery table is created.
    1. In the Cloud console page, click the Navigation menu.
    2. In the Analytics section, click BigQuery.
    3. Expand your project and confirm that the bq_tutorial dataset is listed.
    4. Expand the bq_tutorial dataset and confirm that the tutorial table is listed.
    5. Click the tutorial table to view the schema.
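
You can also verify the dataset and table from the Cloud Shell terminal. The following commands are a sketch that reuses the PROJECT_ID variable exported earlier:

    # List the datasets in the project and show the schema of the new table.
    bq --project_id "${PROJECT_ID}" ls
    bq --project_id "${PROJECT_ID}" show --schema --format=prettyjson bq_tutorial.tutorial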

Create a BigQuery connection

Next, you'll create a BigQuery connection. A BigQuery connection lets you insert, read, update and delete rows in a BigQuery table and use the resulting output in an integration. After creating the BigQuery connection, you'll use this connection in an integration later in this tutorial to add rows to the BigQuery table.

To create a BigQuery connection, complete the following steps:

  1. In the Cloud console page, select your Google Cloud project.
  2. Open the connections page.
  3. Click + CREATE NEW to open the Create Connection page.
  4. Configure the connection:
    1. In the Create Connection section, complete the following:
      • Connector: Select BigQuery from the drop down list of available Connectors.
      • Connector version: Select the latest Connector version from the drop down list of available versions.
      • In the Connection Name field, enter a name for the Connection instance. For this tutorial, enter connector-bq-tutorial.
      • Optionally, add a Description of the connection instance.
      • Service Account: Select a service account that has the required roles.
      • Project ID: Enter the ID of the Google Cloud project where the BigQuery data resides.
      • Dataset ID: Enter the ID of the BigQuery dataset that you want to use. For this tutorial, enter bq_tutorial.
      • Optionally, click + ADD LABEL to add a label in the form of a key/value pair.
      • Click Next.
    2. Location: Select a region from where the connection will run.

        For the list of all the supported regions, see Locations.

      • Click Next.
    3. Authentication: The BigQuery connection does not require authentication configuration. Click Next.
    4. Review: Review your connection's configuration details. In this section, the connection and authentication details of the new connection are displayed for your review.
  5. Click Create.
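
To confirm that the new connection is available, you can also list connections with the Connectors API from Cloud Shell. The following curl call is a sketch that assumes the standard connections list endpoint of connectors.googleapis.com; the Connections page in the console remains the primary place to check connection status.

    # Sketch: list Integration Connectors connections in the connection's region.
    # Assumes PROJECT_ID is still exported; replace region with the region you selected for the connection.
    curl -s \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      "https://connectors.googleapis.com/v1/projects/${PROJECT_ID}/locations/region/connections"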

Set up a sub-integration

In this tutorial, the sub-integration takes each record sent to it by the main integration and inserts it as a row in the tutorial table in the bq_tutorial dataset.

Create a sub-integration

To create the sub-integration, complete the following steps:

  1. In the Google Cloud console, go to the Application Integration page.

    Go to Application Integration

  2. In the navigation menu, click Integrations. The Integrations List page appears.
  3. Click Create integration.
  4. In the Create Integration dialog, do the following:
    • Enter a name, for example, enter Process-each-record
    • Optionally, enter a description. For example, enter API Trigger to process each record (sub-integration)
    • Select the region where you want to create your integration.
  5. Click Create to open the integration editor.

Add an API Trigger

To add an API Trigger to the integration, do the following:

  1. In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
  2. Drag the API Trigger element to the integration editor.

Add a Data Mapping task

To add a Data Mapping task in the integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Data Mapping element to the integration editor.

Configure the BigQuery connection

Now you are ready to use the BigQuery connection that you created earlier in the sub-integration. To configure the BigQuery connection in this integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Connectors element to the integration editor.
  3. Click the Connectors task element on the designer to view the task configuration pane.
  4. Click the edit icon on the right panel and update the Label to Insert row to BigQuery.
  5. Click Configure task.

    The Configure connector task dialog appears.

  6. In the Configure connector task dialog, do the following:
    1. Select the connection region where you created your BigQuery connection.
    2. Select the BigQuery connection that you want to use. For this tutorial, select connector-bq-tutorial.
    3. Once a connection is chosen, the Type column appears. Select Entities and then tutorial from the list of available entities.
    4. Once a type is chosen, the Operation column appears. Select Create.
    5. Click Done to complete the connection configuration and close the dialog.

Connect the integration elements

Next, add edge connections to connect the API Trigger to the Data Mapping task and the Data Mapping task to the Connectors task. An edge connection is a connection between any two elements in an integration. For more information on edges and edge conditions, see Edges.

To add the edge connections, complete the following steps:

  1. Click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the Data Mapping element.
  2. Click the Fork control point at the bottom of the Data Mapping element. Drag and drop the edge connection at the Join control point at the top of the Connectors element.

Configure the Data Mapping task

To configure the Data Mapping task, complete the following steps:

  1. In the integration editor, click the Data Mapping task to view the task configuration pane.
  2. Click Open Data Mapping Editor.
  3. In the Data Mapping Editor, click Add to add a new variable.
  4. In the Create Variable dialog, enter the following information:
    • Name: Enter record.
    • Data Type: Select JSON.
    • Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
                  {
                    "unique_key":"304271",
                    "created_date":"02/06/2007 12:00:00 AM",
                    "closed_date":"03/01/2007 12:00:00 AM",
                    "agency":"TLC",
                    "agency_name":"Taxi and Limousine Commission",
                    "complaint_type":"Taxi Complaint",
                    "descriptor":"Driver Complaint",
                    "location_type":"Street",
                    "incident_zip":"10001",
                    "incident_address":"",
                    "street_name":"",
                    "cross_street_1":"",
                    "cross_street_2":"",
                    "intersection_street_1":"WEST 29 STREET",
                    "intersection_street_2":"7 AVENUE",
                    "address_type":"INTERSECTION",
                    "city":"NEW YORK",
                    "landmark":"",
                    "facility_type":"N/A",
                    "status":"Closed",
                    "due_date":"02/28/2007 12:00:00 AM",
                    "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                    "community_board":"05 MANHATTAN",
                    "borough":"MANHATTAN",
                    "x_coordinate_state_plane":"986215",
                    "y_coordinate_state_plane":"211740",
                    "park_facility_name":"",
                    "park_borough":"MANHATTAN",
                    "school_name":"",
                    "school_number":"",
                    "school_region":"",
                    "school_code":"",
                    "school_phone_number":"",
                    "school_address":"",
                    "school_city":"",
                    "school_state":"",
                    "school_zip":"",
                    "school_not_found":"",
                    "school_or_citywide_complaint":"",
                    "vehicle_type":"",
                    "taxi_company_borough":"",
                    "taxi_pick_up_location":"Other",
                    "bridge_highway_name":"",
                    "bridge_highway_direction":"",
                    "road_ramp":"",
                    "bridge_highway_segment":"",
                    "garage_lot_name":"",
                    "ferry_direction":"",
                    "ferry_terminal_name":"",
                    "latitude":"40.74785373937869",
                    "longitude":"-73.99290823133913",
                    "location":"(40.74785373937869, -73.99290823133913)"
                  }
                
    • Click Create.
    • After the variable is created, complete the following steps in the Data Mapping Editor:
      • Drag the new record variable to the Input column.
      • Drag the connectorInputPayload variable to the Output column.
    • Close the Data Mapping Editor to return to the integration editor.

Publish the sub-integration

To publish the sub-integration, click Publish in the integration editor.

Set up the main integration

In this section, you set up the main integration, which uses a For Each Parallel task to process each record. The main integration then invokes the sub-integration once for each record.

Create the main integration

To create the main integration, complete the following steps:

  1. In the Google Cloud console, go to the Application Integration page.

    Go to Application Integration

  2. In the navigation menu, click Integrations. The Integrations List page appears.
  3. Click Create integration.
  4. In the Create Integration dialog, do the following:
    • Enter a name, for example, enter process-records
    • Optionally, enter a description. For example, enter API Trigger to process records (main integration)
    • Select the region where you want to create your integration.
  5. Click Create to open the integration editor.

Add an API Trigger

To add an API Trigger to the integration, do the following:

  1. In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
  2. Drag the API Trigger element to the integration editor.

Add a For Each Parallel task

To add a For Each Parallel task in the integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the For Each Parallel element to the integration editor.

Connect the integration elements

Next, add an edge connection to connect the API Trigger to the For Each Parallel task.

To add the edge connection, click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the For Each Parallel task element.

Configure the For Each Parallel task

To configure the For Each Parallel task, complete the following steps:

  1. In the integration editor, click the For Each Parallel task to view the task configuration pane.
  2. Under Array Selection > List to iterate, click Add new variable to add a new variable.
  3. In the Create Variable dialog, enter the following details:
    • Name: Enter records
    • Data Type: Select JSON
    • Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
                    [{
                      "unique_key":"304271",
                      "created_date":"02/06/2007 12:00:00 AM",
                      "closed_date":"03/01/2007 12:00:00 AM",
                      "agency":"TLC",
                      "agency_name":"Taxi and Limousine Commission",
                      "complaint_type":"Taxi Complaint",
                      "descriptor":"Driver Complaint",
                      "location_type":"Street",
                      "incident_zip":"10001",
                      "incident_address":"",
                      "street_name":"",
                      "cross_street_1":"",
                      "cross_street_2":"",
                      "intersection_street_1":"WEST 29 STREET",
                      "intersection_street_2":"7 AVENUE",
                      "address_type":"INTERSECTION",
                      "city":"NEW YORK",
                      "landmark":"",
                      "facility_type":"N/A",
                      "status":"Closed",
                      "due_date":"02/28/2007 12:00:00 AM",
                      "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                      "community_board":"05 MANHATTAN",
                      "borough":"MANHATTAN",
                      "x_coordinate_state_plane":"986215",
                      "y_coordinate_state_plane":"211740",
                      "park_facility_name":"",
                      "park_borough":"MANHATTAN",
                      "school_name":"",
                      "school_number":"",
                      "school_region":"",
                      "school_code":"",
                      "school_phone_number":"",
                      "school_address":"",
                      "school_city":"",
                      "school_state":"",
                      "school_zip":"",
                      "school_not_found":"",
                      "school_or_citywide_complaint":"",
                      "vehicle_type":"",
                      "taxi_company_borough":"",
                      "taxi_pick_up_location":"Other",
                      "bridge_highway_name":"",
                      "bridge_highway_direction":"",
                      "road_ramp":"",
                      "bridge_highway_segment":"",
                      "garage_lot_name":"",
                      "ferry_direction":"",
                      "ferry_terminal_name":"",
                      "latitude":"40.74785373937869",
                      "longitude":"-73.99290823133913",
                      "location":"(40.74785373937869, -73.99290823133913)"
                    }]
                  
  4. Click Create.
  5. In the Sub-integration Details section, enter the following information:
    • API Trigger ID: Select the API Trigger element in the sub-integration. For example, select Process-each-record_API_1.
    • Execution Strategy: Select ASYNC.
    • Select Run a single integration.
  6. In the On each execution section, for Where to map individual array elements, enter the name of the variable in the Data Mapping task in the sub-integration. In this case, enter record. Only variables of published sub-integrations are listed. If the variable isn't listed, refresh the page, because variables take some time to appear after the sub-integration is published.

Publish the main integration

To publish the main integration, click Publish in the integration editor.

Test your integration

To test your integration, complete the following steps:

  1. Download the sample data to your Cloud Shell:
    1. To launch a Cloud Shell session from the Google Cloud console, click the Activate Cloud Shell icon in the Cloud console. This launches a session in the bottom pane of the Google Cloud console.
    2. Enter the following command in your Cloud Shell terminal:
      wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
    3. To verify that the sample data is downloaded, enter the following command in your Cloud Shell terminal:
      ls -la bq-sample-dataset.json
      The downloaded file is listed in the Cloud Shell terminal.
  2. To select three entries at random from the sample dataset and store them in a way that you can pass them to the integration, enter the following commands in your Cloud Shell terminal:
    AUTH=$(gcloud auth print-access-token)
    export SAMPLE_DOCS=$(jq $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json | jq -Rs '.')

    # Build the request body for the integration execution.
    # The here-document delimiter (EOF) must start at the beginning of a line.
    generate_post_data()
    {
      cat <<EOF
    {
      "triggerId": "api_trigger/process-records_API_1",
      "inputParameters": {
        "records": {
          "jsonValue": $SAMPLE_DOCS
        }
      }
    }
EOF
    }
  3. To start the test, enter the following command in your Cloud Shell terminal:
    curl -X POST \
      https://integrations.googleapis.com/v1/projects/project_id/locations/region/integrations/process-records:execute \
      -H "Authorization: Bearer $AUTH" \
      -H "Content-Type: application/json" \
      -d "$(generate_post_data)"
    In this command, replace the following:
    • project_id with the ID of your Google Cloud project.
    • region with the region in which you created your integration.

    This command invokes the main integration and passes the entries from the sample dataset to it. The main integration then passes each entry to the sub-integration, which in turn adds the data as a row to the BigQuery table.
  4. To verify that your BigQuery table now contains these records, do the following steps:
    1. In the Cloud console page, click the Navigation menu.
    2. In the Analytics section, click BigQuery.
    3. Expand your project and click the bq_tutorial dataset.
    4. Expand the bq_tutorial dataset and click the tutorial table.
    5. Click the Table Explorer tab to view the inserted records. You can also query the table from Cloud Shell, as shown in the sketch after these steps.
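
If you prefer the command line, the following bq queries are a sketch that counts the inserted rows and samples a few of them from the bq_tutorial.tutorial table created earlier:

    # Count the rows inserted by the test run, then inspect a few of them.
    bq --project_id "${PROJECT_ID}" query --nouse_legacy_sql \
      'SELECT COUNT(*) AS row_count FROM bq_tutorial.tutorial'

    bq --project_id "${PROJECT_ID}" query --nouse_legacy_sql \
      'SELECT unique_key, complaint_type, city FROM bq_tutorial.tutorial LIMIT 5'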

What's next

Try building integrations with other connectors. For the list of all the supported connectors, see the Connector reference documentation.