For Each 並列タスクを使用して BigQuery にデータを挿入する

このチュートリアルでは、Apigee Integration とサブインテグレーションを作成し、一連のレコードを処理します。レコードごとに、メインインテグレーションが非同期的にサブインテグレーションを呼び出します。サブインテグレーションが各レコードのデータを取得し、BigQuery データセットのテーブルに行として挿入します。

このチュートリアルでは、次のタスクを行います。

始める前に

  • Apigee Integration にアクセスできることを確認します。
  • Google Cloud プロジェクトで次の操作を行います。

    • 接続の作成に使用するサービス アカウントに次のロールを付与します。
      • roles/bigquery.dataEditor
      • roles/bigquery.readSessionUser
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor
    • 次のサービスを有効にします。
      • secretmanager.googleapis.com(Secret Manager API)
      • connectors.googleapis.com(Connectors API)

      以前にプロジェクトでこうしたサービスを有効にしていない場合は、[接続を作成] ページで接続を作成するときに有効にするよう求められます。

BigQuery の接続を設定する

まず、このチュートリアルで使用する BigQuery データセットとテーブルを作成します。データセットとテーブルを作成したら、BigQuery 接続を作成します。この接続は、このチュートリアルの後半にインテグレーションで使用します。

BigQuery データセットとテーブルを設定する

BigQuery データセットとテーブルを設定するには、次の手順を行います。

  1. Cloud コンソール ページで、Google Cloud プロジェクトを選択します。
  2. Google Cloud コンソールから Cloud Shell セッションを起動するには、Cloud コンソールCloud Shell をアクティブにするアイコン [Cloud Shell をアクティブにする] アイコンをクリックします。これにより、Google Cloud コンソールの下部ペインでセッションが起動します。
  3. BigQuery API を有効にするには、Cloud Shell ターミナルで次のコマンドを入力します。
    export PROJECT_ID=project_id
    export REGION=region
    gcloud services enable --project "${PROJECT_ID}" \
        bigquery.googleapis.com \
        bigquerystorage.googleapis.com
    このコマンドで、次のように置き換えます。
    • project_id は、Google Cloud プロジェクトのプロジェクト ID に置き換えます。
    • region は、BigQuery データセットの作成に使用するリージョンに置き換えます。
  4. bq_tutorial という名前の BigQuery データセットを作成するには、Cloud Shell ターミナルで次のコマンドを入力します。
          bq  --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
        
  5. tutorial という名前の BigQuery テーブルを作成するには、Cloud Shell ターミナルで次のコマンドを入力します。
          bq --project_id ${PROJECT_ID} \
            query  \
            --nouse_legacy_sql \
          'create table bq_tutorial.tutorial (
          unique_key STRING NOT NULL,
          created_date STRING,
          closed_date STRING,
          agency STRING,
          agency_name STRING,
          complaint_type STRING,
          descriptor STRING,
          location_type STRING,
          incident_zip STRING,
          incident_address STRING,
          street_name STRING,
          cross_street_1 STRING,
          cross_street_2 STRING,
          intersection_street_1 STRING,
          intersection_street_2 STRING,
          address_type STRING,
          city STRING,
          landmark STRING,
          facility_type STRING,
          status STRING,
          due_date STRING,
          resolution_action_updated_date STRING,
          community_board STRING,
          borough STRING,
          x_coordinate_state_plane STRING,
          y_coordinate_state_plane STRING,
          park_facility_name STRING,
          park_borough STRING,
          school_name STRING,
          school_number STRING,
          school_region STRING,
          school_code STRING,
          school_phone_number STRING,
          school_address STRING,
          school_city STRING,
          school_state STRING,
          school_zip STRING,
          school_not_found STRING,
          school_or_citywide_complaint STRING,
          vehicle_type STRING,
          taxi_company_borough STRING,
          taxi_pick_up_location STRING,
          bridge_highway_name STRING,
          bridge_highway_direction STRING,
          bridge_highway_segment STRING,
          road_ramp STRING,
          garage_lot_name STRING,
          ferry_direction STRING,
          ferry_terminal_name STRING,
          latitude STRING,
          longitude STRING,
          location STRING
          ) '
      
  6. Verify that your BigQuery table is created.
    1. In the Cloud console page, click the Navigation menu.
    2. In the Analytics section, click BigQuery.
    3. Expand your project and confirm that the bq_tutorial dataset is listed.
    4. Expand the bq_tutorial dataset and confirm that the tutorial table is listed.
    5. Click the documents table to view the schema.

