For Each 병렬 태스크를 사용하여 BigQuery에 데이터 삽입

이 튜토리얼에서는 Apigee Integration 및 하위 통합을 만들어서 일련의 레코드를 처리합니다. 각 레코드에서 기본 통합은 하위 통합을 비동기적으로 호출하여, 각 레코드에 대해 데이터를 가져오고 이를 BigQuery 데이터 세트의 테이블에 행으로 삽입합니다.

이 튜토리얼에서는 다음 태스크를 완료합니다.

시작하기 전에

  • Apigee Integration에 대한 액세스 권한이 있는지 확인합니다.
  • Google Cloud 프로젝트에서 다음을 수행합니다.

    • 연결을 만드는 데 사용할 서비스 계정에 다음 역할을 부여합니다.
      • roles/bigquery.dataEditor
      • roles/bigquery.readSessionUser
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor
    • 다음 서비스를 사용 설정합니다.
      • secretmanager.googleapis.com(Secret Manager API)
      • connectors.googleapis.com(Connectors API)

      이러한 서비스가 이전에 프로젝트에 대해 사용 설정되지 않았으면 연결 만들기 페이지에서 연결을 만들 때 서비스를 사용 설정하라는 메시지가 표시됩니다.

BigQuery 연결 설정

먼저 이 튜토리얼에서 사용할 BigQuery 데이터 세트 및 테이블을 만듭니다. 데이터 세트와 테이블을 만든 후 BigQuery 연결을 만듭니다. 이 튜토리얼의 뒷 부분에서 통합에 이 연결을 사용합니다.

BigQuery 데이터 세트 및 테이블 설정

BigQuery 데이터 세트 및 테이블을 설정하려면 다음 단계를 수행합니다.

  1. Cloud 콘솔 페이지에서 Google Cloud 프로젝트를 선택합니다.
  2. Google Cloud 콘솔에서 Cloud Shell 세션을 실행하려면 Cloud 콘솔에서 Cloud Shell 활성화 아이콘 Cloud Shell 활성화 아이콘을 클릭합니다. 그러면 Google Cloud 콘솔 하단 창에서 세션이 시작됩니다.
  3. BigQuery API를 사용 설정하려면 Cloud Shell 터미널에 다음 명령어를 입력합니다.
    export PROJECT_ID=project_id
    export REGION=region
    gcloud services enable --project "${PROJECT_ID}" \
        bigquery.googleapis.com \
        bigquerystorage.googleapis.com
    이 명령어에서 다음을 바꿉니다.
    • project_id를 Google Cloud 프로젝트의 프로젝트 ID로 바꿉니다.
    • region을 BigQuery 데이터 세트를 만드는 데 사용할 리전으로 바꿉니다.
  4. 이름이 bq_tutorial인 BigQuery 데이터 세트를 만들기 위해 Cloud Shell 터미널에서 다음 명령어를 입력합니다.
          bq  --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
        
  5. 이름이 tutorial인 BigQuery 테이블을 만들기 위해 Cloud Shell 터미널에서 다음 명령어를 입력합니다.
          bq --project_id ${PROJECT_ID} \
            query  \
            --nouse_legacy_sql \
          'create table bq_tutorial.tutorial (
          unique_key STRING NOT NULL,
          created_date STRING,
          closed_date STRING,
          agency STRING,
          agency_name STRING,
          complaint_type STRING,
          descriptor STRING,
          location_type STRING,
          incident_zip STRING,
          incident_address STRING,
          street_name STRING,
          cross_street_1 STRING,
          cross_street_2 STRING,
          intersection_street_1 STRING,
          intersection_street_2 STRING,
          address_type STRING,
          city STRING,
          landmark STRING,
          facility_type STRING,
          status STRING,
          due_date STRING,
          resolution_action_updated_date STRING,
          community_board STRING,
          borough STRING,
          x_coordinate_state_plane STRING,
          y_coordinate_state_plane STRING,
          park_facility_name STRING,
          park_borough STRING,
          school_name STRING,
          school_number STRING,
          school_region STRING,
          school_code STRING,
          school_phone_number STRING,
          school_address STRING,
          school_city STRING,
          school_state STRING,
          school_zip STRING,
          school_not_found STRING,
          school_or_citywide_complaint STRING,
          vehicle_type STRING,
          taxi_company_borough STRING,
          taxi_pick_up_location STRING,
          bridge_highway_name STRING,
          bridge_highway_direction STRING,
          bridge_highway_segment STRING,
          road_ramp STRING,
          garage_lot_name STRING,
          ferry_direction STRING,
          ferry_terminal_name STRING,
          latitude STRING,
          longitude STRING,
          location STRING
          ) '
      
  6. Verify that your BigQuery table is created.
    1. In the Cloud console page, click the Navigation menu.
    2. In the Analytics section, click BigQuery.
    3. Expand your project and confirm that the bq_tutorial dataset is listed.
    4. Expand the bq_tutorial dataset and confirm that the tutorial table is listed.
    5. Click the documents table to view the schema.

