Application Integration에 지원되는 커넥터를 참조하세요.

For Each 병렬 태스크를 사용하여 BigQuery에 데이터 삽입

이 튜토리얼에서는 Application Integration 및 하위 통합을 만들어서 일련의 레코드를 처리합니다. 각 레코드에서 기본 통합은 하위 통합을 비동기적으로 호출하여, 각 레코드에 대해 데이터를 가져오고 이를 BigQuery 데이터 세트의 테이블에 행으로 삽입합니다.

이 튜토리얼에서는 다음 태스크를 완료합니다.

시작하기 전에

  • Application Integration에 대한 액세스 권한이 있는지 확인합니다.
  • Google Cloud 프로젝트에서 다음을 수행합니다.

    • 연결을 만드는 데 사용할 서비스 계정에 다음 역할을 부여합니다.
      • roles/bigquery.dataEditor
      • roles/bigquery.readSessionUser
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor
    • 다음 서비스를 사용 설정합니다.
      • secretmanager.googleapis.com(Secret Manager API)
      • connectors.googleapis.com(Connectors API)

      이러한 서비스가 이전에 프로젝트에 대해 사용 설정되지 않았으면 연결 만들기 페이지에서 연결을 만들 때 서비스를 사용 설정하라는 메시지가 표시됩니다.

BigQuery 연결 설정

먼저 이 튜토리얼에서 사용할 BigQuery 데이터 세트 및 테이블을 만듭니다. 데이터 세트와 테이블을 만든 후 BigQuery 연결을 만듭니다. 이 튜토리얼의 뒷 부분에서 통합에 이 연결을 사용합니다.

BigQuery 데이터 세트 및 테이블 설정

BigQuery 데이터 세트 및 테이블을 설정하려면 다음 단계를 수행합니다.

  1. Cloud 콘솔 페이지에서 Google Cloud 프로젝트를 선택합니다.
  2. Google Cloud 콘솔에서 Cloud Shell 세션을 실행하려면 Cloud 콘솔에서 Cloud Shell 활성화 아이콘 Cloud Shell 활성화 아이콘을 클릭합니다. 그러면 Google Cloud 콘솔 하단 창에서 세션이 시작됩니다.
  3. BigQuery API를 사용 설정하려면 Cloud Shell 터미널에 다음 명령어를 입력합니다.
    export PROJECT_ID=project_id
    export REGION=region
    gcloud services enable --project "${PROJECT_ID}" \
        bigquery.googleapis.com \
        bigquerystorage.googleapis.com
    이 명령어에서 다음을 바꿉니다.
    • project_id를 Google Cloud 프로젝트의 프로젝트 ID로 바꿉니다.
    • region을 BigQuery 데이터 세트를 만드는 데 사용할 리전으로 바꿉니다.
  4. 이름이 bq_tutorial인 BigQuery 데이터 세트를 만들기 위해 Cloud Shell 터미널에서 다음 명령어를 입력합니다.
          bq  --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
        
  5. 이름이 tutorial인 BigQuery 테이블을 만들기 위해 Cloud Shell 터미널에서 다음 명령어를 입력합니다.
          bq --project_id ${PROJECT_ID} \
            query  \
            --nouse_legacy_sql \
          'create table bq_tutorial.tutorial (
          unique_key STRING NOT NULL,
          created_date STRING,
          closed_date STRING,
          agency STRING,
          agency_name STRING,
          complaint_type STRING,
          descriptor STRING,
          location_type STRING,
          incident_zip STRING,
          incident_address STRING,
          street_name STRING,
          cross_street_1 STRING,
          cross_street_2 STRING,
          intersection_street_1 STRING,
          intersection_street_2 STRING,
          address_type STRING,
          city STRING,
          landmark STRING,
          facility_type STRING,
          status STRING,
          due_date STRING,
          resolution_action_updated_date STRING,
          community_board STRING,
          borough STRING,
          x_coordinate_state_plane STRING,
          y_coordinate_state_plane STRING,
          park_facility_name STRING,
          park_borough STRING,
          school_name STRING,
          school_number STRING,
          school_region STRING,
          school_code STRING,
          school_phone_number STRING,
          school_address STRING,
          school_city STRING,
          school_state STRING,
          school_zip STRING,
          school_not_found STRING,
          school_or_citywide_complaint STRING,
          vehicle_type STRING,
          taxi_company_borough STRING,
          taxi_pick_up_location STRING,
          bridge_highway_name STRING,
          bridge_highway_direction STRING,
          bridge_highway_segment STRING,
          road_ramp STRING,
          garage_lot_name STRING,
          ferry_direction STRING,
          ferry_terminal_name STRING,
          latitude STRING,
          longitude STRING,
          location STRING
          ) '
      
  6. BigQuery 테이블이 생성되었는지 확인합니다.
    1. Cloud 콘솔 페이지에서 탐색 메뉴를 클릭합니다.
    2. 애널리틱스 섹션에서 BigQuery를 클릭합니다.
    3. 프로젝트를 확장하고 bq_tutorial 데이터 세트가 나열되었는지 확인합니다.
    4. bq_tutorial 데이터 세트를 확장하고 tutorial 테이블이 나열되었는지 확인합니다.
    5. 문서 테이블을 클릭하여 스키마를 확인합니다.