Create a BigQuery connection

Next, you'll create a BigQuery connection. A BigQuery connection lets you insert, read, update and delete rows in a BigQuery table and use the resulting output in an integration. After creating the BigQuery connection, you'll use this connection in an integration later in this tutorial to add rows to the BigQuery table.

To create a BigQuery connection, complete the following steps:

  1. In the Cloud console page, select your Google Cloud project.
  2. Open the connections page.
  3. Click + CREATE NEW to open the Create Connection page.
  4. Configure the connection:
    1. In the Create Connection section, complete the following:
      • Connector: Select BigQuery from the drop down list of available Connectors.
      • Connector version: Select the latest Connector version from the drop down list of available versions.
      • In the Connection Name field, enter a name for the Connection instance. For this tutorial, enter connector-bq-tutorial.
      • Optionally, add a Description of the connection instance.
      • Service Account: Select a service account that has the required roles.
      • Project ID: Enter the ID of the Google Cloud project where the BigQuery data resides.
      • Dataset ID: Enter the ID of the BigQuery dataset that you want to use. For this tutorial, enter bq_tutorial.
      • Optionally, click + ADD LABEL to add a label in the form of a key/value pair.
      • Click Next.
    2. Location: Select a region from where the connection will run. Supported regions for connectors include:

        For the list of all the supported regions, see Locations.

      • Click Next.
    3. Authentication: The BigQuery connection does not require authentication configuration. Click Next.
    4. Review: Review your connection's configuration details. In this section, the connection and authentication details of the new connection are displayed for your review.
  5. Click Create.

Set up a sub-integration

In this tutorial, the sub-integration takes each record sent to it by the main integration and inserts it as a row in the tutorial table in the bq_tutorial dataset.

Create a sub-integration

To create the sub-integration, complete the following steps:

  1. In the Apigee UI, select your Apigee Organization.
  2. Click Develop > Integrations.
  3. Click Create integration.
  4. In the Create Integration dialog, do the following:
    • Enter a name, for example, enter Process-each-record
    • Optionally, enter a description. For example, enter API Trigger to process each record (sub-integration)
    • Select the region where you want to create your integration.
  5. Click Create to open the integration editor.

Add an API Trigger

To add an API Trigger to the integration, do the following:

  1. In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
  2. Drag the API Trigger element to the integration editor.

Add a Data Mapping task

To add a Data Mapping task in the integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Data Mapping element to the integration editor.

Configure the BigQuery connection

Now you are ready to use the BigQuery connection that you created earlier in the sub-integration. To configure the BigQuery connection in this integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Connectors element to the integration editor.
  3. Click the Connectors task element on the designer to view the task configuration pane.
  4. Click the edit icon on the right panel and update the Label to Insert row to BigQuery.
  5. Click Configure task.

    The Configure connector task dialog appears.

  6. In the Configure connector task dialog, do the following:
    1. Select the connection region where you created your BigQuery connection.
    2. Select the BigQuery connection that you want to use. For this tutorial, select connector-bq-tutorial.
    3. Once a connection is chosen, the Type column appears. Select Entities and then tutorial from the list of available entities.
    4. Once a type is chosen, the Operation column appears. Select Create.
    5. Click Done to complete the connection configuration and close the dialog.

Connect the integration elements

Next, add edge connections to connect the API Trigger to the Data Mapping task and the Data Mapping task to the Connectors task. An edge connection is a connection between any two elements in an integration. For more information on edges and edge conditions, see Edges.

To add the edge connections, complete the following steps:

  1. Click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the Data Mapping element.
  2. Click the Fork control point at the bottom of the Data Mapping element. Drag and drop the edge connection at the Join control point at the top of the Connectors element.

Configure the Data Mapping task