Create a BigQuery connection

Next, you'll create a BigQuery connection. A BigQuery connection lets you insert, read, update and delete rows in a BigQuery table and use the resulting output in an integration. After creating the BigQuery connection, you'll use this connection in an integration later in this tutorial to add rows to the BigQuery table.

To create a BigQuery connection, complete the following steps:

  1. In the Cloud console page, select your Google Cloud project.
  2. Open the connections page.
  3. Click + CREATE NEW to open the Create Connection page.
  4. Configure the connection:
    1. In the Create Connection section, complete the following:
      • Connector: Select BigQuery from the drop down list of available Connectors.
      • Connector version: Select the latest Connector version from the drop down list of available versions.
      • In the Connection Name field, enter a name for the Connection instance. For this tutorial, enter connector-bq-tutorial.
      • Optionally, add a Description of the connection instance.
      • Service Account: Select a service account that has the required roles.
      • Project ID: Enter the ID of the Google Cloud project where the BigQuery data resides.
      • Dataset ID: Enter the ID of the BigQuery dataset that you want to use. For this tutorial, enter bq_tutorial.
      • Optionally, click + ADD LABEL to add a label in the form of a key/value pair.
      • Click Next.
    2. Location: Select a region from where the connection will run. Supported regions for connectors include:

        For the list of all the supported regions, see Locations.

      • Click Next.
    3. Authentication: The BigQuery connection does not require authentication configuration. Click Next.
    4. Review: Review your connection's configuration details. In this section, the connection and authentication details of the new connection are displayed for your review.
  5. Click Create.

Set up a sub-integration

In this tutorial, the sub-integration takes each record sent to it by the main integration and inserts it as a row in the tutorial table in the bq_tutorial dataset.

Create a sub-integration

To create the sub-integration, complete the following steps:

  1. In the Apigee UI, select your Apigee Organization.
  2. Click Develop > Integrations.
  3. Click Create integration.
  4. In the Create Integration dialog, do the following:
    • Enter a name, for example, enter Process-each-record
    • Optionally, enter a description. For example, enter API Trigger to process each record (sub-integration)
    • Select the region where you want to create your integration.
  5. Click Create to open the integration editor.

Add an API Trigger

To add an API Trigger to the integration, do the following:

  1. In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
  2. Drag the API Trigger element to the integration editor.

Add a Data Mapping task

To add a Data Mapping task in the integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Data Mapping element to the integration editor.

Configure the BigQuery connection

Now you are ready to use the BigQuery connection that you created earlier in the sub-integration. To configure the BigQuery connection in this integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Connectors element to the integration editor.
  3. Click the Connectors task element on the designer to view the task configuration pane.
  4. Click the edit icon on the right panel and update the Label to Insert row to BigQuery.
  5. Click Configure task.

    The Configure connector task dialog appears.

  6. In the Configure connector task dialog, do the following:
    1. Select the connection region where you created your BigQuery connection.
    2. Select the BigQuery connection that you want to use. For this tutorial, select connector-bq-tutorial.
    3. Once a connection is chosen, the Type column appears. Select Entities and then tutorial from the list of available entities.
    4. Once a type is chosen, the Operation column appears. Select Create.
    5. Click Done to complete the connection configuration and close the dialog.

Connect the integration elements

Next, add edge connections to connect the API Trigger to the Data Mapping task and the Data Mapping task to the Connectors task. An edge connection is a connection between any two elements in an integration. For more information on edges and edge conditions, see Edges.

To add the edge connections, complete the following steps:

  1. Click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the Data Mapping element.
  2. Click the Fork control point at the bottom of the Data Mapping element. Drag and drop the edge connection at the Join control point at the top of the Connectors element.

Configure the Data Mapping task

