Importar metadados de uma origem personalizada usando fluxos de trabalho

Este documento descreve como importar metadados de uma fonte de terceiros para Dataplex executando um pipeline de conectividade gerenciado no Workflows.

Para configurar um pipeline de conectividade gerenciado, você cria um conector para sua fonte de dados. Depois, você vai executar o pipeline no Workflows. O O pipeline extrai metadados da fonte de dados e, em seguida, importa esses metadados no Dataplex. Se necessário, o pipeline também cria Grupos de entradas do catálogo do Dataplex no seu projeto do Google Cloud.

Para mais informações sobre a conectividade gerenciada, consulte Visão geral da conectividade gerenciada.

Antes de começar

Antes de importar metadados, conclua as tarefas desta seção.

Criar um conector

Um conector extrai os metadados dos seus dados e gera um arquivo de importação de metadados que pode ser importado pelo o Dataplex. O conector é uma imagem do Artifact Registry que pode ser executada no Dataproc sem servidor.

Configurar recursos do Google Cloud

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    Se você não planeja executar o pipeline em uma programação, não é necessário ativar a API Cloud Scheduler.

  2. Crie secrets no Secret Manager para armazenar as credenciais da sua fonte de dados de terceiros.

  3. Configure sua rede de nuvem privada virtual (VPC) para executar cargas de trabalho do Dataproc sem servidor para o Spark.

  4. Crie um bucket do Cloud Storage para armazenar os arquivos de importação de metadados.

  5. Crie os seguintes recursos do catálogo do Dataplex:

    1. Crie tipos de aspecto personalizados para as entradas que você quer importar.

    2. Crie tipos de entrada personalizados para as entradas que você quer importar.

Funções exigidas

Uma conta de serviço representa a identidade de um fluxo de trabalho e determina quais permissões ele tem e quais recursos do Google Cloud pode acessar. Você precisa de uma conta de serviço para o Workflows (para executar pipeline) e para o Dataproc sem servidor (para executar o conector).

É possível usar a conta de serviço padrão do Compute Engine (PROJECT_NUMBER-compute@developer.gserviceaccount.com) ou criar sua própria conta de serviço (ou contas) para executar o pipeline de conectividade gerenciada.

Console

  1. No console do Google Cloud, abra a página IAM.

    Acessar IAM

  2. Selecione o projeto para importar os metadados.

  3. Clique em Conceder acesso e insira o endereço de e-mail da conta de serviço.

  4. Atribua os seguintes papéis à conta de serviço:

    • Gravador de registros
    • Proprietário de grupos de entradas do Dataplex
    • Proprietário do job de metadados do Dataplex
    • Editor de catálogo do Dataplex
    • Editor do Dataproc
    • Worker do Dataproc
    • Acesso de secret do Secret Manager: no secret que armazena as credenciais da fonte de dados
    • Usuário de objetos do Storage: no bucket do Cloud Storage
    • Leitor do Artifact Registry: no repositório do Artifact Registry que contém a imagem do conector
    • Usuário da conta de serviço: se você usar contas de serviço diferentes, conceda esse papel à conta de serviço que executa os fluxos de trabalho na conta de serviço que executa os jobs em lote do Dataproc sem servidor.
    • Invocador de fluxos de trabalho: se quiser programar o pipeline
  5. Salve as alterações.