To configure the Data Mapping task, complete the following steps:

  1. In the integration editor, click the Data Mapping task to view the task configuration pane.
  2. Click Open Data Mapping Editor.
  3. In the Data Mapping Editor, click Add to add a new variable.
  4. In the Create Variable dialog, enter the following information:
    • Name: Enter record.
    • Data Type: Select JSON.
    • Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
                  {
                    "unique_key":"304271",
                    "created_date":"02/06/2007 12:00:00 AM",
                    "closed_date":"03/01/2007 12:00:00 AM",
                    "agency":"TLC",
                    "agency_name":"Taxi and Limousine Commission",
                    "complaint_type":"Taxi Complaint",
                    "descriptor":"Driver Complaint",
                    "location_type":"Street",
                    "incident_zip":"10001",
                    "incident_address":"",
                    "street_name":"",
                    "cross_street_1":"",
                    "cross_street_2":"",
                    "intersection_street_1":"WEST 29 STREET",
                    "intersection_street_2":"7 AVENUE",
                    "address_type":"INTERSECTION",
                    "city":"NEW YORK",
                    "landmark":"",
                    "facility_type":"N/A",
                    "status":"Closed",
                    "due_date":"02/28/2007 12:00:00 AM",
                    "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                    "community_board":"05 MANHATTAN",
                    "borough":"MANHATTAN",
                    "x_coordinate_state_plane":"986215",
                    "y_coordinate_state_plane":"211740",
                    "park_facility_name":"",
                    "park_borough":"MANHATTAN",
                    "school_name":"",
                    "school_number":"",
                    "school_region":"",
                    "school_code":"",
                    "school_phone_number":"",
                    "school_address":"",
                    "school_city":"",
                    "school_state":"",
                    "school_zip":"",
                    "school_not_found":"",
                    "school_or_citywide_complaint":"",
                    "vehicle_type":"",
                    "taxi_company_borough":"",
                    "taxi_pick_up_location":"Other",
                    "bridge_highway_name":"",
                    "bridge_highway_direction":"",
                    "road_ramp":"",
                    "bridge_highway_segment":"",
                    "garage_lot_name":"",
                    "ferry_direction":"",
                    "ferry_terminal_name":"",
                    "latitude":"40.74785373937869",
                    "longitude":"-73.99290823133913",
                    "location":"(40.74785373937869, -73.99290823133913)"
                  }
                
    • [Create] をクリックします。
    • 変数を作成したら、データ マッピング エディタで次の手順を実行します。
      • 新しい record 変数を [Input] 列にドラッグします。
      • connectorInputPayload 変数を [出力] 列にドラッグします。
    • データ マッピング エディタを閉じて統合エディタに戻ります。

サブインテグレーションを公開する

サブインテグレーションを公開するには、統合エディタで [Publish] をクリックします。

メインインテグレーションを設定する

このセクションでは、メインインテグレーションを設定します。ここでは、For Each Parallel タスクを使用して、各レコードを処理します。このメインインテグレーションが、各レコードに対してサブインテグレーションを 1 回呼び出します。

メインインテグレーションを作成する

メインインテグレーションを作成するには、次の手順を行います。

  1. Apigee UI で、Apigee 組織を選択します。
  2. [Develop] > [Integrations] の順にクリックします。
  3. [Create integration] をクリックします。
  4. [Create integration] ダイアログで、次の操作を行います。
    • 名前を入力します(例: process-records)。
    • 必要に応じて、説明を入力します。たとえば、「API Trigger to process records (main integration)」と入力します。
    • インテグレーションを作成するリージョンを選択します。
  5. [Create] をクリックして統合エディタを開きます。

API トリガーを追加する

インテグレーションに API トリガーを追加する手順は次のとおりです。

  1. 統合エディタで [Add a task/trigger] > [Triggers] の順に選択して、使用可能なトリガーのリストを表示します。
  2. [API Trigger] 要素を統合エディタにドラッグします。

For Each Parallel タスクを追加する

インテグレーションで For Each Parallel タスクを追加するには、次の手順を行います。

  1. 統合エディタで [+ Add a task/trigger] > [Tasks] の順に選択して、利用可能なタスクのリストを表示します。
  2. [For Each Parallel] 要素を統合エディタにドラッグします。

インテグレーションの要素を接続する

次に、API トリガーを For Each Parallel タスクに接続するエッジ接続を追加します。

エッジ接続を追加するには、API Trigger 要素の下部にある [Fork] コントロール ポイントをクリックします。For Each Parallel タスク要素の上部にある [Join] コントロール ポイントでエッジ接続をドラッグ&ドロップします。

For-Each Parallel タスクを構成する