To configure the Data Mapping task, complete the following steps:

  1. In the integration editor, click the Data Mapping task to view the task configuration pane.
  2. Click Open Data Mapping Editor.
  3. In the Data Mapping Editor, click Add to add a new variable.
  4. In the Create Variable dialog, enter the following information:
    • Name: Enter record.
    • Data Type: Select JSON.
    • Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
                  {
                    "unique_key":"304271",
                    "created_date":"02/06/2007 12:00:00 AM",
                    "closed_date":"03/01/2007 12:00:00 AM",
                    "agency":"TLC",
                    "agency_name":"Taxi and Limousine Commission",
                    "complaint_type":"Taxi Complaint",
                    "descriptor":"Driver Complaint",
                    "location_type":"Street",
                    "incident_zip":"10001",
                    "incident_address":"",
                    "street_name":"",
                    "cross_street_1":"",
                    "cross_street_2":"",
                    "intersection_street_1":"WEST 29 STREET",
                    "intersection_street_2":"7 AVENUE",
                    "address_type":"INTERSECTION",
                    "city":"NEW YORK",
                    "landmark":"",
                    "facility_type":"N/A",
                    "status":"Closed",
                    "due_date":"02/28/2007 12:00:00 AM",
                    "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                    "community_board":"05 MANHATTAN",
                    "borough":"MANHATTAN",
                    "x_coordinate_state_plane":"986215",
                    "y_coordinate_state_plane":"211740",
                    "park_facility_name":"",
                    "park_borough":"MANHATTAN",
                    "school_name":"",
                    "school_number":"",
                    "school_region":"",
                    "school_code":"",
                    "school_phone_number":"",
                    "school_address":"",
                    "school_city":"",
                    "school_state":"",
                    "school_zip":"",
                    "school_not_found":"",
                    "school_or_citywide_complaint":"",
                    "vehicle_type":"",
                    "taxi_company_borough":"",
                    "taxi_pick_up_location":"Other",
                    "bridge_highway_name":"",
                    "bridge_highway_direction":"",
                    "road_ramp":"",
                    "bridge_highway_segment":"",
                    "garage_lot_name":"",
                    "ferry_direction":"",
                    "ferry_terminal_name":"",
                    "latitude":"40.74785373937869",
                    "longitude":"-73.99290823133913",
                    "location":"(40.74785373937869, -73.99290823133913)"
                  }
                
    • 만들기를 클릭합니다.
    • 변수가 생성되면 데이터 매핑 편집기에서 다음 단계를 완료합니다.
      • 레코드 변수를 입력 열로 드래그합니다.
      • connectorInputPayload 변수를 출력 열로 드래그합니다.
    • 데이터 매핑 편집기를 닫고 통합 편집기로 돌아갑니다.

하위 통합 게시

하위 통합을 게시하려면 통합 편집기에서 게시를 클릭합니다.

기본 통합 설정

이 섹션에서는 각 레코드 처리를 위해 For Each 병렬 태스크를 사용하는 기본 통합을 설정합니다. 그런 다음 이 기본 통합이 각 레코드에 대해 한 번씩 하위 통합을 호출합니다.

기본 통합 만들기

기본 통합을 만들려면 다음 단계를 완료합니다.

  1. Apigee UI에서 Apigee 조직을 선택합니다.
  2. 개발 > 통합을 클릭합니다.
  3. 통합 만들기를 클릭합니다.
  4. 통합 만들기 대화상자에서 다음을 수행합니다.
    • 이름을 입력합니다. 예를 들어 process-records를 입력합니다.
    • 선택적으로 설명을 입력합니다. 예를 들어 레코드 처리를 위한 API 트리거(기본 통합)를 입력합니다.
    • 통합을 만들려는 리전을 선택합니다.
  5. 만들기를 클릭하여 통합 편집기를 엽니다.

API 트리거 추가

통합에 API 트리거를 추가하려면 다음을 수행합니다.

  1. 통합 편집기에서 태스크/트리거 추가 > 트리거를 선택하여 사용 가능한 트리거 목록을 표시합니다.
  2. API 트리거 요소를 통합 편집기로 드래그합니다.

For Each 병렬 태스크 추가

통합에서 For Each 병렬 태스크를 추가하려면 다음 단계를 완료합니다.

  1. 통합 편집기에서 +태스크/트리거 추가 > 태스크를 선택하여 사용 가능한 태스크 목록을 표시합니다.
  2. For Each 병렬 태스크 요소를 통합 편집기로 드래그합니다.

통합 요소 연결

다음으로 에지 연결을 추가하여 API 트리거를 For Each 병렬 태스크에 연결합니다.

에지 연결을 추가하려면 API 트리거 요소 하단에서 포크 제어 지점을 클릭합니다. For Each 병렬 태스크 요소 상단의 조인 제어 지점에서 에지 연결을 드래그 앤 드롭합니다.

For Each 병렬 태스크 구성

