Importa metadati da un'origine personalizzata utilizzando i flussi di lavoro

Questo documento descrive come importare i metadati da un'origine di terze parti in Dataplex eseguendo una pipeline di connettività gestita in Workflows.

Per configurare una pipeline di connettività gestita, devi creare un connettore per origine dati. Poi esegui la pipeline in Workflows. La la pipeline estrae i metadati dall'origine dati e poi li importa in Dataplex. Se necessario, la pipeline crea anche Gruppi di voci Dataplex Catalog nel tuo progetto Google Cloud.

Per ulteriori informazioni sulla connettività gestita, vedi Panoramica della connettività gestita.

Prima di iniziare

Prima di importare i metadati, completa le attività in questa sezione.

Crea un connettore

Un connettore estrae i metadati dall'origine dati e genera un file di importazione dei metadati che può essere importato da Dataplex. Il connettore è un'immagine Artifact Registry che può essere eseguita su Dataproc Serverless.

Configura le risorse Google Cloud

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    Se non prevedi di eseguire la pipeline in base a una pianificazione, non è necessario abilitare l'API Cloud Scheduler.

  2. Crea secret in Secret Manager per archiviare le credenziali per l'origine dati di terze parti.

  3. Configura la tua rete Virtual Private Cloud (VPC) per eseguire Dataproc Serverless per i carichi di lavoro Spark.

  4. Crea un bucket Cloud Storage per memorizzare i file di importazione dei metadati.

  5. Crea le seguenti risorse Dataplex Catalog:

    1. Crea tipi di aspetti personalizzati per le voci da importare.

    2. Crea tipi di voci personalizzate per le voci da importare.

Ruoli obbligatori

Un account di servizio rappresenta l'identità di un flusso di lavoro e determina le autorizzazioni di cui dispone e le risorse Google Cloud a cui può accedere. È necessario un account di servizio per Workflows (per eseguire pipeline) e per Dataproc serverless (per eseguire il connettore).

Puoi utilizzare l'account di servizio predefinito di Compute Engine (PROJECT_NUMBER-compute@developer.gserviceaccount.com) o creare il tuo account di servizio (o i tuoi account) per eseguire la pipeline di connettività gestita.

Console

  1. Nella console Google Cloud, vai alla pagina IAM.

    Vai a IAM

  2. Seleziona il progetto in cui vuoi importare i metadati.

  3. Fai clic su Concedi accesso, poi inserisci l'indirizzo email dell'account di servizio.

  4. Assegna i ruoli seguenti all'account di servizio:

    • Writer log
    • Dataplex Entry Group Owner
    • Dataplex Metadata Job Owner
    • Editor Dataplex Catalog
    • Editor Dataproc
    • Worker Dataproc
    • Funzione di accesso ai secret di Secret Manager: sul secret in cui sono archiviati le credenziali dell'origine dati
    • Storage Object User (Utente oggetto archiviazione) nel bucket Cloud Storage
    • Artifact Registry Reader: nel repository Artifact Registry che contiene l'immagine del connettore
    • Utente account di servizio: se utilizzi account di servizio diversi, conceda questo ruolo all'account di servizio che esegue i flussi di lavoro nell'account di servizio che esegue i job batch Dataproc Serverless
    • Invoker di Workflows: se vuoi pianificare la pipeline
  5. Salva le modifiche.

