Importe metadados de uma origem personalizada através do Workflows

Este documento descreve como importar metadados de uma origem de terceiros para o catálogo universal do Dataplex através da execução de um pipeline de conetividade gerido nos Workflows.

Para configurar um pipeline de conetividade gerido, cria um conetor para a sua origem de dados. Em seguida, executa o pipeline nos fluxos de trabalho. O pipeline extrai metadados da sua origem de dados e, em seguida, importa os metadados para o catálogo universal do Dataplex. Se necessário, o pipeline também cria grupos de entradas do catálogo universal do Dataplex no seu Google Cloud projeto.

Para mais informações sobre a conetividade gerida, consulte o artigo Vista geral da conetividade gerida.

Antes de começar

Antes de importar metadados, conclua as tarefas nesta secção.

Crie um conetor

Um conetor extrai os metadados da sua origem de dados e gera um ficheiro de importação de metadados que pode ser importado pelo Dataplex Universal Catalog. O conetor é uma imagem do Artifact Registry que pode ser executada no Google Cloud Serverless para Apache Spark.

Configure Google Cloud recursos

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

    Se não planeia executar o pipeline de acordo com um horário, não precisa de ativar a API Cloud Scheduler.

  2. Crie segredos no Secret Manager para armazenar as credenciais da sua origem de dados de terceiros.

  3. Configure a sua rede de nuvem virtual privada (VPC) para executar cargas de trabalho do Serverless para Apache Spark.

  4. Crie um contentor do Cloud Storage para armazenar os ficheiros de importação de metadados.

  5. Crie os seguintes recursos do Dataplex Universal Catalog:

    1. Crie tipos de aspetos personalizados para as entradas que quer importar.

    2. Crie tipos de entradas personalizados para as entradas que quer importar.

Funções necessárias

Uma conta de serviço representa a identidade de um fluxo de trabalho e determina as autorizações que o fluxo de trabalho tem e os recursos aos quais pode aceder. Google Cloud Precisa de uma conta de serviço para os fluxos de trabalho (para executar o pipeline) e para o Serverless para Apache Spark (para executar o conector).

Pode usar a conta de serviço predefinida do Compute Engine (PROJECT_NUMBER-compute@developer.gserviceaccount.com) ou criar a sua própria conta de serviço (ou contas) para executar o pipeline de conetividade gerido.

Consola

  1. Na Google Cloud consola, aceda à página IAM.

    Aceda ao IAM

  2. Selecione o projeto para o qual quer importar metadados.

  3. Clique em Conceder acesso e, de seguida, introduza o endereço de email da conta de serviço.

  4. Atribua as seguintes funções à conta de serviço:

    • Escritor de registos
    • Dataplex Entry Group Owner
    • Dataplex Metadata Job Owner
    • Editor do catálogo do Dataplex
    • Editor do Dataproc
    • Dataproc Worker
    • Secret Manager Secret Accessor: no secret que armazena as credenciais da sua origem de dados
    • Utilizador de objetos de armazenamento: no contentor do Cloud Storage
    • Leitor do Artifact Registry: no repositório do Artifact Registry que contém a imagem do conetor
    • Utilizador da conta de serviço: se usar contas de serviço diferentes, conceda esta função à conta de serviço que executa os fluxos de trabalho na conta de serviço que executa os trabalhos em lote do Serverless para Apache Spark
    • Workflows Invoker: se quiser agendar a pipeline
  5. Guarde as alterações.

gcloud

  1. Conceda funções à conta de serviço. Execute os seguintes comandos:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Substitua o seguinte:

    • PROJECT_ID: o nome do projeto Google Cloud de destino para o qual os metadados vão ser importados.
    • SERVICE_ACCOUNT_ID: a conta de serviço, como my-service-account@my-project.iam.gserviceaccount.com.
  2. Conceda à conta de serviço as seguintes funções ao nível do recurso:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    Substitua o seguinte:

    • SECRET_ID: o ID do segredo que armazena as credenciais da sua origem de dados. Usa o formato projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: o nome do contentor do Cloud Storage.
    • REPOSITORY: o repositório do Artifact Registry que contém a imagem do conetor.
    • REPOSITORY_LOCATION: a Google Cloud localização onde o repositório está alojado.
  3. Conceda à conta de serviço que executa os fluxos de trabalho a função de roles/iam.serviceAccountUser na conta de serviço que executa as tarefas em lote do Serverless para Apache Spark. Tem de conceder esta função mesmo que use a mesma conta de serviço para o Workflows e o Serverless para Apache Spark.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    Se usar contas de serviço diferentes, o valor da flag --member é a conta de serviço que executa os trabalhos em lote do Serverless para Apache Spark.

  4. Se quiser agendar o pipeline, conceda à conta de serviço a seguinte função:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