BigQuery 연결 만들기

다음으로 BigQuery 연결을 만듭니다. BigQuery 커넥터를 사용하면 BigQuery 테이블에서 행 삽입, 읽기, 업데이트, 삭제를 수행하고 통합에서 결과 출력을 사용할 수 있습니다. BigQuery 연결을 만든 후 이 튜토리얼의 뒷 부분에서 통합에 이 연결을 사용하여 BigQuery 테이블에 행을 추가합니다.

BigQuery 연결을 만들려면 다음 단계를 완료합니다.

  1. Cloud 콘솔 페이지에서 Google Cloud 프로젝트를 선택합니다.
  2. 연결 페이지를 엽니다.
  3. + 새로 만들기를 클릭하여 연결 만들기 페이지를 엽니다.
  4. 연결을 구성합니다.
    1. 연결 만들기 섹션에서 다음을 완료합니다.
      • 커넥터: 사용 가능한 커넥터 드롭다운 목록에서 BigQuery를 선택합니다.
      • 커넥터 버전: 사용 가능한 버전의 드롭다운 목록에서 최신 커넥터 버전을 선택합니다.
      • 연결 이름 필드에서 연결 인스턴스의 이름을 입력합니다. 이 튜토리얼에서는 connector-bq-tutorial을 입력합니다.
      • 선택적으로 연결 인스턴스에 대한 설명을 추가합니다.
      • 서비스 계정: 필수 역할이 있는 서비스 계정을 선택합니다.
      • 프로젝트 ID: BigQuery 데이터가 있는 Google Cloud 프로젝트의 ID를 입력합니다.
      • 데이터 세트 ID: 사용하려는 BigQuery 데이터 세트의 ID를 입력합니다. 이 튜토리얼에서는 bq_tutorial을 입력합니다.
      • 선택적으로 + 라벨 추가를 클릭하여 키/값 쌍의 형식으로 라벨을 추가합니다.
      • 다음을 클릭합니다.
    2. 위치: 연결이 실행될 리전을 선택합니다. 커넥터가 지원되는 리전은 다음과 같습니다.

        지원되는 모든 리전 목록은 위치를 참조하세요.

      • 다음을 클릭합니다.
    3. 인증: BigQuery 연결은 인증 구성이 필요하지 않습니다. 다음을 클릭합니다.
    4. 검토: 연결을 구성 세부정보를 검토합니다. 이 섹션에서는 검토를 위해 새 연결의 연결 및 인증 세부정보가 표시되어 있습니다.
  5. 만들기를 클릭합니다.

하위 통합 설정

이 튜토리얼에서는 하위 통합이 기본 통합에 의해 전송된 각 레코드를 가져와서 bq_tutorial 데이터 세트의 tutorial 테이블에 행으로 삽입합니다.

