请参阅 Application Integration 支持的连接器。
使用“针对每个并行”任务将数据插入到 BigQuery 中
在本教程中,您将创建 Application Integration 和子集成,以处理一系列记录。对于每条记录,主集成会异步调用子集成,即获取每条记录的数据并将其作为行插入 BigQuery 数据集中的表。
在本教程中,您将完成以下任务:
准备工作
- 确保您有权访问 Application Integration。
-
在 Google Cloud 项目中执行以下操作:
- 将以下角色授予您要用于创建连接的服务账号:
roles/bigquery.dataEditor
roles/bigquery.readSessionUser
roles/secretmanager.viewer
roles/secretmanager.secretAccessor
- 启用以下服务:
secretmanager.googleapis.com
(Secret Manager API)connectors.googleapis.com
(Connectors API)
如果之前没有为您的项目启用这些服务,则在“创建连接”页面中创建连接时系统会提示您启用。
- 将以下角色授予您要用于创建连接的服务账号:
设置 BigQuery 连接
首先,创建要用于本教程的 BigQuery 数据集和表。创建数据集和表后,创建 BigQuery 连接。您将在本教程后面的集成中使用此连接。
设置 BigQuery 数据集和表
如需设置 BigQuery 数据集和表,请执行以下步骤:
- 在 Cloud 控制台中,选择您的 Google Cloud 项目。
- 如需从 Google Cloud 控制台启动 Cloud Shell 会话,请点击 Cloud 控制台中的 激活 Cloud Shell 图标。此操作将在 Google Cloud 控制台底部的窗格中启动会话。
-
如需启用 BigQuery API,请在 Cloud Shell 终端中输入以下命令:
export PROJECT_ID=project_id export REGION=region gcloud services enable --project "${PROJECT_ID}" \ bigquery.googleapis.com \ bigquerystorage.googleapis.com
在此命令中,进行如下替换:- 将
project_id
替换为 Google Cloud 项目的 ID。 - 将
region
替换为您用于创建 BigQuery 数据集的区域。
- 将
- 要创建名为
bq_tutorial
的 BigQuery 数据集,请在 Cloud Shell 终端中输入以下命令:bq --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
- 如需创建名为
tutorial
的 BigQuery 表,请在 Cloud Shell 终端中输入以下命令:bq --project_id ${PROJECT_ID} \ query \ --nouse_legacy_sql \ 'create table bq_tutorial.tutorial ( unique_key STRING NOT NULL, created_date STRING, closed_date STRING, agency STRING, agency_name STRING, complaint_type STRING, descriptor STRING, location_type STRING, incident_zip STRING, incident_address STRING, street_name STRING, cross_street_1 STRING, cross_street_2 STRING, intersection_street_1 STRING, intersection_street_2 STRING, address_type STRING, city STRING, landmark STRING, facility_type STRING, status STRING, due_date STRING, resolution_action_updated_date STRING, community_board STRING, borough STRING, x_coordinate_state_plane STRING, y_coordinate_state_plane STRING, park_facility_name STRING, park_borough STRING, school_name STRING, school_number STRING, school_region STRING, school_code STRING, school_phone_number STRING, school_address STRING, school_city STRING, school_state STRING, school_zip STRING, school_not_found STRING, school_or_citywide_complaint STRING, vehicle_type STRING, taxi_company_borough STRING, taxi_pick_up_location STRING, bridge_highway_name STRING, bridge_highway_direction STRING, bridge_highway_segment STRING, road_ramp STRING, garage_lot_name STRING, ferry_direction STRING, ferry_terminal_name STRING, latitude STRING, longitude STRING, location STRING ) '
-
验证您的 BigQuery 表是否已创建。
- 在 Cloud 控制台页面中,点击 导航菜单。
- 在分析部分,点击 BigQuery。
-
展开项目并确认
bq_tutorial
数据集已列出。 -
展开 bq_tutorial 数据集并确认
tutorial
表已列出。 - 点击文档表以查看架构。
创建 BigQuery 连接
接下来,您将创建 BigQuery 连接。借助 BigQuery 连接,您可以在 BigQuery 表中插入、读取、更新和删除行,并在集成中使用生成的输出。创建 BigQuery 连接后,您将在本教程的集成中使用此连接,将行添加到 BigQuery 表中。
如需创建 BigQuery 连接,请完成以下步骤:
- 在 Cloud 控制台中,选择您的 Google Cloud 项目。
- 打开连接页面。
- 点击 + 新建以打开创建连接页面。
- 配置连接:
- 在创建连接部分,完成以下操作:
- 连接器:从可用连接器的下拉列表中选择 BigQuery。
- 连接器版本:从可用版本的下拉列表中选择最新的连接器版本。
- 在连接名称字段中,输入连接实例的名称。 对于本教程,输入 connector-bq-tutorial。
- (可选)添加连接实例的说明。
- 服务账号:选择具有所需角色的服务账号。
- 项目 ID:输入 BigQuery 数据所在的 Google Cloud 项目的 ID。
- 数据集 ID:输入要使用的 BigQuery 数据集的 ID。对于本教程,请输入 bq_tutorial。
- (可选)点击 + 添加标签,以键值对的形式添加标签。
- 点击下一步。
- 位置:选择将从哪个区域运行连接。连接器支持的区域包括:
- 点击下一步。
如需查看所有受支持区域的列表,请参阅位置。
- 身份验证:BigQuery 连接无需进行身份验证配置。点击下一步。
- 检查:检查连接的配置详细信息。本部分会显示新连接的连接和身份验证详情,以供您检查。
- 在创建连接部分,完成以下操作:
- 点击创建。
设置子集成
在本教程中,子集成接受主集成向其发送的每条记录,并将其作为一行插入 bq_tutorial
数据集的 tutorial
表中。
创建子集成
如需创建子集成,请完成以下步骤:
- 在 Google Cloud 控制台中,前往 Application Integration 页面。
- 在导航菜单中,点击 Integrations(集成)。随即会出现集成列表页面。
- 点击创建集成。
- 在创建数据集对话框中,执行以下操作:
- 输入名称,例如,输入 Process-each-record
- (可选)输入说明。例如,输入 API 触发器以处理每条记录(子集成)
- 选择您要在其中创建集成的区域。
- 点击创建以打开集成编辑器。
添加 API 触发器
要向集成添加 API 触发器,请执行以下操作:
- 在集成设计器中,选择添加任务/触发器 > 触发器以显示可用触发器列表。
- 将 API 触发器元素拖动至集成编辑器。
添加数据映射任务
如需在集成中添加数据映射任务,请完成以下步骤:
- 在集成设计器中选择添加任务/触发器 > 任务以显示可用任务列表。
- 将数据映射元素拖动至集成编辑器。
配置 BigQuery 连接
现在,您可以使用先前在子集成中创建的 BigQuery 连接。如需在此集成中配置 BigQuery 连接,请完成以下步骤:
- 在集成设计器中选择添加任务/触发器 > 任务以显示可用任务列表。
- 将连接器元素拖动至集成编辑器。
- 点击设计器上的连接器任务元素,以查看任务配置窗格。
- 点击右侧面板上的修改图标,然后将标签更新为将行插入 BigQuery。
- 点击配置任务。
此时将显示配置连接器任务对话框。
- 在配置连接器任务对话框中,执行以下操作:
- 选择您在其中创建了 BigQuery 连接的连接区域。
- 选择要使用的 BigQuery 连接。对于本教程,选择 connector-bq-tutorial。
- 选择连接后,系统会显示类型列。选择实体,然后从可用实体列表中选择教程。
- 选择类型后,系统将显示操作列。选择创建。
- 点击完成以完成连接配置并关闭对话框。
连接集成元素
接下来,添加边缘连接,将 API 触发器连接到数据映射任务,将数据映射任务连接到连接器任务。边缘连接是集成中的任何两个元素之间的连接。如需详细了解边缘和边缘条件,请参阅边缘。
如需添加边缘连接,请完成以下步骤:
- 点击 API 触发器元素底部的分支控制点。将边缘连接拖放到数据映射元素顶部的联接控制点。
- 点击数据映射元素底部的分支控制点。在连接器元素顶部的联接控制点处拖放边缘连接。
配置数据映射任务
如需配置数据映射任务,请完成以下步骤:
- 在集成编辑器中,点击数据映射任务以查看任务配置窗格。
- 点击打开数据映射编辑器。
- 在数据映射编辑器中,点击添加以添加新变量。
- 在创建变量对话框中,输入以下信息:
- 名称:输入 record。
- 数据类型:选择 JSON。
-
架构:选择从示例 JSON 载荷推断。输入以下示例 JSON 载荷:
{ "unique_key":"304271", "created_date":"02/06/2007 12:00:00 AM", "closed_date":"03/01/2007 12:00:00 AM", "agency":"TLC", "agency_name":"Taxi and Limousine Commission", "complaint_type":"Taxi Complaint", "descriptor":"Driver Complaint", "location_type":"Street", "incident_zip":"10001", "incident_address":"", "street_name":"", "cross_street_1":"", "cross_street_2":"", "intersection_street_1":"WEST 29 STREET", "intersection_street_2":"7 AVENUE", "address_type":"INTERSECTION", "city":"NEW YORK", "landmark":"", "facility_type":"N/A", "status":"Closed", "due_date":"02/28/2007 12:00:00 AM", "resolution_action_updated_date":"03/01/2007 12:00:00 AM", "community_board":"05 MANHATTAN", "borough":"MANHATTAN", "x_coordinate_state_plane":"986215", "y_coordinate_state_plane":"211740", "park_facility_name":"", "park_borough":"MANHATTAN", "school_name":"", "school_number":"", "school_region":"", "school_code":"", "school_phone_number":"", "school_address":"", "school_city":"", "school_state":"", "school_zip":"", "school_not_found":"", "school_or_citywide_complaint":"", "vehicle_type":"", "taxi_company_borough":"", "taxi_pick_up_location":"Other", "bridge_highway_name":"", "bridge_highway_direction":"", "road_ramp":"", "bridge_highway_segment":"", "garage_lot_name":"", "ferry_direction":"", "ferry_terminal_name":"", "latitude":"40.74785373937869", "longitude":"-73.99290823133913", "location":"(40.74785373937869, -73.99290823133913)" }
- 点击创建。
- 创建变量后,在数据映射编辑器中,完成以下步骤:
- 将新的 record 变量拖动到输入列。
- 将 connectorInputPayload 变量拖动到输出列。
- 关闭数据映射编辑器以返回到集成编辑器。
发布子集成
如需发布子集成,请在集成编辑器中点击发布。
设置主集成
在本部分中,您将设置主集成,即使用针对每个并行任务处理每条记录。然后,主集成会为每条记录调用一次子集成。
创建主集成
要创建主集成,请完成以下步骤:
- 在 Google Cloud 控制台中,前往 Application Integration 页面。
- 在导航菜单中,点击 Integrations(集成)。随即会出现集成列表页面。
- 点击创建集成。
- 在创建数据集对话框中,执行以下操作:
- 输入名称,例如,输入 process-records。
- (可选)输入说明。例如,输入 API 触发器以处理记录(主集成)
- 选择您要在其中创建集成的区域。
- 点击创建以打开集成编辑器。
添加 API 触发器
要向集成添加 API 触发器,请执行以下操作:
- 在集成设计器中,选择添加任务/触发器 > 触发器以显示可用触发器列表。
- 将 API 触发器元素拖动至集成编辑器。
添加一个“针对每个并行”任务
如需在集成中添加“针对每个并行”任务,请完成以下步骤:
- 在集成设计器中选择添加任务/触发器 > 任务以显示可用任务列表。
- 将针对每个并行元素拖动到集成编辑器中。
连接集成元素
接下来,添加一个边缘连接,将 API 触发器连接到“针对每个并行”任务。
如需添加边缘连接,请点击 API 触发器元素底部的创建分支控制点。将 Edge 连接拖放到“针对每个并行”任务元素顶部的联接控制点。
配置“针对每个并行”任务
要配置“针对每个并行”任务,请完成以下步骤:
- 在集成编辑器中,点击针对每个并行任务,以查看任务配置窗格。
- 在数组选择 > 迭代列表下,点击添加新变量以添加新变量。
- 在创建变量对话框中,输入以下详细信息:
-
名称:输入
records
。 - 数据类型:选择 JSON。
-
架构:选择从示例 JSON 载荷推断。输入以下示例 JSON 载荷:
[{ "unique_key":"304271", "created_date":"02/06/2007 12:00:00 AM", "closed_date":"03/01/2007 12:00:00 AM", "agency":"TLC", "agency_name":"Taxi and Limousine Commission", "complaint_type":"Taxi Complaint", "descriptor":"Driver Complaint", "location_type":"Street", "incident_zip":"10001", "incident_address":"", "street_name":"", "cross_street_1":"", "cross_street_2":"", "intersection_street_1":"WEST 29 STREET", "intersection_street_2":"7 AVENUE", "address_type":"INTERSECTION", "city":"NEW YORK", "landmark":"", "facility_type":"N/A", "status":"Closed", "due_date":"02/28/2007 12:00:00 AM", "resolution_action_updated_date":"03/01/2007 12:00:00 AM", "community_board":"05 MANHATTAN", "borough":"MANHATTAN", "x_coordinate_state_plane":"986215", "y_coordinate_state_plane":"211740", "park_facility_name":"", "park_borough":"MANHATTAN", "school_name":"", "school_number":"", "school_region":"", "school_code":"", "school_phone_number":"", "school_address":"", "school_city":"", "school_state":"", "school_zip":"", "school_not_found":"", "school_or_citywide_complaint":"", "vehicle_type":"", "taxi_company_borough":"", "taxi_pick_up_location":"Other", "bridge_highway_name":"", "bridge_highway_direction":"", "road_ramp":"", "bridge_highway_segment":"", "garage_lot_name":"", "ferry_direction":"", "ferry_terminal_name":"", "latitude":"40.74785373937869", "longitude":"-73.99290823133913", "location":"(40.74785373937869, -73.99290823133913)" }]
-
名称:输入
- 点击创建。
- 在子集成详情部分中,输入以下信息:
- API 触发器 ID:在子集成中选择 API 触发器元素。例如,选择 Process-each-record_API_1。
- 执行策略:选择 ASYNC。
- 选择 Run a single integration。
- 在 On each execution 部分的 Where to map individual array elements 中,输入子集成的数据映射任务中的变量名称。在这种情况下,请输入记录。 系统仅会列出已发布集成的子集成变量。如果变量未列出,请刷新页面,因为在子集成发布后,变量需要一些时间才会显示。
发布主要集成
如需发布主要集成,请在集成编辑器中点击发布。
测试您的集成
如需测试集成,请完成以下步骤:
- 将示例数据下载到 Cloud Shell:
- 如需从 Google Cloud 控制台启动 Cloud Shell 会话,请点击 Cloud 控制台中的 激活 Cloud Shell 图标。此操作将在 Google Cloud 控制台底部的窗格中启动会话。
- 在 Cloud Shell 终端中输入以下命令:
wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
- 要验证是否已下载示例数据,请在 Cloud Shell 终端中输入以下命令:
ls -la bq-sample-dataset.json
下载的文件在 Cloud Shell 终端中列出。
- 如需从样本数据集内随机选择三个条目并将其存储为可传递给集成的方式,请在 Cloud Shell 终端中输入以下命令:
AUTH=$(gcloud auth print-access-token) export SAMPLE_DOCS=$(jq $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json | jq -Rs '.') generate_post_data() { cat <<EOF { "triggerId": "api_trigger/process-records_API_1", "inputParameters": { "records": { "jsonValue": $SAMPLE_DOCS } } } EOF }
- 如需开始测试,请在 Cloud Shell 终端中输入以下命令:
curl -X POST \ https://integrations.googleapis.com/v1/projects/project_id/locations/region/integrations/process-records:execute \ -H "Authorization: Bearer $AUTH" \ -H "Content-Type: application/json" \ -d "$(generate_post_data)"
在此命令中,进行如下替换:- 将
project_id
替换为您的 Google Cloud 项目的 ID。 - 将
region
替换为您创建集成的区域。
- 将
- 如需验证您的 BigQuery 表表现在是否包含这些记录,请执行以下步骤:
- 在 Cloud 控制台页面中,点击 导航菜单。
- 在分析部分,点击 BigQuery。
-
展开您的项目,然后点击
bq_tutorial
数据集。 -
展开 bq_tutorial 数据集并点击
tutorial
表。 - 点击 Table Explorer 标签页以查看插入的记录。
后续步骤
尝试构建与其他连接器的集成。如需查看所有支持的连接器列表,请参阅连接器参考文档。