gcloud

  1. Conceda papéis à conta de serviço. Execute os comandos a seguir:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Substitua:

    • PROJECT_ID: o nome do projeto do Google Cloud de destino para importar os metadados.
    • SERVICE_ACCOUNT_ID: a conta de serviço, como my-service-account@my-project.iam.gserviceaccount.com.
  2. Conceda à conta de serviço os seguintes papéis no nível do recurso:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    Substitua:

    • SECRET_ID: o ID do segredo que armazena as credenciais da sua fonte de dados. Ele usa o formato projects/PROJECT_ID/secrets/SECRET_ID:
    • BUCKET_ID: o nome do Cloud Storage. do Google Cloud.
    • REPOSITORY: o repositório do Artifact Registry que contém a imagem do conector.
    • REPOSITORY_LOCATION: o Google Cloud local onde o repositório está hospedado.
  3. Conceda à conta de serviço que executa fluxos de trabalho o papel roles/iam.serviceAccountUser na conta de serviço que executa os jobs em lote do Dataproc sem servidor. Você precisa conceder esse papel, mesmo que você use a mesma conta de serviço para Workflows e Dataproc sem servidor.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    Se você usar contas de serviço diferentes, o valor da sinalização --member é a conta de serviço que executa o Dataproc jobs em lote.

  4. Se você quiser programar o pipeline, conceda à conta de serviço o seguinte papel:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

Importar metadados

Para importar metadados, crie e execute um fluxo de trabalho que executa o pipeline de conectividade. Também é possível criar uma programação o pipeline.

Console

  1. Crie o fluxo de trabalho. Forneça as seguintes informações:

    • Conta de serviço: a conta de serviço que você configurou na seção Papéis necessários deste documento.
    • Criptografia: selecione Chave de criptografia gerenciada pelo Google.

    • Definir fluxo de trabalho: forneça o seguinte arquivo de definição:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: set_default_networking
      
          - set_default_networking:
              assign:
                - NETWORK_TYPE: "networkUri"
                - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. Para executar o pipeline sob demanda, executar o fluxo de trabalho.

    Forneça os seguintes argumentos do ambiente de execução:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Substitua:

    • PROJECT_ID: o nome do Google Cloud de destino. em que os metadados serão importados.
    • LOCATION_ID: o local de destino do Google Cloud. em que serão executados os jobs de importação de metadados e sem servidor do Dataproc. e os metadados serão importados.
    • ENTRY_GROUP_ID: o ID do grupo de entradas a ser importado. os metadados. O ID do grupo de entrada pode conter letras minúsculas, números e hífens.

      O nome completo do recurso desse grupo de entradas é projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: se você quiser que o pipeline crie o grupo de entrada se ele ainda não existir no projeto, configure como true.
    • BUCKET_ID: o nome do bucket do Cloud Storage para armazenar o arquivo de importação de metadados gerado pelo conector. Cada execução de fluxo de trabalho cria uma nova pasta.
    • SERVICE_ACCOUNT_ID: a conta de serviço que você configurou nos Papéis obrigatórios deste documento. A conta de serviço executa o conector no Dataproc Serverless.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: uma lista de argumentos adicionais a serem transmitidos ao conector. Para conferir exemplos, consulte Desenvolver um conector personalizado para importação de metadados. Coloque cada argumento entre aspas aspas e separe os argumentos com vírgulas.
    • CONTAINER_IMAGE: a imagem do contêiner personalizada do de rede hospedado no Artifact Registry.
    • ENTRY_TYPES: uma lista de tipos de entrada que estão no escopo. para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. O LOCATION_ID precisa ser o mesmo local do Google Cloud para importação de metadados ou global.
    • ASPECT_TYPES: uma lista de tipos de aspecto que estão no escopo. para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. O LOCATION_ID precisa ser o mesmo local do Google Cloud para importação de metadados ou global.
    • Opcional: para o argumento NETWORK_TAGS, forneça uma lista de tags de rede.
    • Opcional: para o argumento NETWORK_URI, forneça o URI da rede VPC que se conecta à fonte de dados. Se você fornecer uma rede, omita a argumento de sub-rede.
    • Opcional: para o argumento SUBNETWORK_URI, forneça o URI da sub-rede que se conecta à fonte de dados. Se você fornecer uma sub-rede, omita o argumento network.

    Dependendo da quantidade de metadados importados, o pipeline pode levar vários minutos ou mais para ser executado. Para mais informações sobre como conferir o progresso, consulte Acessar os resultados da execução do fluxo de trabalho.

    Após a execução do pipeline, será possível pesquise os metadados importados no Catálogo do Dataplex.

  3. Opcional: se você quiser executar o pipeline de acordo com uma programação, criar uma programação usando Cloud Scheduler. Forneça as seguintes informações:

    • Frequência: uma expressão unix-cron que define a programação para executar o pipeline.
    • Argumento do fluxo de trabalho: os argumentos do ambiente de execução para o conector, conforme descritas na etapa anterior.
    • Conta de serviço: a conta de serviço. A conta de serviço gerencia o programador.