하위 통합 만들기

하위 통합을 만들려면 다음 단계를 완료합니다.

  1. Google Cloud 콘솔에서 Application Integration 페이지로 이동합니다.

    Application Integration으로 이동

  2. 탐색 메뉴에서 통합을 클릭합니다. 통합 목록 페이지가 나타납니다.
  3. 통합 만들기를 클릭합니다.
  4. 통합 만들기 대화상자에서 다음을 수행합니다.
    • 이름을 입력합니다. 예를 들어 Process-each-record를 입력합니다.
    • 선택적으로 설명을 입력합니다. 예를 들어 각 레코드 처리를 위한 API 트리거(하위 통합)를 입력합니다.
    • 통합을 만들려는 리전을 선택합니다.
  5. 만들기를 클릭하여 통합 편집기를 엽니다.

API 트리거 추가

통합에 API 트리거를 추가하려면 다음을 수행합니다.

  1. 통합 편집기에서 태스크/트리거 추가 > 트리거를 선택하여 사용 가능한 트리거 목록을 표시합니다.
  2. API 트리거 요소를 통합 편집기로 드래그합니다.

데이터 매핑 태스크 추가

통합에 데이터 매핑 태스크를 추가하려면 다음 단계를 완료합니다.

  1. 통합 편집기에서 +태스크/트리거 추가 > 태스크를 선택하여 사용 가능한 태스크 목록을 표시합니다.
  2. 데이터 매핑 요소를 통합 편집기로 드래그합니다.

BigQuery 연결 구성

이제 하위 통합에서 앞에서 만든 BigQuery 연결을 사용할 준비가 되었습니다. 이 통합에서 BigQuery 연결을 구성하려면 다음 단계를 완료합니다.

  1. 통합 편집기에서 +태스크/트리거 추가 > 태스크를 선택하여 사용 가능한 태스크 목록을 표시합니다.
  2. 커넥터 요소를 통합 편집기로 드래그합니다.
  3. 디자이너에서 커넥터 태스크 요소를 클릭하여 태스크 구성 창을 확인합니다.
  4. 오른쪽 패널의 수정 아이콘을 클릭하여 BigQuery에 행을 삽입하도록 라벨을 업데이트합니다.
  5. 태스크 구성을 클릭합니다.

    커넥터 태스크 구성 대화상자가 나타납니다.

  6. 커넥터 태스크 구성 대화상자에서 다음을 수행합니다.
    1. BigQuery 연결을 만든 연결 리전을 선택합니다.
    2. 사용할 BigQuery 연결을 선택합니다. 이 튜토리얼에서는 connector-bq-tutorial을 선택합니다.
    3. 연결을 선택하면 유형 열이 표시됩니다. 항목을 선택한 후 사용 가능한 항목 목록에서 튜토리얼을 선택합니다.
    4. 유형을 선택하면 작업 열이 나타납니다. 만들기를 선택합니다.
    5. 완료를 클릭하여 연결 구성을 완료하고 대화상자를 닫습니다.

통합 요소 연결

다음으로 에지 연결을 추가하여 API 트리거데이터 매핑 태스크에 연결하고 데이터 매핑 태스크를 커넥터 태스크에 연결합니다. 에지 연결은 통합에서 두 요소 간의 연결입니다. 에지 및 에지 조건에 대한 자세한 내용은 에지를 참조하세요.

에지 연결을 추가하려면 다음 단계를 완료합니다.

  1. API 트리거 요소 아래에서 포크 제어 지점을 클릭합니다. 데이터 매핑 요소 상단의 조인 제어 지점에서 에지 연결을 드래그 앤 드롭합니다.
  2. 데이터 매핑 요소 하단에서 포크 제어 지점을 클릭합니다. 커넥터 요소 상단의 조인 제어 지점에서 에지 연결을 드래그 앤 드롭합니다.

데이터 매핑 태스크 구성