For Each Parallel タスクを構成する手順は次のとおりです。

  1. 統合エディタで [For Each Parallel] タスクをクリックして、タスク構成ペインを表示します。
  2. [配列の選択] > [反復処理のためのリスト] で、[新しい変数を追加] をクリックして新しい変数を追加します。
  3. [変数の作成] ダイアログで、次の情報を入力します。
    • 名前: 「records」と入力します。
    • データ型: JSON を選択します。
    • スキーマ: [Infer from a sample JSON payload] を選択します。次のサンプル JSON ペイロードを入力します。
                    [{
                      "unique_key":"304271",
                      "created_date":"02/06/2007 12:00:00 AM",
                      "closed_date":"03/01/2007 12:00:00 AM",
                      "agency":"TLC",
                      "agency_name":"Taxi and Limousine Commission",
                      "complaint_type":"Taxi Complaint",
                      "descriptor":"Driver Complaint",
                      "location_type":"Street",
                      "incident_zip":"10001",
                      "incident_address":"",
                      "street_name":"",
                      "cross_street_1":"",
                      "cross_street_2":"",
                      "intersection_street_1":"WEST 29 STREET",
                      "intersection_street_2":"7 AVENUE",
                      "address_type":"INTERSECTION",
                      "city":"NEW YORK",
                      "landmark":"",
                      "facility_type":"N/A",
                      "status":"Closed",
                      "due_date":"02/28/2007 12:00:00 AM",
                      "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                      "community_board":"05 MANHATTAN",
                      "borough":"MANHATTAN",
                      "x_coordinate_state_plane":"986215",
                      "y_coordinate_state_plane":"211740",
                      "park_facility_name":"",
                      "park_borough":"MANHATTAN",
                      "school_name":"",
                      "school_number":"",
                      "school_region":"",
                      "school_code":"",
                      "school_phone_number":"",
                      "school_address":"",
                      "school_city":"",
                      "school_state":"",
                      "school_zip":"",
                      "school_not_found":"",
                      "school_or_citywide_complaint":"",
                      "vehicle_type":"",
                      "taxi_company_borough":"",
                      "taxi_pick_up_location":"Other",
                      "bridge_highway_name":"",
                      "bridge_highway_direction":"",
                      "road_ramp":"",
                      "bridge_highway_segment":"",
                      "garage_lot_name":"",
                      "ferry_direction":"",
                      "ferry_terminal_name":"",
                      "latitude":"40.74785373937869",
                      "longitude":"-73.99290823133913",
                      "location":"(40.74785373937869, -73.99290823133913)"
                    }]
                  
  4. [作成] をクリックします。
  5. [Sub-integration Details] セクションに、次の情報を入力します。
    • API トリガー ID: サブインテグレーションで API トリガー要素を選択します。たとえば、[Process-each-record_API_1] を選択します。
    • 実行戦略: [ASYNC] を選択します。
    • [Run a single integration] を選択します。
  6. [On each execution] セクションの [Where to map individual array elements] で、サブインテグレーションのデータ マッピング タスクの変数名を入力します。この場合は「record」と入力します。サブインテグレーション変数は、公開されているインテグレーションでのみ一覧表示されます。サブインテグレーションが公開されてから変数が表示されるまでに時間がかかるため、変数がリストにない場合はページを更新してください。

メインインテグレーションを公開する

メインインテグレーションを公開するには、統合エディタで [Publish] をクリックします。

Apigee API プロキシを作成する

