Ce document explique comment importer des métadonnées à partir d'une source tierce dans Dataplex en exécutant un pipeline de connectivité géré dans Workflows.
Pour configurer un pipeline de connectivité géré, vous devez créer un connecteur pour votre source de données. Ensuite, vous exécuterez le pipeline dans Workflows. Le pipeline extrait les métadonnées de votre source de données, puis les importe dans Dataplex. Si nécessaire, le pipeline crée également Groupes d'entrées du catalogue Dataplex dans votre projet Google Cloud.
Pour en savoir plus sur la connectivité gérée, consultez la présentation de la connectivité gérée.
Avant de commencer
Avant d'importer des métadonnées, effectuez les tâches de cette section.
Créer un connecteur
Un connecteur extrait les métadonnées de votre source de données et génère un fichier d'importation de métadonnées pouvant être importé par Dataplex. Le connecteur est une image Artifact Registry qui peut être exécutée sur Dataproc sans serveur.
Créez un connecteur personnalisé qui extrait les métadonnées de votre source tierce.
Pour obtenir un exemple de connecteur que vous pouvez utiliser comme modèle de référence pour créer votre propre connecteur, consultez la section Développer un connecteur personnalisé pour l'importation de métadonnées.
Configurer les ressources Google Cloud
-
Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.
Si vous ne prévoyez pas d'exécuter le pipeline selon un calendrier, vous n'avez pas besoin d'activer l'API Cloud Scheduler.
Créer des secrets dans Secret Manager pour stocker les identifiants de votre source de données tierce.
Configurez votre réseau de cloud privé virtuel (VPC) pour exécuter Dataproc sans serveur pour les charges de travail Spark.
Créez un bucket Cloud Storage pour stocker les fichiers d'importation des métadonnées.
Créez les ressources Dataplex Catalog suivantes :
Créer des types d'aspect personnalisés pour les entrées que vous souhaitez importer.
Créez des types d'entrées personnalisés pour les entrées que vous souhaitez importer.
Rôles requis
Un compte de service représente l'identité d'un workflow et détermine les autorisations dont il dispose et les ressources Google Cloud auxquelles il peut accéder. Vous avez besoin d'un compte de service pour Workflows (pour exécuter le pipeline) et pour Dataproc sans serveur (pour exécuter le connecteur).
Vous pouvez utiliser le compte de service Compute Engine par défaut
(PROJECT_NUMBER-compute@developer.gserviceaccount.com
) ou créer votre propre compte de service
pour exécuter le pipeline de connectivité gérée.
Console
Dans la console Google Cloud, accédez à la page IAM.
Sélectionnez le projet dans lequel vous souhaitez importer les métadonnées.
Cliquez sur
Accorder l'accès, puis saisissez l'adresse e-mail du compte de service.Attribuez les rôles suivants au compte de service:
- Rédacteur de journaux
- Propriétaire du groupe d'entrées Dataplex
- Propriétaire de jobs de métadonnées Dataplex
- Éditeur de catalogue Dataplex
- Éditeur Dataproc
- Nœud de calcul Dataproc
- Accesseur de secrets Secret Manager : sur le secret qui stocke Les identifiants de votre source de données
- Utilisateur des objets Storage : sur le bucket Cloud Storage
- Lecteur Artifact Registry, sur le dépôt Artifact Registry contenant l'image du connecteur
- Utilisateur du compte de service : si vous utilisez différents comptes de service, attribuez ce rôle au compte de service qui exécute les workflows sur le compte de service qui exécute les tâches par lot sans serveur Dataproc.
- Workflows Demander (Demandeur de workflows) : si vous souhaitez planifier le pipeline
Enregistrez les modifications.
gcloud
Attribuez des rôles au compte de service. Exécutez les commandes suivantes :
gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/logging.logWriter gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/dataplex.entryGroupOwner gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/dataplex.metadataJobOwner gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/dataplex.catalogEditor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/dataproc.editor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/dataproc.worker
Remplacez les éléments suivants :
- PROJECT_ID: ID de projet de la cible. Projet Google Cloud dans lequel importer les métadonnées.
- SERVICE_ACCOUNT: compte de service, par exemple
my-service-account@my-project.iam.gserviceaccount.com
Attribuez au compte de service les rôles suivants au niveau de la ressource:
gcloud secrets add-iam-policy-binding SECRET_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/secretmanager.secretaccessor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/storage.objectUser \ --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID') gcloud artifacts repositories add-iam-policy-binding REPOSITORY \ --location=REPOSITORY_LOCATION \ --member=SERVICE_ACCOUNT} \ --role=roles/artifactregistry.reader
Remplacez les éléments suivants :
- SECRET_ID : ID du secret qui stocke les identifiants de votre source de données. Il utilise le format
projects/PROJECT_ID/secrets/SECRET_ID
. - BUCKET_ID: nom de l'instance Cloud Storage bucket.
- REPOSITORY: dépôt Artifact Registry contenant l'image du connecteur.
- REPOSITORY_LOCATION : emplacement Google Cloud où le dépôt est hébergé.
- SECRET_ID : ID du secret qui stocke les identifiants de votre source de données. Il utilise le format
Accordez au compte de service exécutant Workflows le rôle Rôle
roles/iam.serviceAccountUser
sur le compte de service les jobs par lot Dataproc sans serveur. Vous devez attribuer ce rôle même si vous utilisez le même compte de service pour les workflows et Dataproc sans serveur.gcloud iam service-accounts add-iam-policy-binding \ serviceAccount:SERVICE_ACCOUNT \ --member='SERVICE_ACCOUNT' \ --role='roles/iam.serviceAccountUser'
Si vous utilisez différents comptes de service, la valeur de l'indicateur
--member
est le compte de service exécutant les tâches par lot Dataproc sans serveur.Si vous souhaitez planifier le pipeline, accordez au compte de service le rôle suivant:
gcloud projects add-iam-policy-binding PROJECT_ID \ --member="SERVICE_ACCOUNT" \ --role=roles/workflows.invoker
Importer les métadonnées
Pour importer des métadonnées, créez et exécutez un workflow qui exécute le pipeline de connectivité géré. Si vous le souhaitez, vous pouvez également créer un calendrier d'exécution du pipeline.
Console
Créez le workflow. Fournissez les informations suivantes :
- Compte de service: le compte de service que vous avez configuré dans le Rôles requis de ce document.
Chiffrement : sélectionnez Clé de chiffrement gérée par Google.
Définir le workflow : fournissez le fichier de définition suivant :
main: params: [args] steps: - init: assign: - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")} - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")} - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])} - NETWORK_TYPE: "networkUri" - check_networking: switch: - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""} raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one." - condition: ${NETWORK_URI != ""} steps: - submit_extract_job_with_network_uri: assign: - NETWORKING: ${NETWORK_URI} - NETWORK_TYPE: "networkUri" - condition: ${SUBNETWORK_URI != ""} steps: - submit_extract_job_with_subnetwork_uri: assign: - NETWORKING: ${SUBNETWORK_URI} - NETWORK_TYPE: "subnetworkUri" next: check_create_target_entry_group - check_create_target_entry_group: switch: - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true} next: create_target_entry_group - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false} next: generate_extract_job_link - create_target_entry_group: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: generate_extract_job_link - generate_extract_job_link: call: sys.log args: data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID} severity: "INFO" next: submit_pyspark_extract_job - submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py args: - ${"--target_project_id=" + args.TARGET_PROJECT_ID} - ${"--target_location_id=" + args.CLOUD_REGION} - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--output_folder=" + WORKFLOW_ID} - ${args.ADDITIONAL_CONNECTOR_ARGS} runtimeConfig: containerImage: ${args.CUSTOM_CONTAINER_IMAGE} environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID} ${NETWORK_TYPE}: ${NETWORKING} networkTags: ${NETWORK_TAGS} result: RESPONSE_MESSAGE next: check_pyspark_extract_job - check_pyspark_extract_job: call: http.get args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: PYSPARK_EXTRACT_JOB_STATUS next: check_pyspark_extract_job_done - check_pyspark_extract_job_done: switch: - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"} next: generate_import_logs_link - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} next: pyspark_extract_job_wait - pyspark_extract_job_wait: call: sys.sleep args: seconds: 30 next: check_pyspark_extract_job - generate_import_logs_link: call: sys.log args: data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"} severity: "INFO" next: submit_import_job - submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")} scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID} entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES} aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES} result: IMPORT_JOB_RESPONSE next: get_job_start_time - get_job_start_time: assign: - importJobStartTime: ${sys.now()} next: import_job_startup_wait - import_job_startup_wait: call: sys.sleep args: seconds: 30 next: initial_get_import_job - initial_get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_status_available - check_import_job_status_available: switch: - condition: ${"status" in IMPORT_JOB_STATUS.body} next: check_import_job_done - condition: ${sys.now() - importJobStartTime > 300} # 5 minutes = 300 seconds next: kill_import_job next: import_job_status_wait - import_job_status_wait: call: sys.sleep args: seconds: 30 next: check_import_job_status_available - check_import_job_done: switch: - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"} next: the_end - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"} raise: ${IMPORT_JOB_STATUS} - condition: ${sys.now() - importJobStartTime > 43200} # 12 hours = 43200 seconds next: kill_import_job next: import_job_wait - get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_done - import_job_wait: call: sys.sleep args: seconds: 30 next: get_import_job - kill_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: get_killed_import_job - get_killed_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: KILLED_IMPORT_JOB_STATUS next: killed - killed: raise: ${KILLED_IMPORT_JOB_STATUS} - the_end: return: ${IMPORT_JOB_STATUS}
Pour exécuter le pipeline à la demande, exécutez le workflow.
Indiquez les arguments d'exécution suivants :
{ "TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT", "ADDITIONAL_CONNECTOR_ARGS": [ ADDITIONAL_CONNECTOR_ARGUMENTS ], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "SPARK_DRIVER_TYPE": "PYSPARK", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [NETWORK_TAGS], "NETWORK_URI": "NETWORK_URI", "SUBNETWORK_URI": "SUBNETWORK_URI" }
Remplacez les éléments suivants :
- PROJECT_ID: nom de la cible Projet Google Cloud dans lequel importer les métadonnées.
- LOCATION_ID : emplacement Google Cloud cible où les tâches d'importation de métadonnées et de Dataproc sans serveur s'exécuteront, et dans lequel les métadonnées seront importées.
ENTRY_GROUP_ID: ID du groupe d'entrées à importer des métadonnées. L'ID du groupe d'entrée peut contenir des lettres minuscules, des chiffres et des traits d'union.
Le nom de ressource complet de ce groupe d'entrées est
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
CREATE_ENTRY_GROUP_BOOLEAN: si vous souhaitez que le pour créer le groupe d'entrées s'il n'existe pas déjà dans votre projet, définissez cette valeur sur
true
.BUCKET_ID: nom de l'instance Cloud Storage bucket pour stocker le fichier d'importation de métadonnées généré par le connecteur. Chaque exécution de workflow crée un dossier.
SERVICE_ACCOUNT : compte de service. Le compte de service exécute le connecteur dans Dataproc sans serveur.
ADDITIONAL_CONNECTOR_ARGUMENTS: a liste des arguments supplémentaires à transmettre au connecteur. Pour obtenir des exemples, consultez la section Développer un connecteur personnalisé pour l'importation de métadonnées. Placez chaque argument entre guillemets doubles et séparez-les par des virgules.
CONTAINER_IMAGE : image de conteneur personnalisée du connecteur hébergée dans Artifact Registry.
ENTRY_TYPES: liste des types d'entrées inclus dans le champ d'application de l'importation, au format
projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
LOCATION_ID
doit être le même emplacement Google Cloud que celui dans lequel vous importez les métadonnées ouglobal
.ASPECT_TYPES : liste des types d'aspects pouvant être importés, au format
projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
. La valeur deLOCATION_ID
doit être identique Emplacement Google Cloud dans lequel vous importez des métadonnées, ouglobal
.NETWORK_TAGS (facultatif): liste des tags réseau.
NETWORK_URI (facultatif) : URI du réseau VPC qui se connecte à la source de données. Si vous indiquez un réseau, omettez l'argument "subnetwork".
SUBNETWORK_URI (facultatif): URI de la qui se connecte à la source de données. Si vous fournissez un sous-réseau, omettez l'argument de réseau.
Selon la quantité de métadonnées que vous importez, le pipeline peut prendre plusieurs minutes, voire plus. Pour savoir comment afficher la progression, consultez Accéder aux résultats de l'exécution du workflow.
Une fois l'exécution du pipeline terminée, vous pouvez rechercher les métadonnées importées dans le catalogue Dataplex.
Facultatif: Si vous souhaitez exécuter le pipeline selon un calendrier, créer une planification en utilisant Cloud Scheduler. Fournissez les informations suivantes :
- Fréquence : expression unix-cron qui définit la planification d'exécution du pipeline.
- Argument de workflow : arguments d'exécution du connecteur, comme décrit à l'étape précédente.
- Compte de service : compte de service. Le compte de service gère le planificateur.
gcloud
Enregistrez la définition de charge de travail suivante en tant que fichier YAML :
main: params: [args] steps: - init: assign: - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")} - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")} - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])} - NETWORK_TYPE: "networkUri" - check_networking: switch: - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""} raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one." - condition: ${NETWORK_URI != ""} steps: - submit_extract_job_with_network_uri: assign: - NETWORKING: ${NETWORK_URI} - NETWORK_TYPE: "networkUri" - condition: ${SUBNETWORK_URI != ""} steps: - submit_extract_job_with_subnetwork_uri: assign: - NETWORKING: ${SUBNETWORK_URI} - NETWORK_TYPE: "subnetworkUri" next: check_create_target_entry_group - check_create_target_entry_group: switch: - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true} next: create_target_entry_group - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false} next: generate_extract_job_link - create_target_entry_group: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: generate_extract_job_link - generate_extract_job_link: call: sys.log args: data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID} severity: "INFO" next: submit_pyspark_extract_job - submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py args: - ${"--target_project_id=" + args.TARGET_PROJECT_ID} - ${"--target_location_id=" + args.CLOUD_REGION} - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--output_folder=" + WORKFLOW_ID} - ${args.ADDITIONAL_CONNECTOR_ARGS} runtimeConfig: containerImage: ${args.CUSTOM_CONTAINER_IMAGE} environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID} ${NETWORK_TYPE}: ${NETWORKING} networkTags: ${NETWORK_TAGS} result: RESPONSE_MESSAGE next: check_pyspark_extract_job - check_pyspark_extract_job: call: http.get args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: PYSPARK_EXTRACT_JOB_STATUS next: check_pyspark_extract_job_done - check_pyspark_extract_job_done: switch: - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"} next: generate_import_logs_link - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} next: pyspark_extract_job_wait - pyspark_extract_job_wait: call: sys.sleep args: seconds: 30 next: check_pyspark_extract_job - generate_import_logs_link: call: sys.log args: data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"} severity: "INFO" next: submit_import_job - submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")} scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID} entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES} aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES} result: IMPORT_JOB_RESPONSE next: get_job_start_time - get_job_start_time: assign: - importJobStartTime: ${sys.now()} next: import_job_startup_wait - import_job_startup_wait: call: sys.sleep args: seconds: 30 next: initial_get_import_job - initial_get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_status_available - check_import_job_status_available: switch: - condition: ${"status" in IMPORT_JOB_STATUS.body} next: check_import_job_done - condition: ${sys.now() - importJobStartTime > 300} # 5 minutes = 300 seconds next: kill_import_job next: import_job_status_wait - import_job_status_wait: call: sys.sleep args: seconds: 30 next: check_import_job_status_available - check_import_job_done: switch: - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"} next: the_end - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"} raise: ${IMPORT_JOB_STATUS} - condition: ${sys.now() - importJobStartTime > 43200} # 12 hours = 43200 seconds next: kill_import_job next: import_job_wait - get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_done - import_job_wait: call: sys.sleep args: seconds: 30 next: get_import_job - kill_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: get_killed_import_job - get_killed_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: KILLED_IMPORT_JOB_STATUS next: killed - killed: raise: ${KILLED_IMPORT_JOB_STATUS} - the_end: return: ${IMPORT_JOB_STATUS}
Définissez les variables Bash suivantes:
workflow_name="WORKFLOW_NAME" project_id="PROJECT_ID" location="LOCATION_ID" service_account="SERVICE_ACCOUNT" workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
Remplacez les éléments suivants :
- WORKFLOW_NAME: nom du workflow.
- PROJECT_ID: nom de la cible Projet Google Cloud dans lequel importer les métadonnées.
- LOCATION_ID : emplacement Google Cloud cible où les tâches d'importation de métadonnées et de Dataproc sans serveur s'exécuteront, et dans lequel les métadonnées seront importées.
- SERVICE_ACCOUNT : compte de service que vous avez configuré dans la section Rôles requis de ce document.
- WORKFLOW_DEFINITION_FILE: workflow fichier YAML de définition.
-
gcloud workflows deploy ${workflow_name} \ --project=${project_id} \ --location=${location} \ --source=${workflow_source} \ --service-account=${service_account}
Pour exécuter le pipeline à la demande, exécutez le workflow :
gcloud workflows run ${workflow_name} --project=${project_id} --location=${location} --data "$(cat << EOF { "TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT", "ADDITIONAL_CONNECTOR_ARGS": [ ADDITIONAL_CONNECTOR_ARGUMENTS ], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "SPARK_DRIVER_TYPE": "PYSPARK", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [NETWORK_TAGS], "NETWORK_URI": "NETWORK_URI", "SUBNETWORK_URI": "SUBNETWORK_URI" } EOF )"
Remplacez les éléments suivants :
ENTRY_GROUP_ID : ID du groupe d'entrées dans lequel importer les métadonnées. L'ID du groupe d'entrées peut contenir des minuscules lettres, chiffres et traits d'union.
Le nom de ressource complet de ce groupe d'entrées est
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
CREATE_ENTRY_GROUP_BOOLEAN: si vous souhaitez que le pour créer le groupe d'entrées s'il n'existe pas déjà dans votre projet, définissez cette valeur sur
true
.BUCKET_ID : nom du bucket Cloud Storage dans lequel stocker le fichier d'importation de métadonnées généré par le connecteur. Chaque exécution de workflow crée un dossier.
ADDITIONAL_CONNECTOR_ARGUMENTS : liste d'arguments supplémentaires à transmettre au connecteur. Pour obtenir des exemples, consultez la section Développer un connecteur personnalisé pour l'importation de métadonnées. Placez chaque argument entre guillemets. et séparez les arguments par des virgules.
CONTAINER_IMAGE : image de conteneur personnalisée du connecteur hébergée dans Artifact Registry.
ENTRY_TYPES: liste des types d'entrées inclus dans le champ d'application de l'importation, au format
projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
LOCATION_ID
doit être le même emplacement Google Cloud que celui dans lequel vous importez les métadonnées ouglobal
.ASPECT_TYPES : liste des types d'aspects pouvant être importés, au format
projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
. La valeur deLOCATION_ID
doit être identique Emplacement Google Cloud dans lequel vous importez des métadonnées, ouglobal
.NETWORK_TAGS (facultatif): liste des tags réseau.
NETWORK_URI (facultatif) : URI du réseau VPC qui se connecte à la source de données. Si vous indiquez un réseau, omettez l'argument "subnetwork".
SUBNETWORK_URI (facultatif) : URI du sous-réseau qui se connecte à la source de données. Si vous fournissez un , omettez l'argument "network".
Selon la quantité de métadonnées que vous importez, l'exécution du workflow peut prendre plusieurs minutes. Pour savoir comment voir la progression, voir Accédez aux résultats de l'exécution du workflow.
Une fois l'exécution du pipeline terminée, vous pouvez rechercher les métadonnées importées dans le catalogue Dataplex.
Facultatif : Si vous souhaitez exécuter le pipeline selon un calendrier, créez un calendrier à l'aide de Cloud Scheduler :
gcloud scheduler jobs create http ${workflow_name}-scheduler \ --project=${project_id} \ --location=${region} \ --schedule="SCHEDULE" \ --time-zone="TIME_ZONE" \ --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \ --http-method="POST" \ --oauth-service-account-email=${service_account} \ --headers="Content-Type=application/json" \ --message-body="{\"argument\": \"DOUBLE_ESCAPED_JSON_STRING\"}" \
Remplacez les éléments suivants :
- SCHEDULE : expression cron qui définit la planification d'exécution du pipeline.
- TIME_ZONE : fuseau horaire, par exemple
UTC
. - DOUBLE_ESCAPED_JSON_STRING: un encodage JSON
des arguments du workflow. Les guillemets doubles dans la chaîne entre guillemets sont échappés avec des barres obliques inverses (\). Par exemple :
--message-body="{\"argument\": \"{\\\"foo\\\": \\\"bar\\\"}\"}"
Terraform
Enregistrez les fichiers suivants :
Enregistrez en tant que
main.tf
:module "cloud_workflows" { source = "GoogleCloudPlatform/cloud-workflows/google" version = "0.1.1" workflow_name = var.workflow_name project_id = var.project_id region = var.region service_account_email = var.service_account workflow_trigger = { cloud_scheduler = { name = "${var.workflow_name}-scheduler" cron = "0 0 * * *" time_zone = "UTC" service_account_email = var.service_account deadline = "1800s" argument = jsonencode(var.workflow_args) } } workflow_source = var.workflow_source }
Enregistrer sous
variables.tf
:variable "project_id" { default = "" } variable "region" { default = "" } variable "service_account" { default = "" } variable "workflow_name" { default = "managed-orchestration-for-dataplex" } variable "description" { default = "Submits a Dataproc Serverless Job and then runs a Dataplex Import Job. Times out after 12 hours." } variable "workflow_args" { default = {} } variable "cron_schedule" { default = "0 0 * * *" } variable "workflow_source" { default = "" }
Enregistrez le fichier de définitions de variable suivant au format
.tfvars
. Remplacez les espaces réservés par les informations de votre connecteur.project_id = "PROJECT_ID" region = "LOCATION_ID" service_account = "SERVICE_ACCOUNT" cron_schedule = "SCHEDULE" workflow_args = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT", "ADDITIONAL_CONNECTOR_ARGS": [ ADDITIONAL_CONNECTOR_ARGUMENTS ], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "SPARK_DRIVER_TYPE": "PYSPARK", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "NETWORK_TAGS": [NETWORK_TAGS], "NETWORK_URI": "NETWORK_URI", "SUBNETWORK_URI": "SUBNETWORK_URI" } workflow_source = <<EOF main: params: [args] steps: - init: assign: - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")} - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")} - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])} - NETWORK_TYPE: "networkUri" - check_networking: switch: - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""} raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one." - condition: $${NETWORK_URI != ""} steps: - submit_extract_job_with_network_uri: assign: - NETWORKING: $${NETWORK_URI} - NETWORK_TYPE: "networkUri" - condition: $${SUBNETWORK_URI != ""} steps: - submit_extract_job_with_subnetwork_uri: assign: - NETWORKING: $${SUBNETWORK_URI} - NETWORK_TYPE: "subnetworkUri" next: check_create_target_entry_group - check_create_target_entry_group: switch: - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true} next: create_target_entry_group - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false} next: generate_extract_job_link - create_target_entry_group: call: http.post args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: generate_extract_job_link - generate_extract_job_link: call: sys.log args: data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID} severity: "INFO" next: submit_pyspark_extract_job - submit_pyspark_extract_job: call: http.post args: url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: $${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py args: - $${"--target_project_id=" + args.TARGET_PROJECT_ID} - $${"--target_location_id=" + args.CLOUD_REGION} - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - $${"--output_folder=" + WORKFLOW_ID} - $${args.ADDITIONAL_CONNECTOR_ARGS} runtimeConfig: containerImage: $${args.CUSTOM_CONTAINER_IMAGE} environmentConfig: executionConfig: serviceAccount: $${args.SERVICE_ACCOUNT} stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID} $${NETWORK_TYPE}: $${NETWORKING} networkTags: $${NETWORK_TAGS} result: RESPONSE_MESSAGE next: check_pyspark_extract_job - check_pyspark_extract_job: call: http.get args: url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: PYSPARK_EXTRACT_JOB_STATUS next: check_pyspark_extract_job_done - check_pyspark_extract_job_done: switch: - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"} next: generate_import_logs_link - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"} raise: $${PYSPARK_EXTRACT_JOB_STATUS} - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"} raise: $${PYSPARK_EXTRACT_JOB_STATUS} next: pyspark_extract_job_wait - pyspark_extract_job_wait: call: sys.sleep args: seconds: 30 next: check_pyspark_extract_job - generate_import_logs_link: call: sys.log args: data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"} severity: "INFO" next: submit_import_job - submit_import_job: call: http.post args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")} scope: entry_groups: - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID} entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES} aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES} result: IMPORT_JOB_RESPONSE next: get_job_start_time - get_job_start_time: assign: - importJobStartTime: $${sys.now()} next: import_job_startup_wait - import_job_startup_wait: call: sys.sleep args: seconds: 30 next: initial_get_import_job - initial_get_import_job: call: http.get args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_status_available - check_import_job_status_available: switch: - condition: $${"status" in IMPORT_JOB_STATUS.body} next: check_import_job_done - condition: $${sys.now() - importJobStartTime > 300} # 5 minutes = 300 seconds next: kill_import_job next: import_job_status_wait - import_job_status_wait: call: sys.sleep args: seconds: 30 next: check_import_job_status_available - check_import_job_done: switch: - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"} next: the_end - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"} raise: $${IMPORT_JOB_STATUS} - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"} raise: $${IMPORT_JOB_STATUS} - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"} raise: $${IMPORT_JOB_STATUS} - condition: $${sys.now() - importJobStartTime > 43200} # 12 hours = 43200 seconds next: kill_import_job next: import_job_wait - get_import_job: call: http.get args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_done - import_job_wait: call: sys.sleep args: seconds: 30 next: get_import_job - kill_import_job: call: http.post args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: get_killed_import_job - get_killed_import_job: call: http.get args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: KILLED_IMPORT_JOB_STATUS next: killed - killed: raise: $${KILLED_IMPORT_JOB_STATUS} - the_end: return: $${IMPORT_JOB_STATUS} EOF
Remplacez les éléments suivants :
PROJECT_ID
: nom du projet Google Cloud cible dans lequel importer les métadonnées.LOCATION_ID
: la cible Google Cloud emplacement où sont stockées les données des jobs d'importation seront exécutés et des métadonnées dans lesquelles elles seront importées.SERVICE_ACCOUNT
: compte de service que vous avez configuré dans Rôles requis de ce document.SCHEDULE
: expression Cron qui définit le calendrier à exécuter dans le pipeline.ENTRY_GROUP_ID
: ID du groupe d'entrées dans lequel importer les métadonnées. L'ID du groupe d'entrée peut contenir des lettres minuscules, des chiffres et des traits d'union.Le nom de ressource complet de ce groupe d'entrées est
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
CREATE_ENTRY_GROUP_BOOLEAN
: si vous souhaitez que le pipeline crée le groupe d'entrée s'il n'existe pas déjà dans votre projet, définissez cette valeur surtrue
.BUCKET_ID
: nom du bucket Cloud Storage dans lequel stocker le fichier d'importation des métadonnées généré par le connecteur. Chaque exécution de workflow crée .ADDITIONAL_CONNECTOR_ARGUMENTS
: liste d'arguments de connecteur supplémentaires que vous utilisez pour définir le workflow. Pour obtenir des exemples, consultez Développez un connecteur personnalisé pour l'importation des métadonnées. Placez chaque argument entre guillemets et séparez les arguments par des virgules.CONTAINER_IMAGE
: image de conteneur personnalisé du connecteur hébergé dans Artifact Registry.ENTRY_TYPES
: liste des types d'entrées pouvant être importés, au formatprojects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
.LOCATION_ID
doit correspondre à l'emplacement Google Cloud dans lequel vous importez les métadonnées ou àglobal
.ASPECT_TYPES
: liste des types d'aspects pouvant être importés, au formatprojects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
.LOCATION_ID
doit être soit le même emplacement Google Cloud dans lequel vous importez les métadonnées, soitglobal
.NETWORK_TAGS
(facultatif): liste des tags réseau.NETWORK_URI
(facultatif): URI du réseau VPC qui se connecte à la source de données. Si vous indiquez un réseau, omettez l'argument "subnetwork".SUBNETWORK_URI
(facultatif) : URI du sous-réseau qui se connecte à la source de données. Si vous fournissez un sous-réseau, omettez l'argument de réseau.
Initialisez Terraform :
terraform init
Validez Terraform avec votre fichier
.tfvars
:terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
Remplacez
CONNECTOR_VARIABLES_FILE
par le nom. du fichier de définitions de variables.Déployez Terraform avec votre fichier
.tfvars
:terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
Terraform crée un workflow et une tâche Cloud Scheduler dans le projet spécifié. Workflows exécute le pipeline au niveau que vous spécifiez.
Selon la quantité de métadonnées que vous importez, le workflow peut peut prendre plusieurs minutes ou plus. Pour savoir comment voir la progression, voir Accédez aux résultats de l'exécution du workflow.
Une fois l'exécution du pipeline terminée, vous pouvez rechercher les métadonnées importées dans le catalogue Dataplex.
Afficher les journaux de jobs
Utiliser Cloud Logging pour afficher les journaux d'un pipeline de connectivité géré La charge utile de journalisation inclut un lien vers les journaux de la tâche par lot Dataproc sans serveur et de la tâche d'importation de métadonnées, le cas échéant. Pour en savoir plus, consultez Affichez les journaux de workflow.
Dépannage
Suivez les suggestions de dépannage ci-dessous :
- Configurez le niveau de journalisation de la tâche d'importation pour la tâche de métadonnées afin qu'elle utilise la journalisation au niveau du débogage au lieu de la journalisation au niveau des informations.
- Examinez les journaux du job par lot Dataproc sans serveur (pour exécutions du connecteur) et le job d'importation de métadonnées. Pour en savoir plus, consultez les pages Interroger les journaux Dataproc sans serveur pour Spark et Interroger les journaux de tâches de métadonnées.
- Si une entrée ne peut pas être importée à l'aide du pipeline et que le message d'erreur ne fournir suffisamment d'informations, essayez de créer une entrée personnalisée avec les mêmes détails, dans un groupe d'entrées de test. Pour en savoir plus, consultez la section Créer une entrée personnalisée.
Étape suivante
- Présentation du catalogue Dataplex.
- Développer un connecteur personnalisé pour l'importation de métadonnées