Application Integration でサポートされているコネクタをご覧ください。

For Each Parallel タスクを使用して BigQuery にデータを挿入する

このチュートリアルでは、Application Integration とサブ統合を作成し、一連のレコードを処理します。レコードごとに、メイン統合が非同期的にサブ統合を呼び出します。サブ統合が各レコードのデータを取得し、BigQuery データセットのテーブルに行として挿入します。

このチュートリアルでは、次のタスクを行います。

準備

  • Application Integration にアクセスできることを確認します。
  • Google Cloud プロジェクトで次の操作を行います。

    • 接続の作成に使用するサービス アカウントに次のロールを付与します。
      • roles/bigquery.dataEditor
      • roles/bigquery.readSessionUser
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor
    • 次のサービスを有効にします。
      • secretmanager.googleapis.com(Secret Manager API)
      • connectors.googleapis.com(Connectors API)

      以前にプロジェクトでこうしたサービスを有効にしていない場合は、[接続を作成] ページで接続を作成するときに有効にするよう求められます。

BigQuery の接続を設定する

まず、このチュートリアルで使用する BigQuery データセットとテーブルを作成します。データセットとテーブルを作成したら、BigQuery 接続を作成します。この接続は、このチュートリアルの後半にインテグレーションで使用します。

BigQuery データセットとテーブルを設定する

BigQuery データセットとテーブルを設定するには、次の手順を行います。

  1. Cloud コンソール ページで、Google Cloud プロジェクトを選択します。
  2. Google Cloud コンソールから Cloud Shell セッションを起動するには、Cloud コンソールCloud Shell をアクティブにするアイコン [Cloud Shell をアクティブにする] アイコンをクリックします。これにより、Google Cloud コンソールの下部ペインでセッションが起動します。
  3. BigQuery API を有効にするには、Cloud Shell ターミナルで次のコマンドを入力します。
    export PROJECT_ID=project_id
    export REGION=region
    gcloud services enable --project "${PROJECT_ID}" \
        bigquery.googleapis.com \
        bigquerystorage.googleapis.com
    このコマンドでは、次のように置き換えます。
    • project_id は、Google Cloud プロジェクトのプロジェクト ID に置き換えます。
    • region は、BigQuery データセットの作成に使用するリージョンに置き換えます。
  4. bq_tutorial という名前の BigQuery データセットを作成するには、Cloud Shell ターミナルで次のコマンドを入力します。
          bq  --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
        
  5. tutorial という名前の BigQuery テーブルを作成するには、Cloud Shell ターミナルで次のコマンドを入力します。
          bq --project_id ${PROJECT_ID} \
            query  \
            --nouse_legacy_sql \
          'create table bq_tutorial.tutorial (
          unique_key STRING NOT NULL,
          created_date STRING,
          closed_date STRING,
          agency STRING,
          agency_name STRING,
          complaint_type STRING,
          descriptor STRING,
          location_type STRING,
          incident_zip STRING,
          incident_address STRING,
          street_name STRING,
          cross_street_1 STRING,
          cross_street_2 STRING,
          intersection_street_1 STRING,
          intersection_street_2 STRING,
          address_type STRING,
          city STRING,
          landmark STRING,
          facility_type STRING,
          status STRING,
          due_date STRING,
          resolution_action_updated_date STRING,
          community_board STRING,
          borough STRING,
          x_coordinate_state_plane STRING,
          y_coordinate_state_plane STRING,
          park_facility_name STRING,
          park_borough STRING,
          school_name STRING,
          school_number STRING,
          school_region STRING,
          school_code STRING,
          school_phone_number STRING,
          school_address STRING,
          school_city STRING,
          school_state STRING,
          school_zip STRING,
          school_not_found STRING,
          school_or_citywide_complaint STRING,
          vehicle_type STRING,
          taxi_company_borough STRING,
          taxi_pick_up_location STRING,
          bridge_highway_name STRING,
          bridge_highway_direction STRING,
          bridge_highway_segment STRING,
          road_ramp STRING,
          garage_lot_name STRING,
          ferry_direction STRING,
          ferry_terminal_name STRING,
          latitude STRING,
          longitude STRING,
          location STRING
          ) '
      
  6. Verify that your BigQuery table is created.
    1. In the Cloud console page, click the Navigation menu.
    2. In the Analytics section, click BigQuery.
    3. Expand your project and confirm that the bq_tutorial dataset is listed.
    4. Expand the bq_tutorial dataset and confirm that the tutorial table is listed.
    5. Click the documents table to view the schema.

Create a BigQuery connection

Next, you'll create a BigQuery connection. A BigQuery connection lets you insert, read, update and delete rows in a BigQuery table and use the resulting output in an integration. After creating the BigQuery connection, you'll use this connection in an integration later in this tutorial to add rows to the BigQuery table.

