Application Integration でサポートされているコネクタをご覧ください。
For Each Parallel タスクを使用して BigQuery にデータを挿入する
このチュートリアルでは、Application Integration とサブ統合を作成し、一連のレコードを処理します。レコードごとに、メイン統合が非同期的にサブ統合を呼び出します。サブ統合が各レコードのデータを取得し、BigQuery データセットのテーブルに行として挿入します。
このチュートリアルでは、次のタスクを行います。
準備
- Application Integration にアクセスできることを確認します。
-
Google Cloud プロジェクトで次の操作を行います。
- 接続の作成に使用するサービス アカウントに次のロールを付与します。
roles/bigquery.dataEditor
roles/bigquery.readSessionUser
roles/secretmanager.viewer
roles/secretmanager.secretAccessor
- 次のサービスを有効にします。
secretmanager.googleapis.com
(Secret Manager API)connectors.googleapis.com
(Connectors API)
以前にプロジェクトでこうしたサービスを有効にしていない場合は、[接続を作成] ページで接続を作成するときに有効にするよう求められます。
- 接続の作成に使用するサービス アカウントに次のロールを付与します。
BigQuery の接続を設定する
まず、このチュートリアルで使用する BigQuery データセットとテーブルを作成します。データセットとテーブルを作成したら、BigQuery 接続を作成します。この接続は、このチュートリアルの後半にインテグレーションで使用します。
BigQuery データセットとテーブルを設定する
BigQuery データセットとテーブルを設定するには、次の手順を行います。
- Cloud コンソール ページで、Google Cloud プロジェクトを選択します。
- Google Cloud コンソールから Cloud Shell セッションを起動するには、Cloud コンソール の [Cloud Shell をアクティブにする] アイコンをクリックします。これにより、Google Cloud コンソールの下部ペインでセッションが起動します。
-
BigQuery API を有効にするには、Cloud Shell ターミナルで次のコマンドを入力します。
このコマンドでは、次のように置き換えます。export PROJECT_ID=project_id export REGION=region gcloud services enable --project "${PROJECT_ID}" \ bigquery.googleapis.com \ bigquerystorage.googleapis.com
project_id
は、Google Cloud プロジェクトのプロジェクト ID に置き換えます。region
は、BigQuery データセットの作成に使用するリージョンに置き換えます。
bq_tutorial
という名前の BigQuery データセットを作成するには、Cloud Shell ターミナルで次のコマンドを入力します。bq --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
tutorial
という名前の BigQuery テーブルを作成するには、Cloud Shell ターミナルで次のコマンドを入力します。bq --project_id ${PROJECT_ID} \ query \ --nouse_legacy_sql \ 'create table bq_tutorial.tutorial ( unique_key STRING NOT NULL, created_date STRING, closed_date STRING, agency STRING, agency_name STRING, complaint_type STRING, descriptor STRING, location_type STRING, incident_zip STRING, incident_address STRING, street_name STRING, cross_street_1 STRING, cross_street_2 STRING, intersection_street_1 STRING, intersection_street_2 STRING, address_type STRING, city STRING, landmark STRING, facility_type STRING, status STRING, due_date STRING, resolution_action_updated_date STRING, community_board STRING, borough STRING, x_coordinate_state_plane STRING, y_coordinate_state_plane STRING, park_facility_name STRING, park_borough STRING, school_name STRING, school_number STRING, school_region STRING, school_code STRING, school_phone_number STRING, school_address STRING, school_city STRING, school_state STRING, school_zip STRING, school_not_found STRING, school_or_citywide_complaint STRING, vehicle_type STRING, taxi_company_borough STRING, taxi_pick_up_location STRING, bridge_highway_name STRING, bridge_highway_direction STRING, bridge_highway_segment STRING, road_ramp STRING, garage_lot_name STRING, ferry_direction STRING, ferry_terminal_name STRING, latitude STRING, longitude STRING, location STRING ) '
-
Verify that your BigQuery table is created.
- In the Cloud console page, click the Navigation menu.
- In the Analytics section, click BigQuery.
-
Expand your project and confirm that the
bq_tutorial
dataset is listed. -
Expand the bq_tutorial dataset and confirm that the
tutorial
table is listed. - Click the documents table to view the schema.
Create a BigQuery connection
Next, you'll create a BigQuery connection. A BigQuery connection lets you insert, read, update and delete rows in a BigQuery table and use the resulting output in an integration. After creating the BigQuery connection, you'll use this connection in an integration later in this tutorial to add rows to the BigQuery table.
To create a BigQuery connection, complete the following steps:
- In the Cloud console page, select your Google Cloud project.
- Open the connections page.
- Click + CREATE NEW to open the Create Connection page.
- Configure the connection:
- In the Create Connection section, complete the following:
- Connector: Select BigQuery from the drop down list of available Connectors.
- Connector version: Select the latest Connector version from the drop down list of available versions.
- In the Connection Name field, enter a name for the Connection instance. For this tutorial, enter connector-bq-tutorial.
- Optionally, add a Description of the connection instance.
- Service Account: Select a service account that has the required roles.
- Project ID: Enter the ID of the Google Cloud project where the BigQuery data resides.
- Dataset ID: Enter the ID of the BigQuery dataset that you want to use. For this tutorial, enter bq_tutorial.
- Optionally, click + ADD LABEL to add a label in the form of a key/value pair.
- Click Next.
- Location: Select a region from where the connection will run. Supported
regions for connectors include:
- Click Next.
For the list of all the supported regions, see Locations.
- Authentication: The BigQuery connection does not require authentication configuration. Click Next.
- Review: Review your connection's configuration details. In this section, the connection and authentication details of the new connection are displayed for your review.
- In the Create Connection section, complete the following:
- Click Create.
Set up a sub-integration
In this tutorial, the sub-integration takes each record sent to it by the main integration and inserts it as a row in the tutorial
table in the bq_tutorial
dataset.
Create a sub-integration
To create the sub-integration, complete the following steps:
- In the Google Cloud console, go to the Application Integration page.
- In the navigation menu, click Integrations. The Integrations List page appears.
- Click Create integration.
- In the Create Integration dialog, do the following:
- Enter a name, for example, enter Process-each-record
- Optionally, enter a description. For example, enter API Trigger to process each record (sub-integration)
- Select the region where you want to create your integration.
- Click Create to open the integration editor.
Add an API Trigger
To add an API Trigger to the integration, do the following:
- In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
- Drag the API Trigger element to the integration editor.
Add a Data Mapping task
To add a Data Mapping task in the integration, complete the following steps:
- Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
- Drag the Data Mapping element to the integration editor.
Configure the BigQuery connection
Now you are ready to use the BigQuery connection that you created earlier in the sub-integration. To configure the BigQuery connection in this integration, complete the following steps:
- Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
- Drag the Connectors element to the integration editor.
- Click the Connectors task element on the designer to view the task configuration pane.
- Click the edit icon on the right panel and update the Label to Insert row to BigQuery.
- Click Configure task.
The Configure connector task dialog appears.
- In the Configure connector task dialog, do the following:
- Select the connection region where you created your BigQuery connection.
- Select the BigQuery connection that you want to use. For this tutorial, select connector-bq-tutorial.
- Once a connection is chosen, the Type column appears. Select Entities and then tutorial from the list of available entities.
- Once a type is chosen, the Operation column appears. Select Create.
- Click Done to complete the connection configuration and close the dialog.
Connect the integration elements
Next, add edge connections to connect the API Trigger to the Data Mapping task and the Data Mapping task to the Connectors task. An edge connection is a connection between any two elements in an integration. For more information on edges and edge conditions, see Edges.
To add the edge connections, complete the following steps:
- Click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the Data Mapping element.
- Click the Fork control point at the bottom of the Data Mapping element. Drag and drop the edge connection at the Join control point at the top of the Connectors element.
Configure the Data Mapping task
To configure the Data Mapping task, complete the following steps:
- In the integration editor, click the Data Mapping task to view the task configuration pane.
- Click Open Data Mapping Editor.
- In the Data Mapping Editor, click Add to add a new variable.
- In the Create Variable dialog, enter the following information:
- Name: Enter record.
- Data Type: Select JSON.
-
Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
{ "unique_key":"304271", "created_date":"02/06/2007 12:00:00 AM", "closed_date":"03/01/2007 12:00:00 AM", "agency":"TLC", "agency_name":"Taxi and Limousine Commission", "complaint_type":"Taxi Complaint", "descriptor":"Driver Complaint", "location_type":"Street", "incident_zip":"10001", "incident_address":"", "street_name":"", "cross_street_1":"", "cross_street_2":"", "intersection_street_1":"WEST 29 STREET", "intersection_street_2":"7 AVENUE", "address_type":"INTERSECTION", "city":"NEW YORK", "landmark":"", "facility_type":"N/A", "status":"Closed", "due_date":"02/28/2007 12:00:00 AM", "resolution_action_updated_date":"03/01/2007 12:00:00 AM", "community_board":"05 MANHATTAN", "borough":"MANHATTAN", "x_coordinate_state_plane":"986215", "y_coordinate_state_plane":"211740", "park_facility_name":"", "park_borough":"MANHATTAN", "school_name":"", "school_number":"", "school_region":"", "school_code":"", "school_phone_number":"", "school_address":"", "school_city":"", "school_state":"", "school_zip":"", "school_not_found":"", "school_or_citywide_complaint":"", "vehicle_type":"", "taxi_company_borough":"", "taxi_pick_up_location":"Other", "bridge_highway_name":"", "bridge_highway_direction":"", "road_ramp":"", "bridge_highway_segment":"", "garage_lot_name":"", "ferry_direction":"", "ferry_terminal_name":"", "latitude":"40.74785373937869", "longitude":"-73.99290823133913", "location":"(40.74785373937869, -73.99290823133913)" }
- [作成] をクリックします。
- 変数を作成したら、データ マッピング エディタで次の手順を実行します。
- 新しい record 変数を [Input] 列にドラッグします。
- connectorInputPayload 変数を [connectorInputPayload] 列にドラッグします。
- データ マッピング エディタを閉じて統合デザイナーに戻ります。
サブインテグレーションを公開する
サブインテグレーションを公開するには、統合エディタで [Publish] をクリックします。
メインインテグレーションを設定する
このセクションでは、メインインテグレーションを設定します。ここでは、For Each Parallel タスクを使用して、各レコードを処理します。このメインインテグレーションが、各レコードに対してサブインテグレーションを 1 回呼び出します。
メインインテグレーションを作成する
メイン統合を作成するには、次の手順を行います。
- Google Cloud コンソールで [Application Integration] ページに移動します。
- ナビゲーション メニューで [統合] をクリックします。[統合リスト] ページが表示されます。
- [統合を作成] をクリックします。
- [Create integration] ダイアログで、次の操作を行います。
- 名前を入力します(例: process-records)。
- 必要に応じて、説明を入力します。たとえば、「API Trigger to process records (main integration)」と入力します。
- インテグレーションを作成するリージョンを選択します。
- [作成] をクリックして統合エディタを開きます。
API トリガーを追加する
インテグレーションに API トリガーを追加する手順は次のとおりです。
- 統合エディタで、[タスク / トリガーを追加] > [トリガー] を選択して、使用可能なトリガーのリストを表示します。
- [API Trigger] 要素を統合エディタにドラッグします。
For Each Parallel タスクを追加する
インテグレーションで For Each Parallel タスクを追加するには、次の手順を行います。
- 統合エディタで [+ Add a task/trigger] > [Tasks] の順に選択して、利用可能なタスクのリストを表示します。
- [For Each Parallel] 要素を統合エディタにドラッグします。
インテグレーションの要素を接続する
次に、API トリガーを For Each Parallel タスクに接続するエッジ接続を追加します。
エッジ接続を追加するには、API Trigger 要素の下部にある [Fork] コントロール ポイントをクリックします。For Each Parallel タスク要素の上部にある [Join] コントロール ポイントでエッジ接続をドラッグ&ドロップします。
For-Each Parallel タスクを構成する
For Each Parallel タスクを構成する手順は次のとおりです。
- 統合エディタで [For Each Parallel] タスクをクリックして、タスク構成ペインを表示します。
- [配列の選択] > [反復処理のためのリスト] で、[新しい変数を追加] をクリックして新しい変数を追加します。
- [変数の作成] ダイアログで、次の情報を入力します。
- 名前: 「
records
」と入力します。 - データ型: JSON を選択します。
-
スキーマ: [Infer from a sample JSON payload] を選択します。次のサンプル JSON ペイロードを入力します。
[{ "unique_key":"304271", "created_date":"02/06/2007 12:00:00 AM", "closed_date":"03/01/2007 12:00:00 AM", "agency":"TLC", "agency_name":"Taxi and Limousine Commission", "complaint_type":"Taxi Complaint", "descriptor":"Driver Complaint", "location_type":"Street", "incident_zip":"10001", "incident_address":"", "street_name":"", "cross_street_1":"", "cross_street_2":"", "intersection_street_1":"WEST 29 STREET", "intersection_street_2":"7 AVENUE", "address_type":"INTERSECTION", "city":"NEW YORK", "landmark":"", "facility_type":"N/A", "status":"Closed", "due_date":"02/28/2007 12:00:00 AM", "resolution_action_updated_date":"03/01/2007 12:00:00 AM", "community_board":"05 MANHATTAN", "borough":"MANHATTAN", "x_coordinate_state_plane":"986215", "y_coordinate_state_plane":"211740", "park_facility_name":"", "park_borough":"MANHATTAN", "school_name":"", "school_number":"", "school_region":"", "school_code":"", "school_phone_number":"", "school_address":"", "school_city":"", "school_state":"", "school_zip":"", "school_not_found":"", "school_or_citywide_complaint":"", "vehicle_type":"", "taxi_company_borough":"", "taxi_pick_up_location":"Other", "bridge_highway_name":"", "bridge_highway_direction":"", "road_ramp":"", "bridge_highway_segment":"", "garage_lot_name":"", "ferry_direction":"", "ferry_terminal_name":"", "latitude":"40.74785373937869", "longitude":"-73.99290823133913", "location":"(40.74785373937869, -73.99290823133913)" }]
- 名前: 「
- [作成] をクリックします。
- [Sub-integration Details] セクションに、次の情報を入力します。
- API トリガー ID: サブインテグレーションで API トリガー要素を選択します。たとえば、[Process-each-record_API_1] を選択します。
- 実行戦略: [ASYNC] を選択します。
- [Run a single integration] を選択します。
- [On each execution] セクションの [Where to map individual array elements] で、サブ統合のデータ マッピング タスクの変数名を入力します。この場合は「record」と入力します。 サブインテグレーション変数は、公開されているインテグレーションでのみ一覧表示されます。変数がリストにない場合は、サブ統合が公開されてから変数が表示されるまでに時間がかかるため、ページを更新してください。
メインインテグレーションを公開する
メイン統合を公開するには、統合エディタで [Publish] をクリックします。
インテグレーションをテストする
インテグレーションをテストするには、次の手順を実行します。
- サンプルデータを Cloud Shell にダウンロードします。
- Google Cloud コンソールから Cloud Shell セッションを起動するには、Cloud コンソール の [Cloud Shell をアクティブにする] アイコンをクリックします。これにより、Google Cloud コンソールの下部ペインでセッションが起動します。
- Cloud Shell ターミナルで次のコマンドを入力します。
wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
- サンプルデータがダウンロードされたことを確認するには、Cloud Shell ターミナルに次のコマンドを入力します。
ダウンロードしたファイルが Cloud Shell ターミナルに表示されます。ls -la bq-sample-dataset.json
- サンプル データセットから 3 つのランダムなエントリを選択し、それらを統合に渡せる方法で保存するには、Cloud Shell ターミナルで次のコマンドを入力します。
AUTH=$(gcloud auth print-access-token) export SAMPLE_DOCS=$(jq $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json | jq -Rs '.') generate_post_data() { cat <<EOF { "triggerId": "api_trigger/process-records_API_1", "inputParameters": { "records": { "jsonValue": $SAMPLE_DOCS } } } EOF }
- テストを開始するには、Cloud Shell ターミナルで次のコマンドを入力します。
このコマンドでは、次のように置き換えます。curl -X POST \ https://integrations.googleapis.com/v1/projects/project_id/locations/region/integrations/process-records:execute \ -H "Authorization: Bearer $AUTH" \ -H "Content-Type: application/json" \ -d "$(generate_post_data)"
project_id
は、Google Cloud プロジェクトの ID に置き換えます。region
は、統合を作成したリージョンに置き換えます。
- BigQuery テーブルにこれらのレコードが含まれていることを確認するには、次の手順を実行します。
- Cloud コンソール ページで、 ナビゲーション メニューをクリックします。
- [分析] で、[BigQuery] をクリックします。
-
プロジェクトを展開し、
bq_tutorial
データセットをクリックします。 -
bq_tutorial データセットを展開し、
tutorial
テーブルをクリックします。 - [Table Explorer] タブをクリックして、挿入されたレコードを表示します。
次のステップ
他のコネクタとのインテグレーションを構築してみる。サポートされているすべてのコネクタのリストについては、コネクタ リファレンスをご覧ください。