For Each 병렬 태스크를 구성하려면 다음 단계를 완료합니다.

  1. 통합 편집기에서 For Each 병렬 태스크를 클릭하여 태스크 구성 창을 봅니다.
  2. 배열 선택 > 반복할 목록에서 새 변수 추가를 클릭하여 새 변수를 추가합니다.
  3. 변수 만들기 대화상자에 다음 정보를 입력합니다.
    • 이름: records를 입력합니다.
    • 데이터 유형: JSON을 선택합니다.
    • 스키마: 샘플 JSON 페이로드에서 유추를 선택합니다. 다음 샘플 JSON 페이로드를 입력합니다.
                    [{
                      "unique_key":"304271",
                      "created_date":"02/06/2007 12:00:00 AM",
                      "closed_date":"03/01/2007 12:00:00 AM",
                      "agency":"TLC",
                      "agency_name":"Taxi and Limousine Commission",
                      "complaint_type":"Taxi Complaint",
                      "descriptor":"Driver Complaint",
                      "location_type":"Street",
                      "incident_zip":"10001",
                      "incident_address":"",
                      "street_name":"",
                      "cross_street_1":"",
                      "cross_street_2":"",
                      "intersection_street_1":"WEST 29 STREET",
                      "intersection_street_2":"7 AVENUE",
                      "address_type":"INTERSECTION",
                      "city":"NEW YORK",
                      "landmark":"",
                      "facility_type":"N/A",
                      "status":"Closed",
                      "due_date":"02/28/2007 12:00:00 AM",
                      "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                      "community_board":"05 MANHATTAN",
                      "borough":"MANHATTAN",
                      "x_coordinate_state_plane":"986215",
                      "y_coordinate_state_plane":"211740",
                      "park_facility_name":"",
                      "park_borough":"MANHATTAN",
                      "school_name":"",
                      "school_number":"",
                      "school_region":"",
                      "school_code":"",
                      "school_phone_number":"",
                      "school_address":"",
                      "school_city":"",
                      "school_state":"",
                      "school_zip":"",
                      "school_not_found":"",
                      "school_or_citywide_complaint":"",
                      "vehicle_type":"",
                      "taxi_company_borough":"",
                      "taxi_pick_up_location":"Other",
                      "bridge_highway_name":"",
                      "bridge_highway_direction":"",
                      "road_ramp":"",
                      "bridge_highway_segment":"",
                      "garage_lot_name":"",
                      "ferry_direction":"",
                      "ferry_terminal_name":"",
                      "latitude":"40.74785373937869",
                      "longitude":"-73.99290823133913",
                      "location":"(40.74785373937869, -73.99290823133913)"
                    }]
                  
  4. 만들기를 클릭합니다.
  5. 하위 통합 세부정보 섹션에서 다음 정보를 입력합니다.
    • API 트리거 ID: 하위 통합에서 API 트리거 요소를 선택합니다. 예를 들어 Process-each-record_API_1을 선택합니다.
    • 실행 전략: ASYNC를 선택합니다.
    • 단일 통합 실행을 선택합니다.
  6. 각 실행 섹션의 개별 배열 요소를 매핑할 위치에 대해 하위 통합의 데이터 매핑 태스크에 변수 이름을 입력합니다. 이 경우 녹화를 입력합니다. 하위 통합 변수는 게시된 통합에 대해서만 나열됩니다. 변수가 나열되지 않으면 하위 통합이 게시된 후 변수가 표시되는 데 다소 시간이 소요되므로 페이지를 새로고칩니다.

기본 통합 게시

기본 통합을 게시하려면 통합 편집기에서 게시를 클릭합니다.

Apigee API 프록시 만들기

