Workflows를 사용하여 커스텀 소스에서 메타데이터 가져오기

이 문서에서는 Workflows에서 관리형 연결 파이프라인을 실행하여 서드 파티 소스에서 Dataplex로 메타데이터를 가져오는 방법을 설명합니다.

관리형 연결 파이프라인을 설정하려면 데이터 소스의 커넥터를 빌드합니다. 그런 다음 Workflows에서 파이프라인을 실행합니다. 파이프라인은 데이터 소스에서 메타데이터를 추출한 후 메타데이터를 Dataplex로 가져옵니다. 필요한 경우 파이프라인은 Google Cloud 프로젝트에 Dataplex 카탈로그 항목 그룹도 만듭니다.

관리형 연결에 대한 자세한 내용은 관리형 연결 개요를 참조하세요.

시작하기 전에

메타데이터를 가져오기 전에 이 섹션의 태스크를 수행합니다.

커넥터 빌드

커넥터는 데이터 소스에서 메타데이터를 추출하고 Dataplex에서 가져올 수 있는 메타데이터 가져오기 파일을 생성합니다. 커넥터는 Dataproc Serverless에서 실행할 수 있는 Artifact Registry 이미지입니다.

Google Cloud 리소스 구성

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    일정에 따라 파이프라인을 실행할 계획이 없다면 Cloud Scheduler API를 사용 설정할 필요가 없습니다.

  2. Secret Manager에서 보안 비밀을 만들어 서드 파티 데이터 소스의 사용자 인증 정보를 저장합니다.

  3. Spark 워크로드용 Dataproc Serverless를 실행하도록 가상 프라이빗 클라우드(VPC) 네트워크를 구성합니다.

  4. 메타데이터 가져오기 파일을 저장할 Cloud Storage 버킷을 만듭니다.

  5. 다음 Dataplex 카탈로그 리소스를 만듭니다.

    1. 가져오려는 항목에 대해 커스텀 관점 유형을 만듭니다.

    2. 가져오려는 항목에 대해 커스텀 항목 유형을 만듭니다.

필요한 역할

서비스 계정은 워크플로의 ID를 나타내며 워크플로에 포함된 권한과 액세스할 수 있는 Google Cloud 리소스를 확인합니다. 파이프라인을 실행하기 위한 워크플로 서비스 계정과 커넥터를 실행하기 위한 Dataproc Serverless 서비스 계정이 필요합니다.

Compute Engine 기본 서비스 계정(PROJECT_NUMBER-compute@developer.gserviceaccount.com)을 사용하거나 자체 서비스 계정(또는 계정)을 만들어 관리형 연결 파이프라인을 실행할 수 있습니다.

콘솔

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    IAM으로 이동

  2. 메타데이터를 가져올 프로젝트를 선택합니다.

  3. 액세스 권한 부여를 클릭한 후 서비스 계정의 이메일 주소를 입력합니다.

  4. 서비스 계정에 다음 역할을 할당합니다.

    • 로그 작성자
    • Dataplex 항목 그룹 소유자
    • Dataplex 메타데이터 작업 소유자
    • Dataplex 카탈로그 편집자
    • Dataproc 편집자
    • Dataproc 작업자
    • Secret Manager 보안 접근자: 데이터 소스의 사용자 인증 정보를 저장하는 보안 비밀
    • 스토리지 객체 사용자 - Cloud Storage 버킷
    • Artifact Registry 리더: 커넥터 이미지가 포함된 Artifact Registry 저장소
    • 서비스 계정 사용자 - 다른 서비스 계정을 사용하는 경우 Dataproc Serverless 일괄 작업을 실행하는 서비스 계정에서 워크플로를 실행하는 서비스 계정에 이 역할 부여
    • Workflows 호출자: 파이프라인을 예약하려는 경우
  5. 변경사항을 저장합니다.

gcloud

  1. 서비스 계정에 역할을 부여합니다. 다음 명령어를 실행합니다.

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    다음을 바꿉니다.

    • PROJECT_ID: 메타데이터를 가져올 대상 Google Cloud프로젝트의 이름입니다.
    • SERVICE_ACCOUNT_ID: 서비스 계정입니다(예: my-service-account@my-project.iam.gserviceaccount.com).
  2. 서비스 계정에 리소스 수준에서 다음 역할을 부여합니다.

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    다음을 바꿉니다.

    • SECRET_ID: 데이터 소스의 사용자 인증 정보를 저장하는 보안 비밀의 ID입니다. projects/PROJECT_ID/secrets/SECRET_ID 형식을 사용합니다.
    • BUCKET_ID: Cloud Storage 버킷의 이름입니다.
    • REPOSITORY: 커넥터 이미지가 포함된 Artifact Registry 저장소입니다.
    • REPOSITORY_LOCATION: 저장소가 호스팅되는 Google Cloud위치입니다.
  3. 워크플로를 실행하는 서비스 계정에 Dataproc Serverless 일괄 작업을 실행하는 서비스 계정에 대한 roles/iam.serviceAccountUser 역할을 부여합니다. 워크플로와 Dataproc Serverless에 동일한 서비스 계정을 사용하는 경우에도 이 역할을 부여해야 합니다.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    다른 서비스 계정을 사용하는 경우 --member 플래그의 값은 Dataproc 서버리스 일괄 작업을 실행하는 서비스 계정입니다.

  4. 파이프라인을 예약하려면 서비스 계정에 다음 역할을 부여합니다.

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