gcloud

  1. Salve a seguinte definição de carga de trabalho como um arquivo YAML:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Defina variáveis do Bash, crie o fluxo de trabalho e, opcionalmente, crie uma programação para executar o pipeline:

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Substitua:

    • PROJECT_ID: o nome do Google Cloud de destino. em que os metadados serão importados.
    • LOCATION_ID: o local de destino do Google Cloud. em que serão executados os jobs de importação de metadados e sem servidor do Dataproc. e os metadados serão importados.
    • SERVICE_ACCOUNT_ID: a conta de serviço que você configurou nos Papéis obrigatórios deste documento.
    • WORKFLOW_DEFINITION_FILE: o caminho para o arquivo YAML de definição do fluxo de trabalho.
    • WORKFLOW_NAME: o nome do fluxo de trabalho.
    • WORKFLOW_ARGUMENTS: os argumentos de execução a serem transmitidos ao conector. Os argumentos estão no formato JSON:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      No Cloud Scheduler, as aspas duplas dentro da string entre aspas têm escape usando barras invertidas (\). Por exemplo: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}":

      Substitua:

      • ENTRY_GROUP_ID: o ID do grupo de entrada para importar metadados. O ID do grupo de entrada pode conter letras minúsculas, números e hífens.

        O nome completo do recurso desse grupo de entradas é projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN: se você quiser que o pipeline crie o grupo de entrada se ele ainda não existir no projeto, configure como true.
      • BUCKET_ID: o nome do Cloud Storage. para armazenar o arquivo de importação de metadados gerado pelo conector. Cada execução de fluxo de trabalho cria uma nova pasta.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: uma lista de outros a serem passados para o conector. Para conferir exemplos, consulte Desenvolver um conector personalizado para importação de metadados.
      • CONTAINER_IMAGE: a imagem do contêiner personalizada do conector hospedado no Artifact Registry.
      • ENTRY_TYPES: uma lista de tipos de entrada que estão no escopo. para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. O LOCATION_ID precisa ser o mesmo local do Google Cloud para importação de metadados ou global.
      • ASPECT_TYPES: uma lista de tipos de aspecto que estão no escopo. para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. O LOCATION_ID precisa ser o mesmo local do Google Cloud para importação de metadados ou global.
      • Opcional: para o argumento NETWORK_TAGS, forneça uma lista de tags de rede.
      • Opcional: para o argumento NETWORK_URI, forneça o URI da rede VPC que se conecta à fonte de dados. Se você fornecer uma rede, omita a argumento de sub-rede.
      • Opcional: para o argumento SUBNETWORK_URI, forneça o URI da sub-rede que se conecta à fonte de dados. Se você fornecer uma sub-rede, omita o argumento de rede.
    • CRON_SCHEDULE_EXPRESSION: uma expressão cron que define a programação para executar o pipeline. Por exemplo, para executar a programação todos os dias à meia-noite, use a expressão 0 0 * * *.

  3. Para executar o pipeline sob demanda, Execute o fluxo de trabalho:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    Os argumentos do fluxo de trabalho estão no formato JSON, mas não são escapados.

    Dependendo da quantidade de metadados importados, a execução do fluxo de trabalho pode levar vários minutos ou mais. Para mais informações sobre como conferir o progresso, consultar Acesse os resultados da execução do fluxo de trabalho.

    Após a execução do pipeline, será possível pesquise os metadados importados no Catálogo do Dataplex.

