Importer des métadonnées à partir d'une source personnalisée à l'aide de workflows

Ce document explique comment importer des métadonnées depuis une source tierce dans Dataplex Universal Catalog en exécutant un pipeline de connectivité gérée dans Workflows.

Pour configurer un pipeline de connectivité gérée, vous devez créer un connecteur pour votre source de données. Vous exécutez ensuite le pipeline dans Workflows. Le pipeline extrait les métadonnées de votre source de données, puis les importe dans Dataplex Universal Catalog. Si nécessaire, le pipeline crée également des groupes d'entrées Dataplex Universal Catalog dans votre projet Google Cloud .

Pour en savoir plus sur la connectivité gérée, consultez Présentation de la connectivité gérée.

Avant de commencer

Avant d'importer des métadonnées, effectuez les tâches décrites dans cette section.

Créer un connecteur

Un connecteur extrait les métadonnées de votre source de données et génère un fichier d'importation de métadonnées qui peut être importé par Dataplex Universal Catalog. Le connecteur est une image Artifact Registry pouvant s'exécuter sur Dataproc sans serveur.

Configurer les ressources Google Cloud

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    Si vous ne prévoyez pas d'exécuter le pipeline selon une planification, vous n'avez pas besoin d'activer l'API Cloud Scheduler.

  2. Créez des secrets dans Secret Manager pour stocker les identifiants de votre source de données tierce.

  3. Configurez votre réseau de cloud privé virtuel (VPC) de façon à exécuter des charges de travail Dataproc sans serveur pour Spark.

  4. Créez un bucket Cloud Storage pour stocker les fichiers d'importation de métadonnées.

  5. Créez les ressources Dataplex Universal Catalog suivantes :

    1. Créez des types d'aspects personnalisés pour les entrées que vous souhaitez importer.

    2. Créez des types d'entrées personnalisés pour les entrées que vous souhaitez importer.

Rôles requis