메타데이터 가져오기

메타데이터를 가져오려면 관리형 연결 파이프라인을 실행하는 워크플로를 만든 후 실행합니다. 원하는 경우 파이프라인 실행 일정을 만들 수도 있습니다.

콘솔

  1. 워크플로를 만듭니다. 다음 정보를 제공하세요.

    • 서비스 계정: 이 문서의 필수 역할 섹션에서 구성한 서비스 계정입니다.
    • 암호화: Google 관리 암호화 키를 선택합니다.

    • 워크플로 정의: 다음 정의 파일을 제공합니다.

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                  steps:
                   - submit_extract_job_with_default_network_uri:
                        assign:
                          - NETWORK_TYPE: "networkUri"
                          - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. 파이프라인을 주문형으로 실행하려면 워크플로를 실행합니다.

    다음 런타임 인수를 제공합니다.

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    다음을 바꿉니다.

    • PROJECT_ID: 메타데이터를 가져올 대상 Google Cloud프로젝트의 이름입니다.
    • LOCATION_ID: Dataproc Serverless 및 메타데이터 가져오기 작업이 실행되고 메타데이터가 가져올 대상 Google Cloud 위치입니다.
    • ENTRY_GROUP_ID: 메타데이터를 가져올 항목 그룹의 ID입니다. 항목 그룹 ID는 소문자, 숫자, 하이픈을 포함할 수 있습니다.

      이 항목 그룹의 전체 리소스 이름은 projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID입니다.

    • CREATE_ENTRY_GROUP_BOOLEAN: 프로젝트에 아직 항목 그룹이 없는 경우 파이프라인에서 항목 그룹을 만들도록 하려면 이 값을 true로 설정합니다.
    • BUCKET_ID: 커넥터에서 생성된 메타데이터 가져오기 파일을 저장할 Cloud Storage 버킷의 이름입니다. 워크플로를 실행할 때마다 새 폴더가 생성됩니다.
    • SERVICE_ACCOUNT_ID: 이 문서의 필수 역할 섹션에서 구성한 서비스 계정입니다. 서비스 계정은 Dataproc Serverless에서 커넥터를 실행합니다.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: 커넥터에 전달할 추가 인수 목록입니다. 예시는 메타데이터 가져오기를 위한 커스텀 커넥터 개발을 참조하세요. 각 인수를 큰따옴표로 묶고 인수를 쉼표로 구분합니다.
    • CONTAINER_IMAGE: Artifact Registry에 호스팅된 커넥터의 맞춤 컨테이너 이미지입니다.
    • ENTRY_TYPES: 가져오기 범위 내에 있는 항목 유형 목록입니다(projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID 형식). LOCATION_ID는 메타데이터를 가져오는Google Cloud 위치와 동일하거나 global여야 합니다.
    • ASPECT_TYPES: 가져오기 범위 내에 있는 관점 유형 목록입니다(projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID 형식). LOCATION_ID는 메타데이터를 가져오는Google Cloud 위치와 동일하거나 global여야 합니다.
    • 선택사항: NETWORK_TAGS 인수의 경우 네트워크 태그 목록을 제공합니다.
    • 선택사항: NETWORK_URI 인수의 경우 데이터 소스에 연결되는 VPC 네트워크의 URI를 제공합니다. 네트워크를 제공하는 경우 서브네트워크 인수를 생략합니다.
    • 선택사항: SUBNETWORK_URI 인수의 경우 데이터 소스에 연결되는 서브네트워크의 URI를 제공합니다. 서브넷을 제공하는 경우 네트워크 인수를 생략합니다.

    가져오는 메타데이터의 양에 따라 파이프라인을 실행하는 데 몇 분 이상 걸릴 수 있습니다. 진행률을 확인하는 방법에 관한 자세한 내용은 워크플로 실행 결과 액세스를 참조하세요.

    파이프라인 실행이 완료되면 Dataplex 카탈로그에서 가져온 메타데이터를 검색할 수 있습니다.

  3. 선택사항: 일정에 따라 파이프라인을 실행하려면 Cloud Scheduler를 사용하여 일정을 만듭니다. 다음 정보를 제공하세요.

    • 빈도: 파이프라인을 실행할 일정을 정의하는 unix-cron 표현식입니다.
    • 워크플로 인수: 이전 단계에 설명된 대로 커넥터의 런타임 인수입니다.
    • 서비스 계정: 서비스 계정입니다. 서비스 계정은 스케줄러를 관리합니다.