Terraform

  1. Clone o Repositório cloud-dataplex.

    O repositório inclui os seguintes arquivos do Terraform:

  2. Edite o arquivo .tfvars para substituir os marcadores de posição pelas informações. do seu conector.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Substitua:

    • PROJECT_ID: o nome do Google Cloud de destino. em que os metadados serão importados.
    • LOCATION_ID: o local de destino do Google Cloud. em que serão executados os jobs de importação de metadados e sem servidor do Dataproc. e os metadados serão importados.
    • SERVICE_ACCOUNT_ID: a conta de serviço que você configurou nos Papéis obrigatórios deste documento.
    • CRON_SCHEDULE_EXPRESSION: uma expressão cron que define a programação para executar o pipeline. Por exemplo, para executar a programação à meia-noite todos os dias, use a expressão 0 0 * * *.
    • ENTRY_GROUP_ID: o ID do grupo de entrada para importar metadados. O ID do grupo de entrada pode conter letras minúsculas, números e hífens.

      O nome completo do recurso desse grupo de entradas é projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: se você quiser que o pipeline crie o grupo de entrada se ele ainda não existir no projeto, configure como true.
    • BUCKET_ID: o nome do Cloud Storage. para armazenar o arquivo de importação de metadados gerado pelo conector. Cada execução de fluxo de trabalho cria uma nova pasta.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: uma lista de argumentos adicionais a serem transmitidos ao conector. Para conferir exemplos, consulte Desenvolver um conector personalizado para importação de metadados. Coloque cada argumento entre aspas aspas e separe os argumentos com vírgulas.
    • CONTAINER_IMAGE: a imagem do contêiner personalizada do de rede hospedado no Artifact Registry.
    • ENTRY_TYPES: uma lista de tipos de entrada que estão no escopo. para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. O LOCATION_ID precisa ser o mesmo local do Google Cloud para importação de metadados ou global.
    • ASPECT_TYPES: uma lista de tipos de aspecto que estão no escopo. para importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. O LOCATION_ID precisa ser o mesmo local do Google Cloud para importação de metadados ou global.
    • Opcional: para o argumento NETWORK_TAGS, forneça uma lista de tags de rede.
    • Opcional: para o argumento NETWORK_URI, forneça o URI da rede VPC que se conecta à fonte de dados. Se você fornecer uma rede, omita a argumento de sub-rede.
    • Opcional: para o argumento SUBNETWORK_URI, forneça o URI da sub-rede que se conecta à fonte de dados. Se você fornecer uma sub-rede, omita o argumento de rede.
  3. Inicialize o Terraform:

    terraform init
    
  4. Valide o Terraform com seu arquivo .tfvars:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Substitua CONNECTOR_VARIABLES_FILE pelo nome do arquivo de definições de variáveis.

  5. Implante o Terraform com seu arquivo .tfvars:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    O Terraform cria um fluxo de trabalho e um job do Cloud Scheduler no do projeto especificado. O Workflows executa o pipeline na programação especificada.

    Dependendo da quantidade de metadados importada, o fluxo de trabalho pode levar vários minutos ou mais para ser executado. Para mais informações sobre como ver o progresso, consulte Acessar os resultados da execução do fluxo de trabalho.

    Após a execução do pipeline, será possível pesquise os metadados importados no Catálogo do Dataplex.

Ver registros do job

Use o Cloud Logging para conferir os registros de um pipeline de conectividade gerenciado. O registro inclui um link para os registros da instância do Dataproc o job em lote e o job de importação de metadados, conforme relevante. Para mais informações, consulte Veja os registros do fluxo de trabalho.

Solução de problemas

Use as seguintes sugestões de solução de problemas:

A seguir