통합을 트리거하려면 통합을 대상으로 사용해서 Apigee API 프록시를 만듭니다. 이렇게 하려면 다음 단계를 완료해야 합니다.

  1. Google Cloud 프로젝트에서 서비스 계정을 만들고 여기에 필요한 역할, Apigee 통합 호출자를 할당합니다. IAM 역할 할당에 대한 자세한 내용은 IAM 역할 및 권한을 참조하세요.
  2. Apigee UI에 로그인합니다.
  3. 탐색 메뉴에서 개발 > API 프록시를 선택합니다.
  4. 새로 만들기를 클릭합니다.
  5. 프록시 만들기 마법사에서 통합 대상을 클릭합니다.
  6. 프록시 세부정보 페이지에서 다음 정보를 입력합니다.
    • 이름: process-records를 입력합니다.
    • 기본 경로: /v1/process-records를 입력합니다.
    • 통합 리전: 통합을 만드는 데 사용한 리전을 선택합니다.
    • 통합 대상: 생성한 기본 통합을 선택합니다. 이 튜토리얼에서는 process-records를 선택합니다.
    • 트리거: 필요한 API 트리거를 선택합니다. 이 튜토리얼에서는 process-records_API_1을 선택합니다.
    • 엔드포인트 유형: 동기화를 선택합니다.
  7. 다음을 클릭하고 일반 정책 페이지에서 다음을 다시 클릭합니다.
  8. 요약 페이지에서 만들기를 클릭합니다.
  9. 프록시가 생성되면 프록시 수정을 클릭합니다.
  10. 개발 탭을 클릭합니다.
  11. 정책에서 통합 요청 설정 정책을 다음 정책으로 업데이트합니다.
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <SetIntegrationRequest continueOnError="false" enabled="true" name="set-integration-request">
      <DisplayName>Set Integration Request</DisplayName>
      <ProjectId>project_id</ProjectId>
      <IntegrationName>process-records</IntegrationName>
      <IntegrationRegion>region</IntegrationRegion>
      <ApiTrigger>api_trigger/process-records_API_1</ApiTrigger>
      <Parameters>
        <Parameter name="records" type="json" ref="request.content"/>
      </Parameters>
    </SetIntegrationRequest>
    다음을 바꿉니다.
    • project_id를 Google Cloud 프로젝트의 프로젝트 ID로 바꿉니다.
    • region을 통합을 만든 리전으로 바꿉니다.
  12. 변경사항을 저장합니다.
  13. 배포를 클릭합니다.
  14. 버전에서 업데이트된 버전을 선택합니다.
  15. 환경에서 통합을 배포하는 데 사용할 환경을 선택합니다.
  16. API 프록시를 배포할 때는 이전에 만든 서비스 계정의 이메일을 필요한 역할과 함께 제공합니다.
  17. 배포를 클릭합니다.

통합 테스트

통합을 테스트하려면 다음 단계를 완료합니다.

  1. Cloud Shell에 샘플 데이터를 다운로드합니다.
    1. Google Cloud 콘솔에서 Cloud Shell 세션을 실행하려면 Cloud 콘솔에서 Cloud Shell 활성화 아이콘 Cloud Shell 활성화 아이콘을 클릭합니다. 그러면 Google Cloud 콘솔 하단 창에서 세션이 시작됩니다.
    2. Cloud Shell 터미널에서 다음 명령어를 입력합니다.
      wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
              
    3. 샘플 데이터가 다운로드되었는지 확인하려면 Cloud Shell 터미널에 다음 명령어를 입력합니다.
      ls -la bq-sample-dataset.json
      다운로드한 파일이 Cloud Shell 터미널에 나열됩니다.
  2. Apigee 프록시에서 디버깅을 사용 설정하려면 다음 단계를 완료합니다.
    1. Apigee UI에서 방금 만든 API 프록시로 전환합니다.
    2. 디버그 탭을 클릭합니다.
    3. 디버그 세션 시작을 클릭하고 다음 정보를 입력합니다.
      1. 디버그 세션을 실행할 환경을 선택합니다.
      2. (선택사항) 필터 드롭다운 목록에서 생성 중인 디버그 세션에서 모든 트랜잭션에 적용할 필터를 선택합니다. 기본값은 디버그 데이터의 모든 트랜잭션을 포함하는 None (All transactions)입니다.
      3. 시작을 클릭합니다.
  3. 테스트를 시작하려면 Cloud Shell 터미널에 다음 명령어를 입력합니다.
    export APIGEE_DOMAIN=<your-Apigee-domain>
    export SAMPLE_DOCS=$(jq  $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json)
            
    curl -X POST https://$APIGEE_DOMAIN/v1/process-records \
      -H 'Content-Type: application/json' \
      -d "$SAMPLE_DOCS"
          
    이 테스트는 샘플 데이터 세트에서 3개의 무작위 항목을 선택하고 이를 기본 통합에 전달합니다. 기본 통합이 하위 통합에 각 항목을 전달하여, 데이터가 BigQuery 테이블에서 행으로 추가됩니다.
  4. BigQuery 테이블에 이제 이러한 레코드가 포함되었는지 확인하려면 다음 단계를 수행합니다.
    1. Cloud 콘솔 페이지에서 탐색 메뉴를 클릭합니다.
    2. 애널리틱스 섹션에서 BigQuery를 클릭합니다.
    3. 프로젝트를 확장하고 bq_tutorial 데이터 세트를 클릭합니다.
    4. bq_tutorial 데이터 세트를 확장하고 tutorial 테이블을 클릭합니다.
    5. 테이블 탐색기 탭을 클릭하여 삽입된 레코드를 확인합니다.

다음 단계

다른 커넥터로 통합 빌드를 시도합니다. 지원되는 모든 커넥터 목록은 커넥터 참조를 확인하세요.