gcloud

  1. 다음 워크로드 정의를 YAML 파일로 저장합니다.

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                steps:
                 - submit_extract_job_with_default_network_uri:
                      assign:
                        - NETWORK_TYPE: "networkUri"
                        - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Bash 변수를 정의하며 워크플로를 만들고 원하는 경우 파이프라인 실행을 위한 일정을 만듭니다.

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    다음을 바꿉니다.

    • PROJECT_ID: 메타데이터를 가져올 대상 Google Cloud프로젝트의 이름입니다.
    • LOCATION_ID: Dataproc Serverless 및 메타데이터 가져오기 작업이 실행되고 메타데이터가 가져올 대상 Google Cloud 위치입니다.
    • SERVICE_ACCOUNT_ID: 이 문서의 필수 역할 섹션에서 구성한 서비스 계정입니다.
    • WORKFLOW_DEFINITION_FILE: 워크플로 정의 YAML 파일의 경로입니다.
    • WORKFLOW_NAME: 워크플로의 이름입니다.
    • WORKFLOW_ARGUMENTS: 커넥터에 전달할 런타임 인수입니다. 인수는 JSON 형식입니다.

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      Cloud Scheduler의 경우 따옴표로 묶인 문자열 내의 큰따옴표는 백슬래시(\)로 이스케이프 처리됩니다. 예를 들면 --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}"입니다.

      다음을 바꿉니다.

      • ENTRY_GROUP_ID: 메타데이터를 가져올 항목 그룹의 ID입니다. 항목 그룹 ID는 소문자, 숫자, 하이픈을 포함할 수 있습니다.

        이 항목 그룹의 전체 리소스 이름은 projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID입니다.

      • CREATE_ENTRY_GROUP_BOOLEAN: 프로젝트에 아직 항목 그룹이 없는 경우 파이프라인에서 항목 그룹을 만들도록 하려면 이 값을 true로 설정합니다.
      • BUCKET_ID: 커넥터에서 생성된 메타데이터 가져오기 파일을 저장할 Cloud Storage 버킷의 이름입니다. 워크플로를 실행할 때마다 새 폴더가 생성됩니다.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: 커넥터에 전달할 추가 인수 목록입니다. 예시는 메타데이터 가져오기를 위한 커스텀 커넥터 개발을 참조하세요.
      • CONTAINER_IMAGE: Artifact Registry에 호스팅된 커넥터의 맞춤 컨테이너 이미지입니다.
      • ENTRY_TYPES: 가져오기 범위 내에 있는 항목 유형 목록입니다(projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID 형식). LOCATION_ID는 메타데이터를 가져오는Google Cloud 위치와 동일하거나 global여야 합니다.
      • ASPECT_TYPES: 가져오기 범위 내에 있는 관점 유형 목록입니다(projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID 형식). LOCATION_ID는 메타데이터를 가져오는Google Cloud 위치와 동일하거나 global여야 합니다.
      • 선택사항: NETWORK_TAGS 인수의 경우 네트워크 태그 목록을 제공합니다.
      • 선택사항: NETWORK_URI 인수의 경우 데이터 소스에 연결되는 VPC 네트워크의 URI를 제공합니다. 네트워크를 제공하는 경우 서브네트워크 인수를 생략합니다.
      • 선택사항: SUBNETWORK_URI 인수의 경우 데이터 소스에 연결되는 서브네트워크의 URI를 제공합니다. 서브넷을 제공하는 경우 네트워크 인수를 생략합니다.
    • CRON_SCHEDULE_EXPRESSION: 파이프라인을 실행할 일정을 정의하는 크론 표현식입니다. 예를 들어 매일 자정에 일정을 실행하려면 0 0 * * * 표현식을 사용합니다.

  3. 파이프라인을 주문형으로 실행하려면 워크플로를 실행합니다.

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    워크플로 인수는 JSON 형식이지만 이스케이프 처리되지 않습니다.

    가져오는 메타데이터의 양에 따라 워크플로를 실행하는 데 몇 분 이상 걸릴 수 있습니다. 진행률을 확인하는 방법에 관한 자세한 내용은 워크플로 실행 결과 액세스를 참조하세요.

    파이프라인 실행이 완료되면 Dataplex 카탈로그에서 가져온 메타데이터를 검색할 수 있습니다.