데이터 매핑 태스크를 구성하려면 다음 단계를 완료합니다.

  1. 통합 편집기에서 데이터 매핑 태스크를 클릭하여 태스크 구성 창을 봅니다.
  2. 데이터 매핑 편집기 열기를 클릭합니다.
  3. 데이터 매핑 편집기에서 추가를 클릭하여 새 변수를 추가합니다.
  4. 변수 만들기 대화상자에 다음 정보를 입력합니다.
    • 이름: 레코드를 입력합니다.
    • 데이터 유형: JSON을 선택합니다.
    • 스키마: 샘플 JSON 페이로드에서 유추를 선택합니다. 다음 샘플 JSON 페이로드를 입력합니다.
                  {
                    "unique_key":"304271",
                    "created_date":"02/06/2007 12:00:00 AM",
                    "closed_date":"03/01/2007 12:00:00 AM",
                    "agency":"TLC",
                    "agency_name":"Taxi and Limousine Commission",
                    "complaint_type":"Taxi Complaint",
                    "descriptor":"Driver Complaint",
                    "location_type":"Street",
                    "incident_zip":"10001",
                    "incident_address":"",
                    "street_name":"",
                    "cross_street_1":"",
                    "cross_street_2":"",
                    "intersection_street_1":"WEST 29 STREET",
                    "intersection_street_2":"7 AVENUE",
                    "address_type":"INTERSECTION",
                    "city":"NEW YORK",
                    "landmark":"",
                    "facility_type":"N/A",
                    "status":"Closed",
                    "due_date":"02/28/2007 12:00:00 AM",
                    "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                    "community_board":"05 MANHATTAN",
                    "borough":"MANHATTAN",
                    "x_coordinate_state_plane":"986215",
                    "y_coordinate_state_plane":"211740",
                    "park_facility_name":"",
                    "park_borough":"MANHATTAN",
                    "school_name":"",
                    "school_number":"",
                    "school_region":"",
                    "school_code":"",
                    "school_phone_number":"",
                    "school_address":"",
                    "school_city":"",
                    "school_state":"",
                    "school_zip":"",
                    "school_not_found":"",
                    "school_or_citywide_complaint":"",
                    "vehicle_type":"",
                    "taxi_company_borough":"",
                    "taxi_pick_up_location":"Other",
                    "bridge_highway_name":"",
                    "bridge_highway_direction":"",
                    "road_ramp":"",
                    "bridge_highway_segment":"",
                    "garage_lot_name":"",
                    "ferry_direction":"",
                    "ferry_terminal_name":"",
                    "latitude":"40.74785373937869",
                    "longitude":"-73.99290823133913",
                    "location":"(40.74785373937869, -73.99290823133913)"
                  }
                
  5. 만들기를 클릭합니다.
  6. 변수가 생성되면 데이터 매핑 편집기에서 다음 단계를 완료합니다.
    • 레코드 변수를 입력 열로 드래그합니다.
    • connectorInputPayload 변수를 출력 열로 드래그합니다.
  7. 데이터 매핑 편집기를 닫고 통합 편집기로 돌아갑니다.

하위 통합 게시

하위 통합을 게시하려면 통합 편집기에서 게시를 클릭합니다.

기본 통합 설정

이 섹션에서는 각 레코드 처리를 위해 For Each 병렬 태스크를 사용하는 기본 통합을 설정합니다. 그런 다음 이 기본 통합이 각 레코드에 대해 한 번씩 하위 통합을 호출합니다.

기본 통합 만들기

기본 통합을 만들려면 다음 단계를 완료합니다.

  1. Google Cloud 콘솔에서 Application Integration 페이지로 이동합니다.

    Application Integration으로 이동

  2. 탐색 메뉴에서 통합을 클릭합니다. 통합 목록 페이지가 나타납니다.
  3. 통합 만들기를 클릭합니다.
  4. 통합 만들기 대화상자에서 다음을 수행합니다.
    • 이름을 입력합니다. 예를 들어 process-records를 입력합니다.
    • 선택적으로 설명을 입력합니다. 예를 들어 레코드 처리를 위한 API 트리거(기본 통합)를 입력합니다.
    • 통합을 만들려는 리전을 선택합니다.
  5. 만들기를 클릭하여 통합 편집기를 엽니다.