Un compte de service représente l'identité d'un workflow et détermine les autorisations dont il dispose ainsi que les ressources Google Cloud auxquelles il peut accéder. Vous avez besoin d'un compte de service pour Workflows (afin d'exécuter le pipeline) et pour Dataproc sans serveur (afin d'exécuter le connecteur).

Vous pouvez utiliser le compte de service Compute Engine par défaut (PROJECT_NUMBER-compute@developer.gserviceaccount.com) ou créer votre propre compte de service (ou vos propres comptes) pour exécuter le pipeline de connectivité gérée.

Console

  1. Dans la console Google Cloud , accédez à la page IAM.

    Accéder à IAM

  2. Sélectionnez le projet dans lequel vous souhaitez importer les métadonnées.

  3. Cliquez sur  Accorder l'accès, puis saisissez l'adresse e-mail du compte de service.

  4. Attribuez les rôles suivants au compte de service :

    • Rédacteur de journaux
    • Propriétaire du groupe d'entrées Dataplex
    • Propriétaire de jobs de métadonnées Dataplex
    • Éditeur de catalogue Dataplex
    • Éditeur Dataproc
    • Nœud de calcul Dataproc
    • Accesseur de secrets Secret Manager : sur le secret qui stocke les identifiants de votre source de données.
    • Utilisateur d'objets Storage : sur le bucket Cloud Storage.
    • Lecteur Artifact Registry : sur le dépôt Artifact Registry contenant l'image de connecteur.
    • Utilisateur du compte de service : si vous utilisez différents comptes de service, attribuez au compte de service qui exécute Workflows ce rôle sur le compte de service qui exécute les jobs par lot Dataproc sans serveur.
    • Demandeur de workflows : si vous souhaitez planifier l'exécution du pipeline.
  5. Enregistrez les modifications.

gcloud

  1. Attribuez des rôles au compte de service. Exécutez les commandes suivantes :

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Remplacez les éléments suivants :

    • PROJECT_ID : nom du projet Google Cloudcible dans lequel importer les métadonnées.
    • SERVICE_ACCOUNT_ID : compte de service, par exemple my-service-account@my-project.iam.gserviceaccount.com.
  2. Attribuez les rôles suivants au compte de service au niveau des ressources :

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    Remplacez les éléments suivants :

    • SECRET_ID : ID du secret qui stocke les identifiants de votre source de données. Il utilise le format projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID : nom du bucket Cloud Storage.
    • REPOSITORY : dépôt Artifact Registry contenant l'image de connecteur.
    • REPOSITORY_LOCATION : emplacement Google Cloudoù le dépôt est hébergé.
  3. Attribuez au compte de service qui exécute Workflows le rôle roles/iam.serviceAccountUser sur le compte de service qui exécute les jobs par lot Dataproc sans serveur. Vous devez attribuer ce rôle même si vous utilisez le même compte de service pour Workflows et Dataproc sans serveur.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    Si vous utilisez différents comptes de service, la valeur de l'option --member correspond au compte de service qui exécute les jobs par lot Dataproc sans serveur.

  4. Si vous souhaitez planifier l'exécution du pipeline, attribuez le rôle suivant au compte de service :

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

Importer les métadonnées

Pour importer des métadonnées, créez et exécutez un workflow qui exécute le pipeline de connectivité gérée. Vous pouvez également créer une planification pour exécuter le pipeline.

Console

  1. Créez le workflow. Fournissez les informations suivantes :

    • Compte de service : compte de service que vous avez configuré dans la section Rôles requis de ce document.
    • Chiffrement : sélectionnez Google-managed encryption key.

    • Définissez le workflow en fournissant le fichier de définition suivant :

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                  steps:
                   - submit_extract_job_with_default_network_uri:
                        assign:
                          - NETWORK_TYPE: "networkUri"
                          - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: prepare_pyspark_job_body
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: prepare_pyspark_job_body
      
          - prepare_pyspark_job_body:
              assign:
                - pyspark_batch_body:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
              next: add_jar_file_uri_if_present
      
          - add_jar_file_uri_if_present:
              switch:
                - condition: ${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}
                  assign:
                    - pyspark_batch_body.jarFileUris : ${args.JAR_FILE_URI}
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch: ${pyspark_batch_body}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
      
  2. Pour exécuter le pipeline à la demande, exécutez le workflow.

    Indiquez les arguments d'exécution suivants :

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "JAR_FILE_URI": "",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Remplacez les éléments suivants :

    • PROJECT_ID : nom du projet Google Cloudcible dans lequel importer les métadonnées.
    • LOCATION_ID : emplacement Google Cloud cible où s'exécuteront les jobs d'importation de métadonnées et Dataproc sans serveur, et où les métadonnées seront importées.
    • ENTRY_GROUP_ID : ID du groupe d'entrées dans lequel importer les métadonnées. L'ID du groupe d'entrées peut contenir des lettres minuscules, des chiffres et des traits d'union.

      Le nom de ressource complet de ce groupe d'entrées est projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN : si vous souhaitez que le pipeline crée le groupe d'entrées s'il n'existe pas déjà dans votre projet, définissez cette valeur sur true.
    • BUCKET_ID : nom du bucket Cloud Storage dans lequel stocker le fichier d'importation de métadonnées généré par le connecteur. Chaque exécution de workflow crée un dossier.
    • SERVICE_ACCOUNT_ID : compte de service que vous avez configuré dans la section Rôles requis de ce document. Le compte de service exécute le connecteur dans Dataproc sans serveur.
    • ADDITIONAL_CONNECTOR_ARGUMENTS : liste d'arguments supplémentaires à transmettre au connecteur. Pour obtenir des exemples, consultez Développer un connecteur personnalisé pour l'importation de métadonnées. Placez chaque argument entre guillemets doubles et séparez-les par des virgules.
    • CONTAINER_IMAGE : image du conteneur personnalisé du connecteur hébergé dans Artifact Registry.
    • ENTRY_TYPES : liste des types d'entrées concernés par l'importation, au format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. Le LOCATION_ID doit correspondre au même emplacementGoogle Cloud que celui dans lequel vous importez les métadonnées, ou être défini sur global.
    • ASPECT_TYPES : liste des types d'aspects concernés par l'importation, au format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. Le LOCATION_ID doit correspondre au même emplacementGoogle Cloud que celui dans lequel vous importez les métadonnées, ou être défini sur global.
    • (Facultatif) Pour l'argument NETWORK_TAGS, fournissez une liste de tags réseau.
    • (Facultatif) Pour l'argument NETWORK_URI, indiquez l'URI du réseau VPC qui se connecte à la source de données. Si vous spécifiez un réseau, omettez l'argument "subnetwork".
    • (Facultatif) Pour l'argument SUBNETWORK_URI, indiquez l'URI du sous-réseau qui se connecte à la source de données. Si vous spécifiez un sous-réseau, omettez l'argument "network".

    Selon la quantité de métadonnées que vous importez, l'exécution du pipeline peut prendre plusieurs minutes, voire plus. Pour savoir comment afficher la progression, consultez Accéder aux résultats d'exécution d'un workflow.

    Une fois l'exécution du pipeline terminée, vous pouvez rechercher les métadonnées importées dans Dataplex Universal Catalog.

  3. (Facultatif) Si vous souhaitez exécuter le pipeline selon une planification, créez-la à l'aide de Cloud Scheduler. Fournissez les informations suivantes :

    • Fréquence : expression unix-cron qui définit la planification de l'exécution du pipeline.
    • Argument du workflow : arguments d'exécution du connecteur, comme décrit à l'étape précédente.
    • Compte de service : compte de service qui gère le planificateur.

gcloud

  1. Enregistrez la définition de charge de travail suivante au format YAML :

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                steps:
                 - submit_extract_job_with_default_network_uri:
                      assign:
                        - NETWORK_TYPE: "networkUri"
                        - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: prepare_pyspark_job_body
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: prepare_pyspark_job_body
    
        - prepare_pyspark_job_body:
            assign:
              - pyspark_batch_body:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
            next: add_jar_file_uri_if_present
    
        - add_jar_file_uri_if_present:
            switch:
              - condition: ${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}
                assign:
                  - pyspark_batch_body.jarFileUris : ${args.JAR_FILE_URI}
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch: ${pyspark_batch_body}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
    
  2. Définissez les variables Bash, créez le workflow et, si vous le souhaitez, créez une planification pour exécuter le pipeline :

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Remplacez les éléments suivants :

    • PROJECT_ID : nom du projet Google Cloudcible dans lequel importer les métadonnées.
    • LOCATION_ID : emplacement Google Cloud cible où s'exécuteront les jobs d'importation de métadonnées et Dataproc sans serveur, et où les métadonnées seront importées.
    • SERVICE_ACCOUNT_ID : compte de service que vous avez configuré dans la section Rôles requis de ce document.
    • WORKFLOW_DEFINITION_FILE : chemin d'accès au fichier YAML de définition du workflow.
    • WORKFLOW_NAME : nom du workflow.
    • WORKFLOW_ARGUMENTS : arguments d'exécution à transmettre au connecteur. Les arguments sont au format JSON :

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "JAR_FILE_URI": "",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      Pour Cloud Scheduler, les guillemets doubles dans la chaîne entre guillemets sont échappés avec des barres obliques inverses (\). Par exemple : --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

      Remplacez les éléments suivants :

      • ENTRY_GROUP_ID : ID du groupe d'entrées dans lequel importer les métadonnées. L'ID du groupe d'entrées peut contenir des lettres minuscules, des chiffres et des traits d'union.

        Le nom de ressource complet de ce groupe d'entrées est projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN : si vous souhaitez que le pipeline crée le groupe d'entrées s'il n'existe pas déjà dans votre projet, définissez cette valeur sur true.
      • BUCKET_ID : nom du bucket Cloud Storage dans lequel stocker le fichier d'importation de métadonnées généré par le connecteur. Chaque exécution de workflow crée un dossier.
      • ADDITIONAL_CONNECTOR_ARGUMENTS : liste d'arguments supplémentaires à transmettre au connecteur. Pour obtenir des exemples, consultez Développer un connecteur personnalisé pour l'importation de métadonnées.
      • CONTAINER_IMAGE : image du conteneur personnalisé du connecteur hébergé dans Artifact Registry.
      • ENTRY_TYPES : liste des types d'entrées concernés par l'importation, au format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. Le LOCATION_ID doit correspondre au même emplacementGoogle Cloud que celui dans lequel vous importez les métadonnées, ou être défini sur global.
      • ASPECT_TYPES : liste des types d'aspects concernés par l'importation, au format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. Le LOCATION_ID doit correspondre au même emplacementGoogle Cloud que celui dans lequel vous importez les métadonnées, ou être défini sur global.
      • (Facultatif) Pour l'argument NETWORK_TAGS, fournissez une liste de tags réseau.
      • (Facultatif) Pour l'argument NETWORK_URI, indiquez l'URI du réseau VPC qui se connecte à la source de données. Si vous spécifiez un réseau, omettez l'argument "subnetwork".
      • (Facultatif) Pour l'argument SUBNETWORK_URI, indiquez l'URI du sous-réseau qui se connecte à la source de données. Si vous spécifiez un sous-réseau, omettez l'argument "network".
    • CRON_SCHEDULE_EXPRESSION : expression cron qui définit la planification de l'exécution du pipeline. Par exemple, pour planifier l'exécution du pipeline à minuit tous les jours, utilisez l'expression 0 0 * * *.

  3. Pour exécuter le pipeline à la demande, exécutez le workflow :

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    Les arguments du workflow sont au format JSON, mais ne sont pas échappés.

    Selon la quantité de métadonnées que vous importez, l'exécution du workflow peut prendre plusieurs minutes, voire plus. Pour savoir comment afficher la progression, consultez Accéder aux résultats d'exécution d'un workflow.

    Une fois l'exécution du pipeline terminée, vous pouvez rechercher les métadonnées importées dans Dataplex Universal Catalog.

Terraform

  1. Clonez le dépôt cloud-dataplex.

    Le dépôt inclut les fichiers Terraform suivants :

  2. Modifiez le fichier .tfvars pour remplacer les espaces réservés par les informations de votre connecteur.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Remplacez les éléments suivants :

    • PROJECT_ID : nom du projet Google Cloudcible dans lequel importer les métadonnées.
    • LOCATION_ID : emplacement Google Cloud cible où s'exécuteront les jobs d'importation de métadonnées et Dataproc sans serveur, et où les métadonnées seront importées.
    • SERVICE_ACCOUNT_ID : compte de service que vous avez configuré dans la section Rôles requis de ce document.
    • CRON_SCHEDULE_EXPRESSION : expression cron qui définit la planification de l'exécution du pipeline. Par exemple, pour planifier l'exécution du pipeline à minuit tous les jours, utilisez l'expression 0 0 * * *.
    • ENTRY_GROUP_ID : ID du groupe d'entrées dans lequel importer les métadonnées. L'ID du groupe d'entrées peut contenir des lettres minuscules, des chiffres et des traits d'union.

      Le nom de ressource complet de ce groupe d'entrées est projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN : si vous souhaitez que le pipeline crée le groupe d'entrées s'il n'existe pas déjà dans votre projet, définissez cette valeur sur true.
    • BUCKET_ID : nom du bucket Cloud Storage dans lequel stocker le fichier d'importation de métadonnées généré par le connecteur. Chaque exécution de workflow crée un dossier.
    • ADDITIONAL_CONNECTOR_ARGUMENTS : liste d'arguments supplémentaires à transmettre au connecteur. Pour obtenir des exemples, consultez Développer un connecteur personnalisé pour l'importation de métadonnées. Placez chaque argument entre guillemets doubles et séparez-les par des virgules.
    • CONTAINER_IMAGE : image du conteneur personnalisé du connecteur hébergé dans Artifact Registry.
    • ENTRY_TYPES : liste des types d'entrées concernés par l'importation, au format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. Le LOCATION_ID doit correspondre au même emplacementGoogle Cloud que celui dans lequel vous importez les métadonnées, ou être défini sur global.
    • ASPECT_TYPES : liste des types d'aspects concernés par l'importation, au format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. Le LOCATION_ID doit correspondre au même emplacementGoogle Cloud que celui dans lequel vous importez les métadonnées, ou être défini sur global.
    • (Facultatif) Pour l'argument NETWORK_TAGS, fournissez une liste de tags réseau.
    • (Facultatif) Pour l'argument NETWORK_URI, indiquez l'URI du réseau VPC qui se connecte à la source de données. Si vous spécifiez un réseau, omettez l'argument "subnetwork".
    • (Facultatif) Pour l'argument SUBNETWORK_URI, indiquez l'URI du sous-réseau qui se connecte à la source de données. Si vous spécifiez un sous-réseau, omettez l'argument "network".
  3. Initialisez Terraform :

    terraform init
    
  4. Validez Terraform avec votre fichier .tfvars :

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Remplacez CONNECTOR_VARIABLES_FILE par le nom de votre fichier de définitions de variables.

  5. Déployez Terraform avec votre fichier .tfvars :

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform crée un workflow et un job Cloud Scheduler dans le projet spécifié. Workflows exécute le pipeline selon la planification que vous spécifiez.

    Selon la quantité de métadonnées que vous importez, l'exécution du workflow peut prendre plusieurs minutes, voire plus. Pour savoir comment afficher la progression, consultez Accéder aux résultats d'exécution d'un workflow.

    Une fois l'exécution du pipeline terminée, vous pouvez rechercher les métadonnées importées dans Dataplex Universal Catalog.

Afficher les journaux de jobs

Utilisez Cloud Logging pour afficher les journaux d'un pipeline de connectivité gérée. La charge utile de journal inclut un lien vers les journaux du job par lot Dataproc sans serveur et du job d'importation de métadonnées, le cas échéant. Pour en savoir plus, consultez Afficher les journaux de workflow.

Dépannage

Suivez ces suggestions de dépannage :

Étapes suivantes