Terraform

  1. cloud-dataplex 저장소를 클론합니다.

    저장소에는 다음과 같은 Terraform 파일이 포함됩니다.

  2. .tfvars 파일을 수정하여 자리표시자를 커넥터의 정보로 바꿉니다.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    다음을 바꿉니다.

    • PROJECT_ID: 메타데이터를 가져올 대상 Google Cloud프로젝트의 이름입니다.
    • LOCATION_ID: Dataproc Serverless 및 메타데이터 가져오기 작업이 실행되고 메타데이터가 가져올 대상 Google Cloud 위치입니다.
    • SERVICE_ACCOUNT_ID: 이 문서의 필수 역할 섹션에서 구성한 서비스 계정입니다.
    • CRON_SCHEDULE_EXPRESSION: 파이프라인을 실행할 일정을 정의하는 크론 표현식입니다. 예를 들어 매일 자정에 일정을 실행하려면 0 0 * * * 표현식을 사용합니다.
    • ENTRY_GROUP_ID: 메타데이터를 가져올 항목 그룹의 ID입니다. 항목 그룹 ID는 소문자, 숫자, 하이픈을 포함할 수 있습니다.

      이 항목 그룹의 전체 리소스 이름은 projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID입니다.

    • CREATE_ENTRY_GROUP_BOOLEAN: 프로젝트에 아직 항목 그룹이 없는 경우 파이프라인에서 항목 그룹을 만들도록 하려면 이 값을 true로 설정합니다.
    • BUCKET_ID: 커넥터에서 생성된 메타데이터 가져오기 파일을 저장할 Cloud Storage 버킷의 이름입니다. 워크플로를 실행할 때마다 새 폴더가 생성됩니다.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: 커넥터에 전달할 추가 인수 목록입니다. 예시는 메타데이터 가져오기를 위한 커스텀 커넥터 개발을 참조하세요. 각 인수를 큰따옴표로 묶고 인수를 쉼표로 구분합니다.
    • CONTAINER_IMAGE: Artifact Registry에 호스팅된 커넥터의 맞춤 컨테이너 이미지입니다.
    • ENTRY_TYPES: 가져오기 범위 내에 있는 항목 유형 목록입니다(projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID 형식). LOCATION_ID는 메타데이터를 가져오는Google Cloud 위치와 동일하거나 global여야 합니다.
    • ASPECT_TYPES: 가져오기 범위 내에 있는 관점 유형 목록입니다(projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID 형식). LOCATION_ID는 메타데이터를 가져오는Google Cloud 위치와 동일하거나 global여야 합니다.
    • 선택사항: NETWORK_TAGS 인수의 경우 네트워크 태그 목록을 제공합니다.
    • 선택사항: NETWORK_URI 인수의 경우 데이터 소스에 연결되는 VPC 네트워크의 URI를 제공합니다. 네트워크를 제공하는 경우 서브네트워크 인수를 생략합니다.
    • 선택사항: SUBNETWORK_URI 인수의 경우 데이터 소스에 연결되는 서브네트워크의 URI를 제공합니다. 서브넷을 제공하는 경우 네트워크 인수를 생략합니다.
  3. Terraform을 초기화합니다.

    terraform init
    
  4. .tfvars 파일로 Terraform을 검증합니다.

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    CONNECTOR_VARIABLES_FILE을 변수 정의 파일의 이름으로 바꿉니다.

  5. .tfvars 파일로 Terraform을 배포합니다.

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform이 지정된 프로젝트에 워크플로와 Cloud Scheduler 작업을 만듭니다. Workflows는 지정된 일정에 따라 파이프라인을 실행합니다.

    가져오는 메타데이터의 양에 따라 워크플로를 실행하는 데 몇 분 이상 걸릴 수 있습니다. 진행률을 확인하는 방법에 관한 자세한 내용은 워크플로 실행 결과 액세스를 참조하세요.

    파이프라인 실행이 완료되면 Dataplex 카탈로그에서 가져온 메타데이터를 검색할 수 있습니다.

작업 로그 보기

Cloud Logging을 사용하여 관리형 연결 파이프라인의 로그를 확인합니다. 로그 페이로드에는 Dataproc Serverless 일괄 작업 및 메타데이터 가져오기 작업의 로그 링크(해당하는 경우)가 포함됩니다. 자세한 내용은 워크플로 로그 보기를 참조하세요.

문제 해결

다음 문제 해결 제안사항을 사용하세요.

다음 단계