To create a BigQuery connection, complete the following steps:

  1. In the Cloud console page, select your Google Cloud project.
  2. Open the connections page.
  3. Click + CREATE NEW to open the Create Connection page.
  4. Configure the connection:
    1. In the Create Connection section, complete the following:
      • Connector: Select BigQuery from the drop down list of available Connectors.
      • Connector version: Select the latest Connector version from the drop down list of available versions.
      • In the Connection Name field, enter a name for the Connection instance. For this tutorial, enter connector-bq-tutorial.
      • Optionally, add a Description of the connection instance.
      • Service Account: Select a service account that has the required roles.
      • Project ID: Enter the ID of the Google Cloud project where the BigQuery data resides.
      • Dataset ID: Enter the ID of the BigQuery dataset that you want to use. For this tutorial, enter bq_tutorial.
      • Optionally, click + ADD LABEL to add a label in the form of a key/value pair.
      • Click Next.
    2. Location: Select a region from where the connection will run. Supported regions for connectors include:

        For the list of all the supported regions, see Locations.

      • Click Next.
    3. Authentication: The BigQuery connection does not require authentication configuration. Click Next.
    4. Review: Review your connection's configuration details. In this section, the connection and authentication details of the new connection are displayed for your review.
  5. Click Create.

Set up a sub-integration

In this tutorial, the sub-integration takes each record sent to it by the main integration and inserts it as a row in the tutorial table in the bq_tutorial dataset.

Create a sub-integration

To create the sub-integration, complete the following steps:

  1. In the Google Cloud console, go to the Application Integration page.

    Go to Application Integration

  2. In the navigation menu, click Integrations. The Integrations List page appears.
  3. Click Create integration.
  4. In the Create Integration dialog, do the following:
    • Enter a name, for example, enter Process-each-record
    • Optionally, enter a description. For example, enter API Trigger to process each record (sub-integration)
    • Select the region where you want to create your integration.
  5. Click Create to open the integration editor.

Add an API Trigger

To add an API Trigger to the integration, do the following:

  1. In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
  2. Drag the API Trigger element to the integration editor.

Add a Data Mapping task

To add a Data Mapping task in the integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Data Mapping element to the integration editor.

Configure the BigQuery connection

Now you are ready to use the BigQuery connection that you created earlier in the sub-integration. To configure the BigQuery connection in this integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Connectors element to the integration editor.
  3. Click the Connectors task element on the designer to view the task configuration pane.
  4. Click the edit icon on the right panel and update the Label to Insert row to BigQuery.
  5. Click Configure task.

    The Configure connector task dialog appears.

  6. In the Configure connector task dialog, do the following:
    1. Select the connection region where you created your BigQuery connection.
    2. Select the BigQuery connection that you want to use. For this tutorial, select connector-bq-tutorial.
    3. Once a connection is chosen, the Type column appears. Select Entities and then tutorial from the list of available entities.
    4. Once a type is chosen, the Operation column appears. Select Create.
    5. Click Done to complete the connection configuration and close the dialog.

Connect the integration elements

Next, add edge connections to connect the API Trigger to the Data Mapping task and the Data Mapping task to the Connectors task. An edge connection is a connection between any two elements in an integration. For more information on edges and edge conditions, see Edges.

To add the edge connections, complete the following steps:

  1. Click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the Data Mapping element.
  2. Click the Fork control point at the bottom of the Data Mapping element. Drag and drop the edge connection at the Join control point at the top of the Connectors element.

Configure the Data Mapping task