Importe metadados

Para importar metadados, crie e, em seguida, execute um fluxo de trabalho que execute o pipeline de conetividade gerido. Opcionalmente, também pode criar um agendamento para executar o pipeline.

Consola

  1. Crie o fluxo de trabalho. Forneça as seguintes informações:

    • Conta de serviço: a conta de serviço que configurou na secção Funções necessárias deste documento.
    • Encriptação: selecione Google-managed encryption key.

    • Definir fluxo de trabalho: faculte o seguinte ficheiro de definição:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                  steps:
                   - submit_extract_job_with_default_network_uri:
                        assign:
                          - NETWORK_TYPE: "networkUri"
                          - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: prepare_pyspark_job_body
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: prepare_pyspark_job_body
      
          - prepare_pyspark_job_body:
              assign:
                - pyspark_batch_body:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
              next: add_jar_file_uri_if_present
      
          - add_jar_file_uri_if_present:
              switch:
                - condition: ${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}
                  assign:
                    - pyspark_batch_body.jarFileUris : ${args.JAR_FILE_URI}
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch: ${pyspark_batch_body}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
      
  2. Para executar o pipeline a pedido, execute o fluxo de trabalho.

    Forneça os seguintes argumentos de tempo de execução:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "JAR_FILE_URI": "",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Substitua o seguinte:

    • PROJECT_ID: o nome do projeto Google Cloud de destino para o qual os metadados vão ser importados.
    • LOCATION_ID: a localização Google Cloud de destino onde os trabalhos de importação de metadados e do Serverless para Apache Spark vão ser executados e os metadados vão ser importados.
    • ENTRY_GROUP_ID: o ID do grupo de entradas para o qual importar os metadados. O ID do grupo de entradas pode conter letras minúsculas, números e hífens.

      O nome do recurso completo deste grupo de entradas é projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: se quiser que o pipeline crie o grupo de entradas se ainda não existir no seu projeto, defina este valor como true.
    • BUCKET_ID: o nome do contentor do Cloud Storage para armazenar o ficheiro de importação de metadados gerado pelo conector. Cada execução do fluxo de trabalho cria uma nova pasta.
    • SERVICE_ACCOUNT_ID: a conta de serviço que configurou na secção Funções necessárias deste documento. A conta de serviço executa o conector no Serverless para Apache Spark.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: uma lista de argumentos adicionais a transmitir ao conector. Para ver exemplos, consulte o artigo Desenvolva um conector personalizado para a importação de metadados. Coloque cada argumento entre aspas duplas e separe os argumentos com vírgulas.
    • CONTAINER_IMAGE: a imagem do contentor personalizada do conetor alojado no Artifact Registry.
    • ENTRY_TYPES: uma lista de tipos de entradas que estão no âmbito da importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. O LOCATION_ID tem de ser a mesma localização Google Cloud para a qual importa metadados ou global.
    • ASPECT_TYPES: uma lista de tipos de aspetos no âmbito da importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. O LOCATION_ID tem de ser a mesma localização Google Cloud para a qual importa metadados ou global.
    • Opcional: para o argumento NETWORK_TAGS, forneça uma lista de etiquetas de rede.
    • Opcional: para o argumento NETWORK_URI, indique o URI da rede VPC que se liga à origem de dados. Se fornecer uma rede, omita o argumento subnetwork.
    • Opcional: para o argumento SUBNETWORK_URI, indique o URI da sub-rede que se liga à origem de dados. Se fornecer uma sub-rede, omita o argumento de rede.

    Consoante a quantidade de metadados que importar, o pipeline pode demorar vários minutos ou mais a ser executado. Para mais informações sobre como ver o progresso, consulte o artigo Aceda aos resultados da execução do fluxo de trabalho.

    Após a conclusão da execução do pipeline, pode pesquisar os metadados importados no catálogo universal do Dataplex.

  3. Opcional: se quiser executar o pipeline de acordo com um agendamento, crie um agendamento através do Cloud Scheduler. Forneça as seguintes informações:

    • Frequência: uma expressão unix-cron que define o agendamento para executar o pipeline.
    • Argumento do fluxo de trabalho: os argumentos de tempo de execução do conector, conforme descrito no passo anterior.
    • Conta de serviço: a conta de serviço. A conta de serviço gere o programador.

