Importar metadados de uma origem personalizada usando Workflows

Este documento descreve como importar metadados de uma fonte externa para o Dataplex executando um pipeline de conectividade gerenciada no Workflows.

Para configurar um pipeline de conectividade gerenciada, crie um conector para sua fonte de dados. Em seguida, execute o pipeline no Workflows. O pipeline extrai metadados da sua fonte de dados e os importa para o Dataplex. Se necessário, o pipeline também cria grupos de entrada do Dataplex Catalog no projeto do Google Cloud .

Para mais informações sobre a conectividade gerenciada, consulte Visão geral da conectividade gerenciada.

Antes de começar

Antes de importar metadados, conclua as tarefas desta seção.

Criar um conector

Um conector extrai os metadados da sua origem de dados e gera um arquivo de importação de metadados que pode ser importado pelo Dataplex. O conector é uma imagem do Artifact Registry que pode ser executada no Dataproc sem servidor.

Configurar recursos do Google Cloud

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    Se você não planeja executar o pipeline em uma programação, não é necessário ativar a API Cloud Scheduler.

  2. Crie segredos no Secret Manager para armazenar as credenciais da sua fonte de dados externa.

  3. Configure sua rede de nuvem privada virtual (VPC) para executar cargas de trabalho do Dataproc sem servidor para o Spark.

  4. Crie um bucket do Cloud Storage para armazenar os arquivos de importação de metadados.

  5. Crie os seguintes recursos do Dataplex Catalog:

    1. Crie tipos de aspectos personalizados para as entradas que você quer importar.

    2. Crie tipos de entrada personalizados para as entradas que você quer importar.

Funções exigidas

Uma conta de serviço representa a identidade de um fluxo de trabalho e determina quais permissões ele tem e quais recursos do Google Cloud ele pode acessar. Você precisa de uma conta de serviço para os Workflows (para executar o pipeline) e para o Dataproc sem servidor (para executar o conector).

É possível usar a conta de serviço padrão do Compute Engine (PROJECT_NUMBER-compute@developer.gserviceaccount.com) ou criar sua própria conta de serviço (ou contas) para executar o pipeline de conectividade gerenciada.

Console

  1. No console do Google Cloud , acesse a página IAM.

    Acessar IAM

  2. Selecione o projeto para importar os metadados.

  3. Clique em Conceder acesso e insira o endereço de e-mail da conta de serviço.

  4. Atribua os seguintes papéis à conta de serviço:

    • Gravador de registros
    • Proprietário de grupos de entradas do Dataplex
    • Proprietário do job de metadados do Dataplex
    • Editor do catálogo do Dataplex
    • Editor do Dataproc
    • Worker do Dataproc
    • Acessador de secret do Secret Manager: no secret que armazena as credenciais da sua fonte de dados.
    • Usuário de objetos do Storage: no bucket do Cloud Storage
    • Leitor do Artifact Registry: no repositório do Artifact Registry que contém a imagem do conector.
    • Usuário da conta de serviço: se você usar contas de serviço diferentes, conceda esse papel à conta de serviço que executa os Workflows na conta de serviço que executa os jobs em lote do Dataproc sem servidor.
    • Workflows Invoker: se você quiser programar o pipeline
  5. Salve as alterações.