To configure the Data Mapping task, complete the following steps:

  1. In the integration editor, click the Data Mapping task to view the task configuration pane.
  2. Click Open Data Mapping Editor.
  3. In the Data Mapping Editor, click Add to add a new variable.
  4. In the Create Variable dialog, enter the following information:
    • Name: Enter record.
    • Data Type: Select JSON.
    • Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
                  {
                    "unique_key":"304271",
                    "created_date":"02/06/2007 12:00:00 AM",
                    "closed_date":"03/01/2007 12:00:00 AM",
                    "agency":"TLC",
                    "agency_name":"Taxi and Limousine Commission",
                    "complaint_type":"Taxi Complaint",
                    "descriptor":"Driver Complaint",
                    "location_type":"Street",
                    "incident_zip":"10001",
                    "incident_address":"",
                    "street_name":"",
                    "cross_street_1":"",
                    "cross_street_2":"",
                    "intersection_street_1":"WEST 29 STREET",
                    "intersection_street_2":"7 AVENUE",
                    "address_type":"INTERSECTION",
                    "city":"NEW YORK",
                    "landmark":"",
                    "facility_type":"N/A",
                    "status":"Closed",
                    "due_date":"02/28/2007 12:00:00 AM",
                    "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                    "community_board":"05 MANHATTAN",
                    "borough":"MANHATTAN",
                    "x_coordinate_state_plane":"986215",
                    "y_coordinate_state_plane":"211740",
                    "park_facility_name":"",
                    "park_borough":"MANHATTAN",
                    "school_name":"",
                    "school_number":"",
                    "school_region":"",
                    "school_code":"",
                    "school_phone_number":"",
                    "school_address":"",
                    "school_city":"",
                    "school_state":"",
                    "school_zip":"",
                    "school_not_found":"",
                    "school_or_citywide_complaint":"",
                    "vehicle_type":"",
                    "taxi_company_borough":"",
                    "taxi_pick_up_location":"Other",
                    "bridge_highway_name":"",
                    "bridge_highway_direction":"",
                    "road_ramp":"",
                    "bridge_highway_segment":"",
                    "garage_lot_name":"",
                    "ferry_direction":"",
                    "ferry_terminal_name":"",
                    "latitude":"40.74785373937869",
                    "longitude":"-73.99290823133913",
                    "location":"(40.74785373937869, -73.99290823133913)"
                  }
                
    • [作成] をクリックします。
    • 変数を作成したら、データ マッピング エディタで次の手順を実行します。
      • 新しい record 変数を [Input] 列にドラッグします。
      • connectorInputPayload 変数を [connectorInputPayload] 列にドラッグします。
    • データ マッピング エディタを閉じて統合デザイナーに戻ります。

サブインテグレーションを公開する

サブインテグレーションを公開するには、統合エディタで [Publish] をクリックします。

メインインテグレーションを設定する

このセクションでは、メインインテグレーションを設定します。ここでは、For Each Parallel タスクを使用して、各レコードを処理します。このメインインテグレーションが、各レコードに対してサブインテグレーションを 1 回呼び出します。

メインインテグレーションを作成する

メイン統合を作成するには、次の手順を行います。

  1. Google Cloud コンソールで [Application Integration] ページに移動します。

    Application Integration に移動

  2. ナビゲーション メニューで [統合] をクリックします。[統合リスト] ページが表示されます。
  3. [統合を作成] をクリックします。
  4. [Create integration] ダイアログで、次の操作を行います。
    • 名前を入力します(例: process-records)。
    • 必要に応じて、説明を入力します。たとえば、「API Trigger to process records (main integration)」と入力します。
    • インテグレーションを作成するリージョンを選択します。
  5. [作成] をクリックして統合エディタを開きます。

API トリガーを追加する

インテグレーションに API トリガーを追加する手順は次のとおりです。

  1. 統合エディタで、[タスク / トリガーを追加] > [トリガー] を選択して、使用可能なトリガーのリストを表示します。
  2. [API Trigger] 要素を統合エディタにドラッグします。

For Each Parallel タスクを追加する

インテグレーションで For Each Parallel タスクを追加するには、次の手順を行います。

  1. 統合エディタで [+ Add a task/trigger] > [Tasks] の順に選択して、利用可能なタスクのリストを表示します。
  2. [For Each Parallel] 要素を統合エディタにドラッグします。

インテグレーションの要素を接続する

次に、API トリガーを For Each Parallel タスクに接続するエッジ接続を追加します。

エッジ接続を追加するには、API Trigger 要素の下部にある [Fork] コントロール ポイントをクリックします。For Each Parallel タスク要素の上部にある [Join] コントロール ポイントでエッジ接続をドラッグ&ドロップします。

For-Each Parallel タスクを構成する