gcloud

  1. Guarde a seguinte definição de carga de trabalho como um ficheiro YAML:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                steps:
                 - submit_extract_job_with_default_network_uri:
                      assign:
                        - NETWORK_TYPE: "networkUri"
                        - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: prepare_pyspark_job_body
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: prepare_pyspark_job_body
    
        - prepare_pyspark_job_body:
            assign:
              - pyspark_batch_body:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
            next: add_jar_file_uri_if_present
    
        - add_jar_file_uri_if_present:
            switch:
              - condition: ${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}
                assign:
                  - pyspark_batch_body.jarFileUris : ${args.JAR_FILE_URI}
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch: ${pyspark_batch_body}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
    
  2. Defina variáveis Bash, crie o fluxo de trabalho e, opcionalmente, crie um agendamento para executar o pipeline:

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Substitua o seguinte:

    • PROJECT_ID: o nome do projeto Google Cloud de destino para o qual os metadados vão ser importados.
    • LOCATION_ID: a localização Google Cloud de destino onde os trabalhos de importação de metadados e do Serverless para Apache Spark vão ser executados e os metadados vão ser importados.
    • SERVICE_ACCOUNT_ID: a conta de serviço que configurou na secção Funções necessárias deste documento.
    • WORKFLOW_DEFINITION_FILE: o caminho para o ficheiro YAML de definição do fluxo de trabalho.
    • WORKFLOW_NAME: o nome do fluxo de trabalho.
    • WORKFLOW_ARGUMENTS: os argumentos de tempo de execução a transmitir ao conector. Os argumentos estão no formato JSON:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "JAR_FILE_URI": "",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      Para o Cloud Scheduler, as aspas duplas na string entre aspas têm carateres de escape com barras invertidas (\). Por exemplo: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

      Substitua o seguinte:

      • ENTRY_GROUP_ID: o ID do grupo de entradas para o qual importar os metadados. O ID do grupo de entradas pode conter letras minúsculas, números e hífens.

        O nome do recurso completo deste grupo de entradas é projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN: se quiser que o pipeline crie o grupo de entradas se ainda não existir no seu projeto, defina este valor como true.
      • BUCKET_ID: o nome do contentor do Cloud Storage para armazenar o ficheiro de importação de metadados gerado pelo conector. Cada execução do fluxo de trabalho cria uma nova pasta.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: uma lista de argumentos adicionais a transmitir ao conector. Para ver exemplos, consulte o artigo Desenvolva um conector personalizado para a importação de metadados.
      • CONTAINER_IMAGE: a imagem do contentor personalizada do conetor alojado no Artifact Registry.
      • ENTRY_TYPES: uma lista de tipos de entradas que estão no âmbito da importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. O LOCATION_ID tem de ser a mesma localização Google Cloud para a qual importa metadados ou global.
      • ASPECT_TYPES: uma lista de tipos de aspetos no âmbito da importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. O LOCATION_ID tem de ser a mesma localização Google Cloud para a qual importa metadados ou global.
      • Opcional: para o argumento NETWORK_TAGS, forneça uma lista de etiquetas de rede.
      • Opcional: para o argumento NETWORK_URI, indique o URI da rede VPC que se liga à origem de dados. Se fornecer uma rede, omita o argumento subnetwork.
      • Opcional: para o argumento SUBNETWORK_URI, indique o URI da sub-rede que se liga à origem de dados. Se fornecer uma sub-rede, omita o argumento de rede.
    • CRON_SCHEDULE_EXPRESSION: uma expressão cron que define o horário de execução do pipeline. Por exemplo, para executar a programação à meia-noite todos os dias, use a expressão 0 0 * * *.

  3. Para executar o pipeline a pedido, execute o fluxo de trabalho:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    Os argumentos do fluxo de trabalho estão no formato JSON, mas não têm carateres de escape.

    Consoante a quantidade de metadados que importar, o fluxo de trabalho pode demorar vários minutos ou mais a ser executado. Para mais informações sobre como ver o progresso, consulte o artigo Aceda aos resultados da execução do fluxo de trabalho.

    Após a conclusão da execução do pipeline, pode pesquisar os metadados importados no catálogo universal do Dataplex.