API 트리거 추가

통합에 API 트리거를 추가하려면 다음을 수행합니다.

  1. 통합 편집기에서 태스크/트리거 추가 > 트리거를 선택하여 사용 가능한 트리거 목록을 표시합니다.
  2. API 트리거 요소를 통합 편집기로 드래그합니다.

For Each 병렬 태스크 추가

통합에서 For Each 병렬 태스크를 추가하려면 다음 단계를 완료합니다.

  1. 통합 편집기에서 +태스크/트리거 추가 > 태스크를 선택하여 사용 가능한 태스크 목록을 표시합니다.
  2. For Each 병렬 태스크 요소를 통합 편집기로 드래그합니다.

통합 요소 연결

다음으로 에지 연결을 추가하여 API 트리거를 For Each 병렬 태스크에 연결합니다.

에지 연결을 추가하려면 API 트리거 요소 하단에서 포크 제어 지점을 클릭합니다. For Each 병렬 태스크 요소 상단의 조인 제어 지점에서 에지 연결을 드래그 앤 드롭합니다.

For Each 병렬 태스크 구성

For Each 병렬 태스크를 구성하려면 다음 단계를 완료합니다.

  1. 통합 편집기에서 For Each 병렬 태스크를 클릭하여 태스크 구성 창을 봅니다.
  2. 배열 선택 > 반복할 목록에서 새 변수 추가를 클릭하여 새 변수를 추가합니다.
  3. 변수 만들기 대화상자에 다음 정보를 입력합니다.
    • 이름: records를 입력합니다.
    • 데이터 유형: JSON을 선택합니다.
    • 스키마: 샘플 JSON 페이로드에서 유추를 선택합니다. 다음 샘플 JSON 페이로드를 입력합니다.
                    [{
                      "unique_key":"304271",
                      "created_date":"02/06/2007 12:00:00 AM",
                      "closed_date":"03/01/2007 12:00:00 AM",
                      "agency":"TLC",
                      "agency_name":"Taxi and Limousine Commission",
                      "complaint_type":"Taxi Complaint",
                      "descriptor":"Driver Complaint",
                      "location_type":"Street",
                      "incident_zip":"10001",
                      "incident_address":"",
                      "street_name":"",
                      "cross_street_1":"",
                      "cross_street_2":"",
                      "intersection_street_1":"WEST 29 STREET",
                      "intersection_street_2":"7 AVENUE",
                      "address_type":"INTERSECTION",
                      "city":"NEW YORK",
                      "landmark":"",
                      "facility_type":"N/A",
                      "status":"Closed",
                      "due_date":"02/28/2007 12:00:00 AM",
                      "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                      "community_board":"05 MANHATTAN",
                      "borough":"MANHATTAN",
                      "x_coordinate_state_plane":"986215",
                      "y_coordinate_state_plane":"211740",
                      "park_facility_name":"",
                      "park_borough":"MANHATTAN",
                      "school_name":"",
                      "school_number":"",
                      "school_region":"",
                      "school_code":"",
                      "school_phone_number":"",
                      "school_address":"",
                      "school_city":"",
                      "school_state":"",
                      "school_zip":"",
                      "school_not_found":"",
                      "school_or_citywide_complaint":"",
                      "vehicle_type":"",
                      "taxi_company_borough":"",
                      "taxi_pick_up_location":"Other",
                      "bridge_highway_name":"",
                      "bridge_highway_direction":"",
                      "road_ramp":"",
                      "bridge_highway_segment":"",
                      "garage_lot_name":"",
                      "ferry_direction":"",
                      "ferry_terminal_name":"",
                      "latitude":"40.74785373937869",
                      "longitude":"-73.99290823133913",
                      "location":"(40.74785373937869, -73.99290823133913)"
                    }]
                  
  4. 만들기를 클릭합니다.
  5. 하위 통합 세부정보 섹션에서 다음 정보를 입력합니다.
    • API 트리거 ID: 하위 통합에서 API 트리거 요소를 선택합니다. 예를 들어 Process-each-record_API_1을 선택합니다.
    • 실행 전략: ASYNC를 선택합니다.
    • 단일 통합 실행을 선택합니다.
  6. 각 실행 섹션의 개별 배열 요소를 매핑할 위치에 대해 하위 통합의 데이터 매핑 태스크에 변수 이름을 입력합니다. 이 경우 녹화를 입력합니다. 하위 통합 변수는 게시된 통합에만 나열됩니다. 변수가 나열되지 않으면 하위 통합이 게시된 후 변수가 표시되는 데 다소 시간이 소요되므로 페이지를 새로고칩니다.