gcloud

  1. Concedi i ruoli all'account di servizio. Esegui questi comandi:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/dataproc.worker
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud di destinazione in cui importare i metadati.
    • SERVICE_ACCOUNT: l'account di servizio, ad esempio my-service-account@my-project.iam.gserviceaccount.com.
  2. Concedi all'account di servizio i seguenti ruoli a livello di risorsa:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT} \
        --role=roles/artifactregistry.reader
    

    Sostituisci quanto segue:

    • SECRET_ID: l'ID del segreto che memorizza le credenziali per l'origine dati. Utilizza il formato projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: il nome del bucket Cloud Storage.
    • REPOSITORY: il repository Artifact Registry che contiene l'immagine del connettore.
    • REPOSITORY_LOCATION: la località Google Cloud in cui è ospitato il repository.
  3. Concedi all'account di servizio che esegue i flussi di lavoro il ruolo roles/iam.serviceAccountUser nell'account di servizio che esegue i job batch Dataproc Serverless. Devi concedere questo ruolo anche se utilizzi lo stesso account di servizio sia per Workflows sia per Dataproc Serverless.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT \
        --member='SERVICE_ACCOUNT' \
        --role='roles/iam.serviceAccountUser'
    

    Se utilizzi account di servizio diversi, il valore del flag --member è l'account di servizio che esegue i job batch Dataproc Serverless.

  4. Se vuoi pianificare la pipeline, concedi all'account di servizio il seguente ruolo:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT" \
     --role=roles/workflows.invoker
    

Importa metadati

Per importare i metadati, crea ed esegui un flusso di lavoro che esegua la pipeline di connettività gestita. Facoltativamente, puoi anche creare una pianificazione per l'esecuzione della pipeline.

Console

  1. Crea il flusso di lavoro. Fornisci le seguenti informazioni:

    • Account di servizio: l'account di servizio che hai configurato nella Sezione Ruoli obbligatori di questo documento.
    • Crittografia: seleziona Chiave di crittografia gestita da Google.

    • Definisci flusso di lavoro: fornisci il seguente file di definizione:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
              - NETWORK_TYPE: "networkUri"
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups:
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. Per eseguire la pipeline on demand, il flusso di lavoro.

    Fornisci i seguenti argomenti di runtime:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT",
        "ADDITIONAL_CONNECTOR_ARGS": [
            ADDITIONAL_CONNECTOR_ARGUMENTS
        ],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "SPARK_DRIVER_TYPE": "PYSPARK",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [NETWORK_TAGS],
        "NETWORK_URI": "NETWORK_URI",
        "SUBNETWORK_URI": "SUBNETWORK_URI"
    }
    

    Sostituisci quanto segue:

    • PROJECT_ID: il nome del target Progetto Google Cloud in cui importare i metadati.
    • LOCATION_ID: il target Google Cloud in cui Dataproc serverless e dei metadati verranno eseguiti job di importazione e verranno importati i metadati.
    • ENTRY_GROUP_ID: l'ID del gruppo di voci in cui eseguire l'importazione dei metadati. L'ID gruppo di voci può contenere lettere minuscole lettere, numeri e trattini.

      Il nome completo della risorsa di questo gruppo di voci è projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: se vuoi che la pipeline crei il gruppo di voci se non esiste già nel tuo progetto, imposta questo valore su true.

    • BUCKET_ID: il nome dell'istanza di Cloud Storage per archiviare il file di importazione dei metadati generato di rete. Ogni esecuzione del flusso di lavoro crea una nuova cartella.

    • SERVICE_ACCOUNT: l'account di servizio. L'account servizio esegue il connettore in Dataproc Serverless.

    • ADDITIONAL_CONNECTOR_ARGUMENTS: un elenco di argomenti aggiuntivi da passare al connettore. Per alcuni esempi, vedi Sviluppa un connettore personalizzato per l'importazione dei metadati. Racchiudi ogni argomento tra virgolette doppie e separa gli argomenti con virgole.

    • CONTAINER_IMAGE: l'immagine container personalizzata del connettore ospitato in Artifact Registry.

    • ENTRY_TYPES: un elenco dei tipi di voce che sono nell'ambito dell'importazione, nel formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID deve essere la stessa posizione Google Cloud in cui importi i metadati o global.

    • ASPECT_TYPES: un elenco di tipi di aspetto che sono nell'ambito dell'importazione, nel formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID deve essere lo stesso Località Google Cloud in cui importi i metadati, ovvero global.

    • NETWORK_TAGS (facoltativo): un elenco di tag di rete.

    • NETWORK_URI (facoltativo): l'URI della rete VPC che si connette all'origine dati. Se fornisci una rete, ometti l'argomento subnet.

    • SUBNETWORK_URI (facoltativo): l'URI della sottorete che si connette all'origine dati. Se fornisci una sottorete, ometti l'argomento network.

    A seconda della quantità di metadati importati, l'esecuzione della pipeline potrebbe richiedere diversi minuti o più. Per ulteriori informazioni come vedere l'avanzamento, vedere Accedi ai risultati di esecuzione del flusso di lavoro.

    Al termine dell'esecuzione della pipeline, puoi cercare i metadati importati in Dataplex Catalog.

  3. (Facoltativo) Se vuoi eseguire la pipeline in base a una pianificazione, crea una pianificazione utilizzando Cloud Scheduler. Fornisci le seguenti informazioni:

    • Frequenza: un'espressione unix-cron che definisce la pianificazione per per eseguire la pipeline.
    • Argomento flusso di lavoro: gli argomenti di runtime per il connettore, come descritto nel passaggio precedente.
    • Account di servizio: l'account di servizio. Account di servizio gestisce lo scheduler.