gcloud

  1. Conceda papéis à conta de serviço. Execute os comandos a seguir:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Substitua:

    • PROJECT_ID: o nome do projeto de destino do Google Cloud para importar os metadados.
    • SERVICE_ACCOUNT_ID: a conta de serviço, como my-service-account@my-project.iam.gserviceaccount.com.
  2. Conceda à conta de serviço os seguintes papéis no nível do recurso:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    Substitua:

    • SECRET_ID: o ID do segredo que armazena as credenciais da sua fonte de dados. Ele usa o formato projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: o nome do bucket do Cloud Storage.
    • REPOSITORY: o repositório do Artifact Registry que contém a imagem do conector.
    • REPOSITORY_LOCATION: o Google Cloud local em que o repositório está hospedado.
  3. Conceda à conta de serviço que executa Workflows o papel roles/iam.serviceAccountUser na conta de serviço que executa os jobs em lote do Dataproc sem servidor. É necessário conceder essa função mesmo que você use a mesma conta de serviço para os Workflows e o Dataproc Serverless.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    Se você usar contas de serviço diferentes, o valor da flag --member será a conta de serviço que executa os jobs em lote do Dataproc Serverless.

  4. Se você quiser programar o pipeline, conceda à conta de serviço o seguinte papel:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

Importar metadados

Para importar metadados, crie e execute um fluxo de trabalho que execute o pipeline de conectividade gerenciado. Também é possível criar uma programação para executar o pipeline.

Console

  1. Crie o fluxo de trabalho. Forneça as seguintes informações:

    • Conta de serviço: a conta de serviço que você configurou na seção Papéis necessários deste documento.
    • Criptografia: selecione gerenciada pelo Google com tecnologia do Google Cloud.

    • Definir fluxo de trabalho: forneça o seguinte arquivo de definição:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                  steps:
                   - submit_extract_job_with_default_network_uri:
                        assign:
                          - NETWORK_TYPE: "networkUri"
                          - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. Para executar o pipeline sob demanda, execute o fluxo de trabalho.

    Forneça os seguintes argumentos de execução:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Substitua:

    • PROJECT_ID: o nome do projeto de destino do Google Cloud para importar os metadados.
    • LOCATION_ID: o local do Google Cloud , onde os trabalhos de importação de dados e sem servidor do Dataproc serão executados e os metadados serão importados.
    • ENTRY_GROUP_ID: o ID do grupo de entrada para importar metadados. O ID do grupo de entrada pode conter letras minúsculas, números e hífens.

      O nome completo do recurso desse grupo de entradas é projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: se você quiser que o pipeline crie o grupo de entrada se ele ainda não existir no projeto, defina esse valor como true.
    • BUCKET_ID: o nome do bucket do Cloud Storage para armazenar o arquivo de importação de metadados gerado pelo conector. Cada execução de fluxo de trabalho cria uma nova pasta.
    • SERVICE_ACCOUNT_ID: a conta de serviço que você configurou na seção Papéis obrigatórios deste documento. A conta de serviço executa o conector no Dataproc Serverless.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: uma lista de argumentos adicionais para transmitir ao conector. Para conferir exemplos, consulte Desenvolver um conector personalizado para importação de metadados. Coloque cada argumento entre aspas duplas e separe-os com vírgulas.
    • CONTAINER_IMAGE: a imagem de contêiner personalizada do conector hospedado no Artifact Registry.
    • ENTRY_TYPES: uma lista de tipos de entrada no escopo para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. O LOCATION_ID precisa ser o mesmo Google Cloud para onde você importa os metadados ou global.
    • ASPECT_TYPES: uma lista de tipos de aspectos que estão no escopo para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. O LOCATION_ID precisa ser o mesmo Google Cloud para onde você importa os metadados ou global.
    • Opcional: para o argumento NETWORK_TAGS, forneça uma lista de tags de rede.
    • Opcional: para o argumento NETWORK_URI, forneça o URI da rede VPC que se conecta à fonte de dados. Se você fornecer uma rede, omita o argumento de sub-rede.
    • Opcional: para o argumento SUBNETWORK_URI, forneça o URI da sub-rede que se conecta à fonte de dados. Se você fornecer uma sub-rede, omita o argumento de rede.

    Dependendo da quantidade de metadados importados, o pipeline pode levar vários minutos ou mais para ser executado. Para mais informações sobre como conferir o progresso, consulte Acessar os resultados da execução do fluxo de trabalho.

    Depois que o pipeline terminar de ser executado, você poderá pesquisar os metadados importados no Dataplex Catalog.

  3. Opcional: se você quiser executar o pipeline em uma programação, crie uma programação usando o Cloud Scheduler. Forneça as seguintes informações:

    • Frequência: uma expressão unix-cron que define a programação para executar o pipeline.
    • Argumento de fluxo de trabalho: os argumentos de execução do conector, conforme descrito na etapa anterior.
    • Conta de serviço: a conta de serviço. A conta de serviço gerencia o programador.