インテグレーションをトリガーするには、インテグレーションをターゲットにした Apigee API プロキシを作成します。その方法は次のとおりです。

  1. Google Cloud プロジェクトでサービス アカウントを作成し、必要なロールである [Apigee Integration Invoker] を割り当てます。IAM ロールの割り当ての詳細については、IAM のロールと権限をご覧ください。
  2. Apigee UI にログインします。
  3. ナビゲーション バーで、[Develop] > [API Proxies] を選択します。
  4. [Create New] をクリックします。
  5. [Create Proxy] ウィザードで [Integration target] をクリックします。
  6. [Proxy details] ページで、次の情報を入力します。
    • 名前:process-records」と入力します。
    • ベースパス:/v1/process-records」と入力します。
    • インテグレーション リージョン: インテグレーションの作成に使用したリージョンを選択します。
    • インテグレーション ターゲット: 作成したメインインテグレーションを選択します。このチュートリアルでは、[process-records] を選択します。
    • トリガー: 必要な API トリガーを選択します。このチュートリアルでは、[process-records_API_1] を選択します。
    • エンドポイントの種類: [同期] を選択します。
  7. [次へ] をクリックし、[一般的なポリシー] ページでもう一度 [次へ] をクリックします。
  8. [Summary] ページで、[Create] をクリックします。
  9. プロキシが作成されたら、[Edit Proxy] をクリックします。
  10. [Develop] タブをクリックします。
  11. [Policies] の [Set Integration Request] ポリシーを次のポリシーに更新します。
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <SetIntegrationRequest continueOnError="false" enabled="true" name="set-integration-request">
      <DisplayName>Set Integration Request</DisplayName>
      <ProjectId>project_id</ProjectId>
      <IntegrationName>process-records</IntegrationName>
      <IntegrationRegion>region</IntegrationRegion>
      <ApiTrigger>api_trigger/process-records_API_1</ApiTrigger>
      <Parameters>
        <Parameter name="records" type="json" ref="request.content"/>
      </Parameters>
    </SetIntegrationRequest>
    次のように置き換えます。
    • project_id は、Google Cloud プロジェクトのプロジェクト ID に置き換えます。
    • region は、インテグレーションを作成したリージョンに置き換えます。
  12. 変更を保存します。
  13. [デプロイ] をクリックします。
  14. [Revision] で、更新されたリビジョンを選択します。
  15. [Environment] で、インテグレーションのデプロイに使用する環境を選択します。
  16. API プロキシをデプロイする際は、前に作成したサービス アカウントのメールアドレスを必要なロールとともに指定します。
  17. [デプロイ] をクリックします。

インテグレーションをテストする

インテグレーションをテストするには、次の手順を実行します。

  1. サンプルデータを Cloud Shell にダウンロードします。
    1. Google Cloud コンソールから Cloud Shell セッションを起動するには、Cloud コンソールCloud Shell をアクティブにするアイコン [Cloud Shell をアクティブにする] アイコンをクリックします。これにより、Google Cloud コンソールの下部ペインでセッションが起動します。
    2. Cloud Shell ターミナルで次のコマンドを入力します。
      wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
              
    3. サンプルデータがダウンロードされたことを確認するには、Cloud Shell ターミナルに次のコマンドを入力します。
      ls -la bq-sample-dataset.json
      ダウンロードしたファイルが Cloud Shell ターミナルに表示されます。
  2. Apigee プロキシでデバッグを有効にするには、次の操作を行います。
    1. Apigee UI に作成した API プロキシに戻ります。
    2. [Debug] タブをクリックします。
    3. [デバッグ セッションを開始] をクリックして、次の情報を入力します。
      1. デバッグ セッションを実行する環境を選択します。
      2. (省略可)[Filter] プルダウン リストから、作成するデバッグ セッションのすべてのトランザクションに適用するフィルタを選択します。デフォルトは None (All transactions) で、デバッグデータ内のすべてのトランザクションが含まれます。
      3. [Start] をクリックします。
  3. テストを開始するには、Cloud Shell ターミナルで次のコマンドを入力します。
    export APIGEE_DOMAIN=<your-Apigee-domain>
    export SAMPLE_DOCS=$(jq  $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json)
            
    curl -X POST https://$APIGEE_DOMAIN/v1/process-records \
      -H 'Content-Type: application/json' \
      -d "$SAMPLE_DOCS"
          
    このテストでは、サンプル データセットから 3 つのランダムなエントリを選択し、メイン インテグレーションに渡します。メイン インテグレーションでは、各エントリがサブインテグレーションに渡され、データが BigQuery テーブルの行として追加されます。
  4. BigQuery テーブルにこれらのレコードが含まれていることを確認するには、次の手順を実行します。
    1. Cloud コンソール ページで、 ナビゲーション メニューをクリックします。
    2. [分析] で、[BigQuery] をクリックします。
    3. プロジェクトを開き、bq_tutorial データセットをクリックします。
    4. bq_tutorial データセットを開き、tutorial テーブルをクリックします。
    5. [Table Explorer] タブをクリックして、挿入されたレコードを表示します。

次のステップ

他のコネクタとのインテグレーションを構築してみる。サポートされているすべてのコネクタのリストについては、コネクタ リファレンスをご覧ください。