gcloud

  1. Salva la seguente definizione del carico di lavoro come file YAML:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
            - NETWORK_TYPE: "networkUri"
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Definisci le seguenti variabili Bash:

    workflow_name="WORKFLOW_NAME"
    project_id="PROJECT_ID"
    location="LOCATION_ID"
    service_account="SERVICE_ACCOUNT"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    

    Sostituisci quanto segue:

    • WORKFLOW_NAME: il nome del flusso di lavoro.
    • PROJECT_ID: il nome del target Progetto Google Cloud in cui importare i metadati.
    • LOCATION_ID: la posizione Google Cloud di destinazione in cui verranno eseguiti i job di importazione dei metadati e di Dataproc Serverless e in cui verranno importati i metadati.
    • SERVICE_ACCOUNT: l'account di servizio configurato nella sezione Ruoli richiesti di questo documento.
    • WORKFLOW_DEFINITION_FILE: il flusso di lavoro del file YAML di definizione.
  3. Crea il flusso di lavoro:

    gcloud workflows deploy ${workflow_name} \
        --project=${project_id} \
        --location=${location} \
        --source=${workflow_source} \
        --service-account=${service_account}
    
  4. Per eseguire la pipeline on demand, esegui il flusso di lavoro:

    gcloud workflows run ${workflow_name} --project=${project_id} --location=${location} --data "$(cat << EOF
    {
    "TARGET_PROJECT_ID": "PROJECT_ID",
    "CLOUD_REGION": "LOCATION_ID",
    "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
    "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
    "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
    "SERVICE_ACCOUNT": "SERVICE_ACCOUNT",
    "ADDITIONAL_CONNECTOR_ARGS": [
        ADDITIONAL_CONNECTOR_ARGUMENTS
    ],
    "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
    "SPARK_DRIVER_TYPE": "PYSPARK",
    "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
    "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
    "IMPORT_JOB_LOG_LEVEL": "INFO",
    "NETWORK_TAGS": [NETWORK_TAGS],
    "NETWORK_URI": "NETWORK_URI",
    "SUBNETWORK_URI": "SUBNETWORK_URI"
    }
    EOF
    )"
    

    Sostituisci quanto segue:

    • ENTRY_GROUP_ID: l'ID del gruppo di voci in cui eseguire l'importazione dei metadati. L'ID gruppo di voci può contenere lettere minuscole lettere, numeri e trattini.

      Il nome completo della risorsa di questo gruppo di voci è projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: se vuoi che la pipeline crei il gruppo di voci se non esiste già nel tuo progetto, imposta questo valore su true.

    • BUCKET_ID: il nome dell'istanza di Cloud Storage per archiviare il file di importazione dei metadati generato di rete. Ogni esecuzione del flusso di lavoro crea una nuova cartella.

    • ADDITIONAL_CONNECTOR_ARGUMENTS: un l'elenco di argomenti aggiuntivi da passare al connettore. Per alcuni esempi, vedi Sviluppa un connettore personalizzato per l'importazione dei metadati. Racchiudi ogni argomento tra virgolette doppie e separa gli argomenti con virgole.

    • CONTAINER_IMAGE: l'immagine container personalizzata del connettore ospitato in Artifact Registry.

    • ENTRY_TYPES: un elenco dei tipi di voce che sono nell'ambito dell'importazione, nel formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID deve essere la stessa posizione Google Cloud in cui importi i metadati o global.

    • ASPECT_TYPES: un elenco di tipi di aspetto che sono nell'ambito dell'importazione, nel formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID deve essere lo stesso Località Google Cloud in cui importi i metadati, ovvero global.

    • NETWORK_TAGS (facoltativo): un elenco di tag di rete.

    • NETWORK_URI (facoltativo): l'URI della rete VPC che si connette all'origine dati. Se fornisci una rete, ometti l'argomento subnet.

    • SUBNETWORK_URI (facoltativo): l'URI della sottorete che si connette all'origine dati. Se fornisci un parametro subnet, ometti l'argomento di rete.

    A seconda della quantità di metadati importati, l'esecuzione del flusso di lavoro potrebbe richiedere diversi minuti o più. Per ulteriori informazioni su come visualizzare l'avanzamento, consulta Accedere ai risultati di esecuzione del flusso di lavoro.

    Al termine dell'esecuzione della pipeline, puoi cercare i metadati importati in Dataplex Catalog.

  5. (Facoltativo) Se vuoi eseguire la pipeline in base a una pianificazione, crea una pianificazione utilizzando Cloud Scheduler:

    gcloud scheduler jobs create http ${workflow_name}-scheduler \
        --project=${project_id} \
        --location=${region} \
        --schedule="SCHEDULE" \
        --time-zone="TIME_ZONE" \
        --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
        --http-method="POST" \
        --oauth-service-account-email=${service_account} \
        --headers="Content-Type=application/json" \
        --message-body="{\"argument\": \"DOUBLE_ESCAPED_JSON_STRING\"}" \
    

    Sostituisci quanto segue:

    • SCHEDULE: un'espressione cron che definisce per eseguire la pipeline.
    • TIME_ZONE: il fuso orario, ad esempio UTC.
    • DOUBLE_ESCAPED_JSON_STRING: una codifica JSON degli argomenti del flusso di lavoro. Le virgolette doppie all'interno della stringa tra virgolette vengono indicate con le barre inverse (\). Ad esempio: --message-body="{\"argument\": \"{\\\"foo\\\": \\\"bar\\\"}\"}"