gcloud

  1. Salve a seguinte definição de carga de trabalho como um arquivo YAML:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                steps:
                 - submit_extract_job_with_default_network_uri:
                      assign:
                        - NETWORK_TYPE: "networkUri"
                        - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Defina variáveis do Bash, crie o fluxo de trabalho e, se quiser, crie uma programação para executar o pipeline:

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Substitua:

    • PROJECT_ID: o nome do projeto de destino do Google Cloud para importar os metadados.
    • LOCATION_ID: o local do Google Cloud , onde os trabalhos de importação de dados e sem servidor do Dataproc serão executados e os metadados serão importados.
    • SERVICE_ACCOUNT_ID: a conta de serviço que você configurou na seção Papéis obrigatórios deste documento.
    • WORKFLOW_DEFINITION_FILE: o caminho para o arquivo YAML de definição do fluxo de trabalho.
    • WORKFLOW_NAME: o nome do fluxo de trabalho.
    • WORKFLOW_ARGUMENTS: os argumentos de execução a serem transmitidos ao conector. Os argumentos estão no formato JSON:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      No Cloud Scheduler, as aspas duplas dentro da string entre aspas são escapadas usando barras invertidas (\). Por exemplo: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

      Substitua:

      • ENTRY_GROUP_ID: o ID do grupo de entrada para importar metadados. O ID do grupo de entrada pode conter letras minúsculas, números e hífens.

        O nome completo do recurso desse grupo de entradas é projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN: se você quiser que o pipeline crie o grupo de entrada se ele ainda não existir no projeto, defina esse valor como true.
      • BUCKET_ID: o nome do bucket do Cloud Storage para armazenar o arquivo de importação de metadados gerado pelo conector. Cada execução de fluxo de trabalho cria uma nova pasta.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: uma lista de argumentos adicionais para transmitir ao conector. Para conferir exemplos, consulte Desenvolver um conector personalizado para importação de metadados.
      • CONTAINER_IMAGE: a imagem de contêiner personalizada do conector hospedado no Artifact Registry.
      • ENTRY_TYPES: uma lista de tipos de entrada no escopo para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. O LOCATION_ID precisa ser o mesmo Google Cloud para onde você importa os metadados ou global.
      • ASPECT_TYPES: uma lista de tipos de aspectos que estão no escopo para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. O LOCATION_ID precisa ser o mesmo Google Cloud para onde você importa os metadados ou global.
      • Opcional: para o argumento NETWORK_TAGS, forneça uma lista de tags de rede.
      • Opcional: para o argumento NETWORK_URI, forneça o URI da rede VPC que se conecta à fonte de dados. Se você fornecer uma rede, omita o argumento de sub-rede.
      • Opcional: para o argumento SUBNETWORK_URI, forneça o URI da sub-rede que se conecta à fonte de dados. Se você fornecer uma sub-rede, omita o argumento de rede.
    • CRON_SCHEDULE_EXPRESSION: uma expressão cron que define a programação para executar o pipeline. Por exemplo, para executar a programação à meia-noite todos os dias, use a expressão 0 0 * * *.

  3. Para executar o pipeline sob demanda, execute o fluxo de trabalho:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    Os argumentos do fluxo de trabalho estão no formato JSON, mas não são escapados.

    Dependendo da quantidade de metadados importados, a execução do fluxo de trabalho pode levar vários minutos ou mais. Para mais informações sobre como ver o progresso, consulte Acessar os resultados da execução do fluxo de trabalho.

    Depois que o pipeline terminar de ser executado, você poderá pesquisar os metadados importados no Dataplex Catalog.