For Each Parallel タスクを構成する手順は次のとおりです。

  1. 統合エディタで [For Each Parallel] タスクをクリックして、タスク構成ペインを表示します。
  2. [配列の選択] > [反復処理のためのリスト] で、[新しい変数を追加] をクリックして新しい変数を追加します。
  3. [変数の作成] ダイアログで、次の情報を入力します。
    • 名前: 「records」と入力します。
    • データ型: JSON を選択します。
    • スキーマ: [Infer from a sample JSON payload] を選択します。次のサンプル JSON ペイロードを入力します。
                    [{
                      "unique_key":"304271",
                      "created_date":"02/06/2007 12:00:00 AM",
                      "closed_date":"03/01/2007 12:00:00 AM",
                      "agency":"TLC",
                      "agency_name":"Taxi and Limousine Commission",
                      "complaint_type":"Taxi Complaint",
                      "descriptor":"Driver Complaint",
                      "location_type":"Street",
                      "incident_zip":"10001",
                      "incident_address":"",
                      "street_name":"",
                      "cross_street_1":"",
                      "cross_street_2":"",
                      "intersection_street_1":"WEST 29 STREET",
                      "intersection_street_2":"7 AVENUE",
                      "address_type":"INTERSECTION",
                      "city":"NEW YORK",
                      "landmark":"",
                      "facility_type":"N/A",
                      "status":"Closed",
                      "due_date":"02/28/2007 12:00:00 AM",
                      "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                      "community_board":"05 MANHATTAN",
                      "borough":"MANHATTAN",
                      "x_coordinate_state_plane":"986215",
                      "y_coordinate_state_plane":"211740",
                      "park_facility_name":"",
                      "park_borough":"MANHATTAN",
                      "school_name":"",
                      "school_number":"",
                      "school_region":"",
                      "school_code":"",
                      "school_phone_number":"",
                      "school_address":"",
                      "school_city":"",
                      "school_state":"",
                      "school_zip":"",
                      "school_not_found":"",
                      "school_or_citywide_complaint":"",
                      "vehicle_type":"",
                      "taxi_company_borough":"",
                      "taxi_pick_up_location":"Other",
                      "bridge_highway_name":"",
                      "bridge_highway_direction":"",
                      "road_ramp":"",
                      "bridge_highway_segment":"",
                      "garage_lot_name":"",
                      "ferry_direction":"",
                      "ferry_terminal_name":"",
                      "latitude":"40.74785373937869",
                      "longitude":"-73.99290823133913",
                      "location":"(40.74785373937869, -73.99290823133913)"
                    }]
                  
  4. [作成] をクリックします。
  5. [Sub-integration Details] セクションに、次の情報を入力します。
    • API トリガー ID: サブインテグレーションで API トリガー要素を選択します。たとえば、[Process-each-record_API_1] を選択します。
    • 実行戦略: [ASYNC] を選択します。
    • [Run a single integration] を選択します。
  6. [On each execution] セクションの [Where to map individual array elements] で、サブ統合のデータ マッピング タスクの変数名を入力します。この場合は「record」と入力します。 サブインテグレーション変数は、公開されているインテグレーションでのみ一覧表示されます。変数がリストにない場合は、サブ統合が公開されてから変数が表示されるまでに時間がかかるため、ページを更新してください。

メインインテグレーションを公開する

メイン統合を公開するには、統合エディタで [Publish] をクリックします。

インテグレーションをテストする

インテグレーションをテストするには、次の手順を実行します。

  1. サンプルデータを Cloud Shell にダウンロードします。
    1. Google Cloud コンソールから Cloud Shell セッションを起動するには、Cloud コンソールCloud Shell をアクティブにするアイコン [Cloud Shell をアクティブにする] アイコンをクリックします。これにより、Google Cloud コンソールの下部ペインでセッションが起動します。
    2. Cloud Shell ターミナルで次のコマンドを入力します。
      wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
              
    3. サンプルデータがダウンロードされたことを確認するには、Cloud Shell ターミナルに次のコマンドを入力します。
      ls -la bq-sample-dataset.json
      ダウンロードしたファイルが Cloud Shell ターミナルに表示されます。
  2. サンプル データセットから 3 つのランダムなエントリを選択し、それらを統合に渡せる方法で保存するには、Cloud Shell ターミナルで次のコマンドを入力します。
    AUTH=$(gcloud auth print-access-token)
    export SAMPLE_DOCS=$(jq $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json | jq -Rs '.')
                
    generate_post_data()
      {
        cat <<EOF
          {
            "triggerId": "api_trigger/process-records_API_1",
            "inputParameters": 
              {
                "records": 
                  {
                    "jsonValue": $SAMPLE_DOCS
                  }
              }
          }
          EOF
      }
  3. テストを開始するには、Cloud Shell ターミナルで次のコマンドを入力します。
    curl -X POST \
      https://integrations.googleapis.com/v1/projects/project_id/locations/region/integrations/process-records:execute \
      -H "Authorization: Bearer $AUTH" \
      -H "Content-Type: application/json" \
      -d "$(generate_post_data)"
    このコマンドでは、次のように置き換えます。
    • project_id は、Google Cloud プロジェクトの ID に置き換えます。
    • region は、統合を作成したリージョンに置き換えます。
    このコマンドは、メイン統合を呼び出し、サンプル データセットからメイン統合にエントリを渡します。メイン統合では、各エントリがサブ統合に渡され、データが BigQuery テーブルの行として追加されます。
  4. BigQuery テーブルにこれらのレコードが含まれていることを確認するには、次の手順を実行します。
    1. Cloud コンソール ページで、 ナビゲーション メニューをクリックします。
    2. [分析] で、[BigQuery] をクリックします。
    3. プロジェクトを展開し、bq_tutorial データセットをクリックします。
    4. bq_tutorial データセットを展開し、tutorial テーブルをクリックします。
    5. [Table Explorer] タブをクリックして、挿入されたレコードを表示します。

次のステップ

他のコネクタとのインテグレーションを構築してみる。サポートされているすべてのコネクタのリストについては、コネクタ リファレンスをご覧ください。