기본 통합 게시

기본 통합을 게시하려면 통합 편집기에서 게시를 클릭합니다.

통합 테스트

통합을 테스트하려면 다음 단계를 완료합니다.

  1. Cloud Shell에 샘플 데이터를 다운로드합니다.
    1. Google Cloud 콘솔에서 Cloud Shell 세션을 실행하려면 Cloud 콘솔에서 Cloud Shell 활성화 아이콘 Cloud Shell 활성화 아이콘을 클릭합니다. 그러면 Google Cloud 콘솔 하단 창에서 세션이 시작됩니다.
    2. Cloud Shell 터미널에서 다음 명령어를 입력합니다.
      wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
              
    3. 샘플 데이터가 다운로드되었는지 확인하려면 Cloud Shell 터미널에 다음 명령어를 입력합니다.
      ls -la bq-sample-dataset.json
      다운로드한 파일이 Cloud Shell 터미널에 나열됩니다.
  2. 샘플 데이터 세트에서 임의의 3개 항목을 선택하고 이를 통합에 전달할 수 있도록 저장하려면 Cloud Shell 터미널에 다음 명령어를 입력합니다.
    AUTH=$(gcloud auth print-access-token)
    export SAMPLE_DOCS=$(jq $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json | jq -Rs '.')
    
    generate_post_data()
      {
        cat <<EOF
          {
            "triggerId": "api_trigger/process-records_API_1",
            "inputParameters":
              {
                "records":
                  {
                    "jsonValue": $SAMPLE_DOCS
                  }
              }
          }
          EOF
      }
  3. 테스트를 시작하려면 Cloud Shell 터미널에서 다음 명령어를 입력합니다.
    curl -X POST \
      https://integrations.googleapis.com/v1/projects/project_id/locations/region/integrations/process-records:execute \
      -H "Authorization: Bearer $AUTH" \
      -H "Content-Type: application/json" \
      -d "$(generate_post_data)"
    이 명령어에서 다음을 바꿉니다.
    • project_id를 Google Cloud 프로젝트의 프로젝트 ID로 바꿉니다.
    • region을 통합을 만든 리전으로 바꿉니다.
    이 명령어는 기본 통합을 호출하고 샘플 데이터 세트의 항목을 기본 통합으로 전달합니다. 기본 통합이 하위 통합에 각 항목을 전달하여, 데이터가 BigQuery 테이블에서 행으로 추가됩니다.
  4. BigQuery 테이블에 이제 이러한 레코드가 포함되었는지 확인하려면 다음 단계를 수행합니다.
    1. Cloud 콘솔 페이지에서 탐색 메뉴를 클릭합니다.
    2. 애널리틱스 섹션에서 BigQuery를 클릭합니다.
    3. 프로젝트를 확장하고 bq_tutorial 데이터 세트를 클릭합니다.
    4. bq_tutorial 데이터 세트를 확장하고 tutorial 테이블을 클릭합니다.
    5. 테이블 탐색기 탭을 클릭하여 삽입된 레코드를 확인합니다.

다음 단계

다른 커넥터로 통합 빌드를 시도합니다. 지원되는 모든 커넥터 목록은 커넥터 참조를 확인하세요.