Terraform

  1. Clone o repositório cloud-dataplex.

    O repositório inclui os seguintes arquivos do Terraform:

  2. Edite o arquivo .tfvars para substituir os marcadores de posição pelas informações do conector.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Substitua:

    • PROJECT_ID: o nome do projeto de destino do Google Cloud para importar os metadados.
    • LOCATION_ID: o local do Google Cloud alvo, onde os trabalhos de importação de dados e sem servidor do Dataproc serão executados e os metadados serão importados.
    • SERVICE_ACCOUNT_ID: a conta de serviço que você configurou na seção Papéis obrigatórios deste documento.
    • CRON_SCHEDULE_EXPRESSION: uma expressão cron que define a programação para executar o pipeline. Por exemplo, para executar a programação à meia-noite todos os dias, use a expressão 0 0 * * *.
    • ENTRY_GROUP_ID: o ID do grupo de entrada para importar metadados. O ID do grupo de entrada pode conter letras minúsculas, números e hífens.

      O nome completo do recurso desse grupo de entradas é projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: se você quiser que o pipeline crie o grupo de entrada se ele ainda não existir no projeto, defina esse valor como true.
    • BUCKET_ID: o nome do bucket do Cloud Storage para armazenar o arquivo de importação de metadados gerado pelo conector. Cada execução de fluxo de trabalho cria uma nova pasta.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: uma lista de argumentos adicionais para transmitir ao conector. Para conferir exemplos, consulte Desenvolver um conector personalizado para importação de metadados. Coloque cada argumento entre aspas duplas e separe-os com vírgulas.
    • CONTAINER_IMAGE: a imagem de contêiner personalizada do conector hospedado no Artifact Registry.
    • ENTRY_TYPES: uma lista de tipos de entrada no escopo para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. O LOCATION_ID precisa ser o mesmo Google Cloud para onde você importa os metadados ou global.
    • ASPECT_TYPES: uma lista de tipos de aspectos que estão no escopo para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. O LOCATION_ID precisa ser o mesmo Google Cloud para onde você importa os metadados ou global.
    • Opcional: para o argumento NETWORK_TAGS, forneça uma lista de tags de rede.
    • Opcional: para o argumento NETWORK_URI, forneça o URI da rede VPC que se conecta à fonte de dados. Se você fornecer uma rede, omita o argumento de sub-rede.
    • Opcional: para o argumento SUBNETWORK_URI, forneça o URI da sub-rede que se conecta à fonte de dados. Se você fornecer uma sub-rede, omita o argumento de rede.
  3. Inicialize o Terraform:

    terraform init
    
  4. Valide o Terraform com seu arquivo .tfvars:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Substitua CONNECTOR_VARIABLES_FILE pelo nome do arquivo de definições de variáveis.

  5. Implante o Terraform com seu arquivo .tfvars:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    O Terraform cria um fluxo de trabalho e um job do Cloud Scheduler no projeto especificado. O Workflows executa o pipeline na programação especificada.

    Dependendo da quantidade de metadados importados, a execução do fluxo de trabalho pode levar vários minutos ou mais. Para mais informações sobre como ver o progresso, consulte Acessar os resultados da execução do fluxo de trabalho.

    Depois que o pipeline terminar de ser executado, você poderá pesquisar os metadados importados no Dataplex Catalog.

Ver registros do job

Use o Cloud Logging para conferir os registros de um pipeline de conectividade gerenciado. O payload de registro inclui um link para os registros do job em lote do Dataproc sem servidor e do job de importação de metadados, conforme for relevante. Para mais informações, consulte Conferir os registros do fluxo de trabalho.

Solução de problemas

Use as seguintes sugestões de solução de problemas:

A seguir