Application Integration でサポートされているコネクタをご覧ください。
For Each Parallel タスクを使用して BigQuery にデータを挿入する
このチュートリアルでは、Application Integration とサブ統合を作成し、一連のレコードを処理します。レコードごとに、メイン統合が非同期的にサブ統合を呼び出します。サブ統合が各レコードのデータを取得し、BigQuery データセットのテーブルに行として挿入します。
このチュートリアルでは、次のタスクを行います。
準備
- Application Integration にアクセスできることを確認します。
-
Google Cloud プロジェクトで次の操作を行います。
- 接続の作成に使用するサービス アカウントに次のロールを付与します。
roles/bigquery.dataEditor
roles/bigquery.readSessionUser
roles/secretmanager.viewer
roles/secretmanager.secretAccessor
- 次のサービスを有効にします。
secretmanager.googleapis.com
(Secret Manager API)connectors.googleapis.com
(Connectors API)
以前にプロジェクトでこうしたサービスを有効にしていない場合は、[接続を作成] ページで接続を作成するときに有効にするよう求められます。
- 接続の作成に使用するサービス アカウントに次のロールを付与します。
BigQuery の接続を設定する
まず、このチュートリアルで使用する BigQuery データセットとテーブルを作成します。データセットとテーブルを作成したら、BigQuery 接続を作成します。この接続は、このチュートリアルの後半にインテグレーションで使用します。
BigQuery データセットとテーブルを設定する
BigQuery データセットとテーブルを設定するには、次の手順を行います。
- Cloud コンソール ページで、Google Cloud プロジェクトを選択します。
- Google Cloud コンソールから Cloud Shell セッションを起動するには、Cloud コンソール の [Cloud Shell をアクティブにする] アイコンをクリックします。これにより、Google Cloud コンソールの下部ペインでセッションが起動します。
-
BigQuery API を有効にするには、Cloud Shell ターミナルで次のコマンドを入力します。
export PROJECT_ID=project_id export REGION=region gcloud services enable --project "${PROJECT_ID}" \ bigquery.googleapis.com \ bigquerystorage.googleapis.com
このコマンドでは、次のように置き換えます。project_id
は、Google Cloud プロジェクトのプロジェクト ID に置き換えます。region
は、BigQuery データセットの作成に使用するリージョンに置き換えます。
bq_tutorial
という名前の BigQuery データセットを作成するには、Cloud Shell ターミナルで次のコマンドを入力します。bq --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
tutorial
という名前の BigQuery テーブルを作成するには、Cloud Shell ターミナルで次のコマンドを入力します。bq --project_id ${PROJECT_ID} \ query \ --nouse_legacy_sql \ 'create table bq_tutorial.tutorial ( unique_key STRING NOT NULL, created_date STRING, closed_date STRING, agency STRING, agency_name STRING, complaint_type STRING, descriptor STRING, location_type STRING, incident_zip STRING, incident_address STRING, street_name STRING, cross_street_1 STRING, cross_street_2 STRING, intersection_street_1 STRING, intersection_street_2 STRING, address_type STRING, city STRING, landmark STRING, facility_type STRING, status STRING, due_date STRING, resolution_action_updated_date STRING, community_board STRING, borough STRING, x_coordinate_state_plane STRING, y_coordinate_state_plane STRING, park_facility_name STRING, park_borough STRING, school_name STRING, school_number STRING, school_region STRING, school_code STRING, school_phone_number STRING, school_address STRING, school_city STRING, school_state STRING, school_zip STRING, school_not_found STRING, school_or_citywide_complaint STRING, vehicle_type STRING, taxi_company_borough STRING, taxi_pick_up_location STRING, bridge_highway_name STRING, bridge_highway_direction STRING, bridge_highway_segment STRING, road_ramp STRING, garage_lot_name STRING, ferry_direction STRING, ferry_terminal_name STRING, latitude STRING, longitude STRING, location STRING ) '
-
BigQuery テーブルが作成されていることを確認します。
- Cloud コンソール ページで、 ナビゲーション メニューをクリックします。
- [分析] で、[BigQuery] をクリックします。
-
プロジェクトを開き、
bq_tutorial
データセットが表示されていることを確認します。 -
bq_tutorial データセットを開き、
tutorial
テーブルが表示されていることを確認します。 - ドキュメント テーブルをクリックしてスキーマを表示します。
BigQuery Connection を作成する
次に、BigQuery 接続を作成します。BigQuery 接続を使用すると、BigQuery テーブルの行を挿入、読み取り、更新、削除し、生成された出力をインテグレーションで使用できます。BigQuery 接続を作成した後、このチュートリアルの後半のインテグレーションでこの接続を使用して、BigQuery テーブルに行を追加します。
BigQuery 接続を作成するには、次の手順を行います。
- Cloud コンソール ページで、Google Cloud プロジェクトを選択します。
- 接続ページを開きます。
- [+ 新規作成] をクリックして [接続を作成] ページを開きます。
- 接続を構成します。
- [接続を作成] セクションで、次の操作を行います。
- コネクタ: 使用可能なコネクタのプルダウン リストから [BigQuery] を選択します。
- コネクタのバージョン: 使用可能なバージョンのプルダウン リストから最新のコネクタのバージョンを選択します。
- [接続名] フィールドに、接続インスタンスの名前を入力します。このチュートリアルでは、「connector-bq-tutorial」と入力します。
- 必要に応じ、接続インスタンスの説明を追加します。
- サービス アカウント: 必要なロールを持つサービス アカウントを選択します。
- プロジェクト ID: BigQuery データが存在する Google Cloud プロジェクトの ID を入力します。
- データセット ID: 使用する BigQuery データセットの ID を入力します。このチュートリアルでは、「bq_tutorial」と入力します。
- 必要に応じ、[+ ラベルを追加] をクリックして Key-Value ペアの形式でラベルを追加します。
- [Next] をクリックします。
- ロケーション: 接続を実行するリージョンを選択します。コネクタをサポートしているリージョンは次のとおりです。
- [Next] をクリックします。
サポートされているすべてのリージョンのリストについては、ロケーションをご覧ください。
- 認証: BigQuery 接続には認証構成は不要です。[Next] をクリックします。
- 確認: 接続の構成の詳細を確認します。このセクションでは、新しい接続の接続と認証の詳細が表示されます。
- [接続を作成] セクションで、次の操作を行います。
- [Create] をクリックします。
サブインテグレーションを設定する
このチュートリアルでは、サブインテグレーションによって、メインインテグレーションによって送信された各レコードが bq_tutorial
データセットの tutorial
テーブルの行として挿入されます。
サブインテグレーションを作成する
サブ統合を作成するには、次の手順を行います。
- Google Cloud コンソールで [Application Integration] ページに移動します。
- ナビゲーション メニューで [統合] をクリックします。[統合リスト] ページが表示されます。
- [統合を作成] をクリックします。
- [Create integration] ダイアログで、次の操作を行います。
- 「Process-each-record」などの名前を入力します。
- 必要に応じて、説明を入力します。たとえば、「API Trigger to process each record (sub-integration)」と入力します。
- インテグレーションを作成するリージョンを選択します。
- [作成] をクリックして統合エディタを開きます。
API トリガーを追加する
インテグレーションに API トリガーを追加する手順は次のとおりです。
- 統合エディタで、[タスク / トリガーを追加] > [トリガー] を選択して、使用可能なトリガーのリストを表示します。
- [API Trigger] 要素を統合エディタにドラッグします。
データ マッピング タスクを追加する
インテグレーションでデータ マッピング タスクを追加するには、次の手順を行います。
- 統合エディタで [+ Add a task/trigger] > [Tasks] の順に選択して、利用可能なタスクのリストを表示します。
- データ マッピング要素を統合エディタにドラッグします。
BigQuery 接続を構成する
これで、前にサブインテグレーションで作成した BigQuery 接続を使用できます。このインテグレーションで BigQuery 接続を構成するには、次の手順を行います。
- 統合エディタで [+ Add a task/trigger] > [Tasks] の順に選択して、利用可能なタスクのリストを表示します。
- Connectors 要素を統合エディタにドラッグします。
- デザイナーで [Connectors] タスク要素をクリックして、タスク構成ペインを表示します。
- 右側のパネルの編集アイコンをクリックして、ラベルを [BigQuery に行を挿入] に更新します。
- [Configure task] をクリックします。
[Configure connector task] ダイアログが表示されます。
- [Configure connector task] ダイアログで、次の操作を行います。
- BigQuery 接続を作成した接続リージョンを選択します。
- 使用する BigQuery 接続を選択します。このチュートリアルでは、[connector-bq-tutorial] を選択します。
- 接続を選択すると、[Type] 列が表示されます。[エンティティ] を選択し、使用可能なエンティティのリストから [チュートリアル] を選択します。
- タイプが選択されると、[Operation] 列が表示されます。[Create] を選択します。
- [Done] をクリックして接続の構成を完了し、ダイアログを閉じます。
インテグレーションの要素を接続する
次に、エッジ接続を追加して、[API トリガー] を [データ マッピング] タスクに接続し、[データ マッピング] タスクを [コネクタ] タスクに接続します。エッジ接続は、インテグレーションの 2 つの要素間の接続です。エッジとエッジの条件の詳細については、エッジをご覧ください。
エッジ接続を追加する手順は次のとおりです。
- [API Trigger] 要素の下部にある [Fork] コントロール ポイントをクリックします。[Data Mapping] 要素の上部にある [Join] コントロール ポイントでエッジ接続をドラッグ&ドロップします。
- Data Mapping 要素の下部にある [Fork] コントロール ポイントをクリックします。Connectors 要素の上部にある [Join] コントロール ポイントでエッジ接続をドラッグ&ドロップします。
データ マッピング タスクを構成する
データ マッピング タスクを構成するには、次の手順を行います。
- 統合エディタで [Data Mapping] タスクをクリックし、タスク構成ペインを表示します。
- [Open Data Mapping Editor] をクリックします。
- データ マッピング エディタで、[追加] をクリックして新しい変数を追加します。
- [変数の作成] ダイアログで、次の情報を入力します。
- 名前: 「record」と入力します。
- データ型: JSON を選択します。
-
スキーマ: [Infer from a sample JSON payload] を選択します。次のサンプル JSON ペイロードを入力します。
{ "unique_key":"304271", "created_date":"02/06/2007 12:00:00 AM", "closed_date":"03/01/2007 12:00:00 AM", "agency":"TLC", "agency_name":"Taxi and Limousine Commission", "complaint_type":"Taxi Complaint", "descriptor":"Driver Complaint", "location_type":"Street", "incident_zip":"10001", "incident_address":"", "street_name":"", "cross_street_1":"", "cross_street_2":"", "intersection_street_1":"WEST 29 STREET", "intersection_street_2":"7 AVENUE", "address_type":"INTERSECTION", "city":"NEW YORK", "landmark":"", "facility_type":"N/A", "status":"Closed", "due_date":"02/28/2007 12:00:00 AM", "resolution_action_updated_date":"03/01/2007 12:00:00 AM", "community_board":"05 MANHATTAN", "borough":"MANHATTAN", "x_coordinate_state_plane":"986215", "y_coordinate_state_plane":"211740", "park_facility_name":"", "park_borough":"MANHATTAN", "school_name":"", "school_number":"", "school_region":"", "school_code":"", "school_phone_number":"", "school_address":"", "school_city":"", "school_state":"", "school_zip":"", "school_not_found":"", "school_or_citywide_complaint":"", "vehicle_type":"", "taxi_company_borough":"", "taxi_pick_up_location":"Other", "bridge_highway_name":"", "bridge_highway_direction":"", "road_ramp":"", "bridge_highway_segment":"", "garage_lot_name":"", "ferry_direction":"", "ferry_terminal_name":"", "latitude":"40.74785373937869", "longitude":"-73.99290823133913", "location":"(40.74785373937869, -73.99290823133913)" }
- [Create] をクリックします。
- 変数を作成したら、データ マッピング エディタで次の手順を実行します。
- 新しい record 変数を [Input] 列にドラッグします。
- connectorInputPayload 変数を [出力] 列にドラッグします。
- データ マッピング エディタを閉じて統合デザイナーに戻ります。
サブインテグレーションを公開する
サブインテグレーションを公開するには、統合エディタで [Publish] をクリックします。
メインインテグレーションを設定する
このセクションでは、メインインテグレーションを設定します。ここでは、For Each Parallel タスクを使用して、各レコードを処理します。このメインインテグレーションが、各レコードに対してサブインテグレーションを 1 回呼び出します。
メインインテグレーションを作成する
メイン統合を作成するには、次の手順を行います。
- Google Cloud コンソールで [Application Integration] ページに移動します。
- ナビゲーション メニューで [統合] をクリックします。[統合リスト] ページが表示されます。
- [統合を作成] をクリックします。
- [Create integration] ダイアログで、次の操作を行います。
- 名前を入力します(例: process-records)。
- 必要に応じて、説明を入力します。たとえば、「API Trigger to process records (main integration)」と入力します。
- インテグレーションを作成するリージョンを選択します。
- [作成] をクリックして統合エディタを開きます。
API トリガーを追加する
インテグレーションに API トリガーを追加する手順は次のとおりです。
- 統合エディタで、[タスク / トリガーを追加] > [トリガー] を選択して、使用可能なトリガーのリストを表示します。
- [API Trigger] 要素を統合エディタにドラッグします。
For Each Parallel タスクを追加する
インテグレーションで For Each Parallel タスクを追加するには、次の手順を行います。
- 統合エディタで [+ Add a task/trigger] > [Tasks] の順に選択して、利用可能なタスクのリストを表示します。
- [For Each Parallel] 要素を統合エディタにドラッグします。
インテグレーションの要素を接続する
次に、API トリガーを For Each Parallel タスクに接続するエッジ接続を追加します。
エッジ接続を追加するには、API Trigger 要素の下部にある [Fork] コントロール ポイントをクリックします。For Each Parallel タスク要素の上部にある [Join] コントロール ポイントでエッジ接続をドラッグ&ドロップします。
For-Each Parallel タスクを構成する
For Each Parallel タスクを構成する手順は次のとおりです。
- 統合エディタで [For Each Parallel] タスクをクリックして、タスク構成ペインを表示します。
- [配列の選択] > [反復処理のためのリスト] で、[新しい変数を追加] をクリックして新しい変数を追加します。
- [変数の作成] ダイアログで、次の情報を入力します。
- 名前: 「
records
」と入力します。 - データ型: JSON を選択します。
-
スキーマ: [Infer from a sample JSON payload] を選択します。次のサンプル JSON ペイロードを入力します。
[{ "unique_key":"304271", "created_date":"02/06/2007 12:00:00 AM", "closed_date":"03/01/2007 12:00:00 AM", "agency":"TLC", "agency_name":"Taxi and Limousine Commission", "complaint_type":"Taxi Complaint", "descriptor":"Driver Complaint", "location_type":"Street", "incident_zip":"10001", "incident_address":"", "street_name":"", "cross_street_1":"", "cross_street_2":"", "intersection_street_1":"WEST 29 STREET", "intersection_street_2":"7 AVENUE", "address_type":"INTERSECTION", "city":"NEW YORK", "landmark":"", "facility_type":"N/A", "status":"Closed", "due_date":"02/28/2007 12:00:00 AM", "resolution_action_updated_date":"03/01/2007 12:00:00 AM", "community_board":"05 MANHATTAN", "borough":"MANHATTAN", "x_coordinate_state_plane":"986215", "y_coordinate_state_plane":"211740", "park_facility_name":"", "park_borough":"MANHATTAN", "school_name":"", "school_number":"", "school_region":"", "school_code":"", "school_phone_number":"", "school_address":"", "school_city":"", "school_state":"", "school_zip":"", "school_not_found":"", "school_or_citywide_complaint":"", "vehicle_type":"", "taxi_company_borough":"", "taxi_pick_up_location":"Other", "bridge_highway_name":"", "bridge_highway_direction":"", "road_ramp":"", "bridge_highway_segment":"", "garage_lot_name":"", "ferry_direction":"", "ferry_terminal_name":"", "latitude":"40.74785373937869", "longitude":"-73.99290823133913", "location":"(40.74785373937869, -73.99290823133913)" }]
- 名前: 「
- [Create] をクリックします。
- [Sub-integration Details] セクションに、次の情報を入力します。
- API トリガー ID: サブインテグレーションで API トリガー要素を選択します。たとえば、[Process-each-record_API_1] を選択します。
- 実行戦略: [ASYNC] を選択します。
- [Run a single integration] を選択します。
- [On each execution] セクションの [Where to map individual array elements] で、サブ統合のデータ マッピング タスクの変数名を入力します。この場合は「record」と入力します。 サブ統合変数は、公開済みの統合に対してのみ表示されます。変数がリストにない場合は、サブ統合が公開されてから変数が表示されるまでに時間がかかるため、ページを更新してください。
メインインテグレーションを公開する
メイン統合を公開するには、統合エディタで [Publish] をクリックします。
インテグレーションをテストする
インテグレーションをテストするには、次の手順を実行します。
- サンプルデータを Cloud Shell にダウンロードします。
- Google Cloud コンソールから Cloud Shell セッションを起動するには、Cloud コンソール の [Cloud Shell をアクティブにする] アイコンをクリックします。これにより、Google Cloud コンソールの下部ペインでセッションが起動します。
- Cloud Shell ターミナルで次のコマンドを入力します。
wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
- サンプルデータがダウンロードされたことを確認するには、Cloud Shell ターミナルに次のコマンドを入力します。
ls -la bq-sample-dataset.json
ダウンロードしたファイルが Cloud Shell ターミナルに表示されます。
- サンプル データセットから 3 つのランダムなエントリを選択し、それらを統合に渡すように保存するには、Cloud Shell ターミナルで次のコマンドを入力します。
AUTH=$(gcloud auth print-access-token) export SAMPLE_DOCS=$(jq $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json | jq -Rs '.') generate_post_data() { cat <<EOF { "triggerId": "api_trigger/process-records_API_1", "inputParameters": { "records": { "jsonValue": $SAMPLE_DOCS } } } EOF }
- テストを開始するには、Cloud Shell ターミナルで次のコマンドを入力します。
curl -X POST \ https://integrations.googleapis.com/v1/projects/project_id/locations/region/integrations/process-records:execute \ -H "Authorization: Bearer $AUTH" \ -H "Content-Type: application/json" \ -d "$(generate_post_data)"
このコマンドでは、次のように置き換えます。project_id
は、Google Cloud プロジェクトの ID に置き換えます。region
は、統合を作成したリージョンに置き換えます。
- BigQuery テーブルにこれらのレコードが含まれていることを確認するには、次の手順を実行します。
- Cloud コンソール ページで、 ナビゲーション メニューをクリックします。
- [分析] で、[BigQuery] をクリックします。
-
プロジェクトを展開し、
bq_tutorial
データセットをクリックします。 -
bq_tutorial データセットを展開し、
tutorial
テーブルをクリックします。 - [Table Explorer] タブをクリックして、挿入されたレコードを表示します。
次のステップ
他のコネクタとのインテグレーションを構築してみる。サポートされているすべてのコネクタのリストについては、コネクタ リファレンスをご覧ください。