Terraform

  1. Clone o repositório cloud-dataplex.

    O repositório inclui os seguintes ficheiros do Terraform:

  2. Edite o ficheiro .tfvars para substituir os marcadores de posição pelas informações do seu conetor.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Substitua o seguinte:

    • PROJECT_ID: o nome do projeto Google Cloud de destino para o qual os metadados vão ser importados.
    • LOCATION_ID: a localização Google Cloud de destino onde os trabalhos de importação de metadados e do Serverless para Apache Spark vão ser executados e os metadados vão ser importados.
    • SERVICE_ACCOUNT_ID: a conta de serviço que configurou na secção Funções necessárias deste documento.
    • CRON_SCHEDULE_EXPRESSION: uma expressão cron que define o horário de execução do pipeline. Por exemplo, para executar a programação à meia-noite todos os dias, use a expressão 0 0 * * *.
    • ENTRY_GROUP_ID: o ID do grupo de entradas para o qual importar os metadados. O ID do grupo de entradas pode conter letras minúsculas, números e hífens.

      O nome do recurso completo deste grupo de entradas é projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: se quiser que o pipeline crie o grupo de entradas se ainda não existir no seu projeto, defina este valor como true.
    • BUCKET_ID: o nome do contentor do Cloud Storage para armazenar o ficheiro de importação de metadados gerado pelo conector. Cada execução do fluxo de trabalho cria uma nova pasta.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: uma lista de argumentos adicionais a transmitir ao conector. Para ver exemplos, consulte o artigo Desenvolva um conector personalizado para a importação de metadados. Coloque cada argumento entre aspas duplas e separe os argumentos com vírgulas.
    • CONTAINER_IMAGE: a imagem do contentor personalizada do conetor alojado no Artifact Registry.
    • ENTRY_TYPES: uma lista de tipos de entradas que estão no âmbito da importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. O LOCATION_ID tem de ser a mesma localização Google Cloud para a qual importa metadados ou global.
    • ASPECT_TYPES: uma lista de tipos de aspetos no âmbito da importação, no formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. O LOCATION_ID tem de ser a mesma localização Google Cloud para a qual importa metadados ou global.
    • Opcional: para o argumento NETWORK_TAGS, forneça uma lista de etiquetas de rede.
    • Opcional: para o argumento NETWORK_URI, indique o URI da rede VPC que se liga à origem de dados. Se fornecer uma rede, omita o argumento subnetwork.
    • Opcional: para o argumento SUBNETWORK_URI, indique o URI da sub-rede que se liga à origem de dados. Se fornecer uma sub-rede, omita o argumento de rede.
  3. Inicialize o Terraform:

    terraform init
    
  4. Valide o Terraform com o seu ficheiro .tfvars:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Substitua CONNECTOR_VARIABLES_FILE pelo nome do ficheiro de definições de variáveis.

  5. Implemente o Terraform com o seu ficheiro .tfvars:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    O Terraform cria um fluxo de trabalho e uma tarefa do Cloud Scheduler no projeto especificado. Os fluxos de trabalho executam o pipeline na programação especificada.

    Consoante a quantidade de metadados que importar, o fluxo de trabalho pode demorar vários minutos ou mais a ser executado. Para mais informações sobre como ver o progresso, consulte o artigo Aceda aos resultados da execução do fluxo de trabalho.

    Após a conclusão da execução do pipeline, pode pesquisar os metadados importados no catálogo universal do Dataplex.

Veja registos de tarefas

Use o Cloud Logging para ver os registos de um pipeline de conetividade gerido. A carga útil do registo inclui um link para os registos da tarefa em lote do Serverless for Apache Spark e da tarefa de importação de metadados, conforme relevante. Para mais informações, consulte o artigo Veja os registos do fluxo de trabalho.

Resolução de problemas

Use as seguintes sugestões de resolução de problemas:

O que se segue?