Terraform

  1. Salva i seguenti file:

    • Salva con nome main.tf:

      module "cloud_workflows" {
          source  = "GoogleCloudPlatform/cloud-workflows/google"
          version = "0.1.1"
          workflow_name             = var.workflow_name
          project_id                = var.project_id
          region                    = var.region
          service_account_email     = var.service_account
          workflow_trigger = {
              cloud_scheduler = {
              name                  = "${var.workflow_name}-scheduler"
              cron                  = "0 0 * * *"
              time_zone             = "UTC"
              service_account_email = var.service_account
              deadline              = "1800s"
              argument              = jsonencode(var.workflow_args)
              }
          }
          workflow_source = var.workflow_source
          }
      
    • Salva come variables.tf:

      variable "project_id" {
          default = ""
      }
      
      variable "region" {
          default = ""
      }
      
      variable "service_account" {
          default = ""
      }
      
      variable "workflow_name" {
          default = "managed-orchestration-for-dataplex"
      }
      
      variable "description" {
          default = "Submits a Dataproc Serverless Job and then runs a Dataplex Import Job. Times out after 12 hours."
      }
      
      variable "workflow_args" {
          default = {}
      }
      
      variable "cron_schedule" {
          default = "0 0 * * *"
      }
      
      variable "workflow_source" {
          default = ""
      }
      
  2. Salva il seguente file di definizioni delle variabili come file .tfvars. Sostituisci i segnaposto con le informazioni del connettore.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT"
    cron_schedule                   = "SCHEDULE"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT", "ADDITIONAL_CONNECTOR_ARGS": [ ADDITIONAL_CONNECTOR_ARGUMENTS ], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "SPARK_DRIVER_TYPE": "PYSPARK", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "NETWORK_TAGS": [NETWORK_TAGS], "NETWORK_URI": "NETWORK_URI", "SUBNETWORK_URI": "SUBNETWORK_URI" }
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
            - NETWORK_TYPE: "networkUri"
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups:
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF

    Sostituisci quanto segue:

    • PROJECT_ID: il nome del progetto Google Cloud di destinazione in cui eseguire l'importazione dei metadati.
    • LOCATION_ID: il target Google Cloud in cui Dataproc serverless e dei metadati verranno eseguiti job di importazione e verranno importati i metadati.
    • SERVICE_ACCOUNT: l'account di servizio che hai configurato nella Sezione Ruoli obbligatori di questo documento.
    • SCHEDULE: un'espressione cron che definisce la pianificazione da eseguire della pipeline.
    • ENTRY_GROUP_ID: l'ID del gruppo di voci in cui importare i metadati. L'ID gruppo di voci può contenere lettere minuscole, numeri e trattini.

      Il nome completo della risorsa di questo gruppo di voci è projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: se vuoi che la pipeline crei gruppo di voci, se non esiste già nel progetto, imposta questo valore su true.
    • BUCKET_ID: il nome del bucket Cloud Storage in cui archiviare il file di importazione dei metadati generato dal connettore. Ogni esecuzione del flusso di lavoro crea un nuovo .
    • ADDITIONAL_CONNECTOR_ARGUMENTS: un elenco di argomenti aggiuntivi del connettore che utilizzi per definire il flusso di lavoro. Per alcuni esempi, vedi Sviluppa un connettore personalizzato per l'importazione dei metadati. Racchiudi ogni argomento tra virgolette doppie e separa gli argomenti con virgole.
    • CONTAINER_IMAGE: l'immagine container personalizzata del connettore ospitata in Artifact Registry.
    • ENTRY_TYPES: un elenco dei tipi di voce che rientrano nell'ambito dell'importazione, in il formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID LOCATION_ID deve essere la stessa località Google Cloud che importi o global.
    • ASPECT_TYPES: un elenco di tipi di aspetti che rientrano nell'ambito dell'importazione, nel formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID deve essere la stessa località Google Cloud che importi o global.
    • NETWORK_TAGS (facoltativo): un elenco di tag di rete.
    • (Facoltativo) NETWORK_URI: l'URI della rete VPC che si connette all'origine dati. Se fornisci una rete, ometti l'argomento subnet.
    • SUBNETWORK_URI (facoltativo): l'URI della sottorete che si connette all'origine dati. Se fornisci una sottorete, ometti l'argomento network.
  3. Inizializza Terraform:

    terraform init
    
  4. Convalida Terraform con il file .tfvars:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Sostituisci CONNECTOR_VARIABLES_FILE con il nome del file delle definizioni delle variabili.

  5. Esegui il deployment di Terraform con il tuo file .tfvars:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform crea un flusso di lavoro e un job Cloud Scheduler progetto specificato. Workflows esegue la pipeline la pianificazione specificata dall'utente.

    A seconda della quantità di metadati importati, l'esecuzione del flusso di lavoro potrebbe richiedere diversi minuti o più. Per ulteriori informazioni su come visualizzare l'avanzamento, vedere Accedi ai risultati di esecuzione del flusso di lavoro.

    Al termine dell'esecuzione della pipeline, puoi cercare i metadati importati in Dataplex Catalog.

Visualizza i log dei job

Utilizza Cloud Logging per visualizzare i log di una pipeline di connettività gestita. Il log Il payload include un link ai log per Dataproc Serverless dal job batch e dal job di importazione dei metadati, se pertinente. Per ulteriori informazioni, consulta Visualizzare i log del flusso di lavoro.

Risoluzione dei problemi

Segui questi suggerimenti per la risoluzione dei problemi:

Passaggi successivi