Questo documento descrive come importare i metadati da un'origine di terze parti in Dataplex eseguendo una pipeline di connettività gestita in Workflows.
Per configurare una pipeline di connettività gestita, devi creare un connettore per origine dati. Poi esegui la pipeline in Workflows. La la pipeline estrae i metadati dall'origine dati e poi li importa in Dataplex. Se necessario, la pipeline crea anche Gruppi di voci Dataplex Catalog nel tuo progetto Google Cloud.
Per ulteriori informazioni sulla connettività gestita, vedi Panoramica della connettività gestita.
Prima di iniziare
Prima di importare i metadati, completa le attività in questa sezione.
Crea un connettore
Un connettore estrae i metadati dall'origine dati e genera un file di importazione dei metadati che può essere importato da Dataplex. Il connettore è un'immagine Artifact Registry che può essere eseguita su Dataproc Serverless.
Crea un connettore personalizzato che estragga i metadati dall'origine di terze parti.
Per un connettore di esempio che puoi usare come modello di riferimento per creare il tuo connettore, consulta Sviluppa un connettore personalizzato per l'importazione dei metadati.
Configura le risorse Google Cloud
-
Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.
Se non prevedi di eseguire la pipeline in base a una pianificazione, non è necessario abilitare l'API Cloud Scheduler.
Crea secret in Secret Manager per archiviare le credenziali per l'origine dati di terze parti.
Configura la tua rete Virtual Private Cloud (VPC) per eseguire Dataproc Serverless per i carichi di lavoro Spark.
Crea un bucket Cloud Storage per memorizzare i file di importazione dei metadati.
Crea le seguenti risorse Dataplex Catalog:
Crea tipi di aspetti personalizzati per le voci da importare.
Crea tipi di voci personalizzate per le voci da importare.
Ruoli obbligatori
Un account di servizio rappresenta l'identità di un flusso di lavoro e determina le autorizzazioni di cui dispone e le risorse Google Cloud a cui può accedere. È necessario un account di servizio per Workflows (per eseguire pipeline) e per Dataproc serverless (per eseguire il connettore).
Puoi utilizzare l'account di servizio predefinito di Compute Engine
(PROJECT_NUMBER-compute@developer.gserviceaccount.com
) o creare il tuo account di servizio
(o i tuoi account) per eseguire la pipeline di connettività gestita.
Console
Nella console Google Cloud, vai alla pagina IAM.
Seleziona il progetto in cui vuoi importare i metadati.
Fai clic su
Concedi accesso, poi inserisci l'indirizzo email dell'account di servizio.Assegna i ruoli seguenti all'account di servizio:
- Writer log
- Dataplex Entry Group Owner
- Dataplex Metadata Job Owner
- Editor Dataplex Catalog
- Editor Dataproc
- Worker Dataproc
- Funzione di accesso ai secret di Secret Manager: sul secret in cui sono archiviati le credenziali dell'origine dati
- Storage Object User (Utente oggetto archiviazione) nel bucket Cloud Storage
- Artifact Registry Reader: nel repository Artifact Registry che contiene l'immagine del connettore
- Utente account di servizio: se utilizzi account di servizio diversi, conceda questo ruolo all'account di servizio che esegue i flussi di lavoro nell'account di servizio che esegue i job batch Dataproc Serverless
- Invoker di Workflows: se vuoi pianificare la pipeline
Salva le modifiche.
gcloud
Concedi i ruoli all'account di servizio. Esegui questi comandi:
gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/logging.logWriter gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/dataplex.entryGroupOwner gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/dataplex.metadataJobOwner gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/dataplex.catalogEditor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/dataproc.editor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/dataproc.worker
Sostituisci quanto segue:
- PROJECT_ID: l'ID del progetto Google Cloud di destinazione in cui importare i metadati.
- SERVICE_ACCOUNT: l'account di servizio, ad esempio
my-service-account@my-project.iam.gserviceaccount.com
.
Concedi all'account di servizio i seguenti ruoli a livello di risorsa:
gcloud secrets add-iam-policy-binding SECRET_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/secretmanager.secretaccessor gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT" \ --role=roles/storage.objectUser \ --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID') gcloud artifacts repositories add-iam-policy-binding REPOSITORY \ --location=REPOSITORY_LOCATION \ --member=SERVICE_ACCOUNT} \ --role=roles/artifactregistry.reader
Sostituisci quanto segue:
- SECRET_ID: l'ID del segreto che memorizza
le credenziali per l'origine dati. Utilizza il formato
projects/PROJECT_ID/secrets/SECRET_ID
. - BUCKET_ID: il nome del bucket Cloud Storage.
- REPOSITORY: il repository Artifact Registry che contiene l'immagine del connettore.
- REPOSITORY_LOCATION: la località Google Cloud in cui è ospitato il repository.
- SECRET_ID: l'ID del segreto che memorizza
le credenziali per l'origine dati. Utilizza il formato
Concedi all'account di servizio che esegue i flussi di lavoro il ruolo
roles/iam.serviceAccountUser
nell'account di servizio che esegue i job batch Dataproc Serverless. Devi concedere questo ruolo anche se utilizzi lo stesso account di servizio sia per Workflows sia per Dataproc Serverless.gcloud iam service-accounts add-iam-policy-binding \ serviceAccount:SERVICE_ACCOUNT \ --member='SERVICE_ACCOUNT' \ --role='roles/iam.serviceAccountUser'
Se utilizzi account di servizio diversi, il valore del flag
--member
è l'account di servizio che esegue i job batch Dataproc Serverless.Se vuoi pianificare la pipeline, concedi all'account di servizio il seguente ruolo:
gcloud projects add-iam-policy-binding PROJECT_ID \ --member="SERVICE_ACCOUNT" \ --role=roles/workflows.invoker
Importa metadati
Per importare i metadati, crea ed esegui un flusso di lavoro che esegua la pipeline di connettività gestita. Facoltativamente, puoi anche creare una pianificazione per l'esecuzione della pipeline.
Console
Crea il flusso di lavoro. Fornisci le seguenti informazioni:
- Account di servizio: l'account di servizio che hai configurato nella Sezione Ruoli obbligatori di questo documento.
Crittografia: seleziona Chiave di crittografia gestita da Google.
Definisci flusso di lavoro: fornisci il seguente file di definizione:
main: params: [args] steps: - init: assign: - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")} - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")} - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])} - NETWORK_TYPE: "networkUri" - check_networking: switch: - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""} raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one." - condition: ${NETWORK_URI != ""} steps: - submit_extract_job_with_network_uri: assign: - NETWORKING: ${NETWORK_URI} - NETWORK_TYPE: "networkUri" - condition: ${SUBNETWORK_URI != ""} steps: - submit_extract_job_with_subnetwork_uri: assign: - NETWORKING: ${SUBNETWORK_URI} - NETWORK_TYPE: "subnetworkUri" next: check_create_target_entry_group - check_create_target_entry_group: switch: - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true} next: create_target_entry_group - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false} next: generate_extract_job_link - create_target_entry_group: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: generate_extract_job_link - generate_extract_job_link: call: sys.log args: data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID} severity: "INFO" next: submit_pyspark_extract_job - submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py args: - ${"--target_project_id=" + args.TARGET_PROJECT_ID} - ${"--target_location_id=" + args.CLOUD_REGION} - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--output_folder=" + WORKFLOW_ID} - ${args.ADDITIONAL_CONNECTOR_ARGS} runtimeConfig: containerImage: ${args.CUSTOM_CONTAINER_IMAGE} environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID} ${NETWORK_TYPE}: ${NETWORKING} networkTags: ${NETWORK_TAGS} result: RESPONSE_MESSAGE next: check_pyspark_extract_job - check_pyspark_extract_job: call: http.get args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: PYSPARK_EXTRACT_JOB_STATUS next: check_pyspark_extract_job_done - check_pyspark_extract_job_done: switch: - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"} next: generate_import_logs_link - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} next: pyspark_extract_job_wait - pyspark_extract_job_wait: call: sys.sleep args: seconds: 30 next: check_pyspark_extract_job - generate_import_logs_link: call: sys.log args: data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"} severity: "INFO" next: submit_import_job - submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")} scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID} entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES} aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES} result: IMPORT_JOB_RESPONSE next: get_job_start_time - get_job_start_time: assign: - importJobStartTime: ${sys.now()} next: import_job_startup_wait - import_job_startup_wait: call: sys.sleep args: seconds: 30 next: initial_get_import_job - initial_get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_status_available - check_import_job_status_available: switch: - condition: ${"status" in IMPORT_JOB_STATUS.body} next: check_import_job_done - condition: ${sys.now() - importJobStartTime > 300} # 5 minutes = 300 seconds next: kill_import_job next: import_job_status_wait - import_job_status_wait: call: sys.sleep args: seconds: 30 next: check_import_job_status_available - check_import_job_done: switch: - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"} next: the_end - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"} raise: ${IMPORT_JOB_STATUS} - condition: ${sys.now() - importJobStartTime > 43200} # 12 hours = 43200 seconds next: kill_import_job next: import_job_wait - get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_done - import_job_wait: call: sys.sleep args: seconds: 30 next: get_import_job - kill_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: get_killed_import_job - get_killed_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: KILLED_IMPORT_JOB_STATUS next: killed - killed: raise: ${KILLED_IMPORT_JOB_STATUS} - the_end: return: ${IMPORT_JOB_STATUS}
Per eseguire la pipeline on demand, il flusso di lavoro.
Fornisci i seguenti argomenti di runtime:
{ "TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT", "ADDITIONAL_CONNECTOR_ARGS": [ ADDITIONAL_CONNECTOR_ARGUMENTS ], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "SPARK_DRIVER_TYPE": "PYSPARK", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [NETWORK_TAGS], "NETWORK_URI": "NETWORK_URI", "SUBNETWORK_URI": "SUBNETWORK_URI" }
Sostituisci quanto segue:
- PROJECT_ID: il nome del target Progetto Google Cloud in cui importare i metadati.
- LOCATION_ID: il target Google Cloud in cui Dataproc serverless e dei metadati verranno eseguiti job di importazione e verranno importati i metadati.
ENTRY_GROUP_ID: l'ID del gruppo di voci in cui eseguire l'importazione dei metadati. L'ID gruppo di voci può contenere lettere minuscole lettere, numeri e trattini.
Il nome completo della risorsa di questo gruppo di voci è
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
.CREATE_ENTRY_GROUP_BOOLEAN: se vuoi che la pipeline crei il gruppo di voci se non esiste già nel tuo progetto, imposta questo valore su
true
.BUCKET_ID: il nome dell'istanza di Cloud Storage per archiviare il file di importazione dei metadati generato di rete. Ogni esecuzione del flusso di lavoro crea una nuova cartella.
SERVICE_ACCOUNT: l'account di servizio. L'account servizio esegue il connettore in Dataproc Serverless.
ADDITIONAL_CONNECTOR_ARGUMENTS: un elenco di argomenti aggiuntivi da passare al connettore. Per alcuni esempi, vedi Sviluppa un connettore personalizzato per l'importazione dei metadati. Racchiudi ogni argomento tra virgolette doppie e separa gli argomenti con virgole.
CONTAINER_IMAGE: l'immagine container personalizzata del connettore ospitato in Artifact Registry.
ENTRY_TYPES: un elenco dei tipi di voce che sono nell'ambito dell'importazione, nel formato
projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
.LOCATION_ID
deve essere la stessa posizione Google Cloud in cui importi i metadati oglobal
.ASPECT_TYPES: un elenco di tipi di aspetto che sono nell'ambito dell'importazione, nel formato
projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
.LOCATION_ID
deve essere lo stesso Località Google Cloud in cui importi i metadati, ovveroglobal
.NETWORK_TAGS (facoltativo): un elenco di tag di rete.
NETWORK_URI (facoltativo): l'URI della rete VPC che si connette all'origine dati. Se fornisci una rete, ometti l'argomento subnet.
SUBNETWORK_URI (facoltativo): l'URI della sottorete che si connette all'origine dati. Se fornisci una sottorete, ometti l'argomento network.
A seconda della quantità di metadati importati, l'esecuzione della pipeline potrebbe richiedere diversi minuti o più. Per ulteriori informazioni come vedere l'avanzamento, vedere Accedi ai risultati di esecuzione del flusso di lavoro.
Al termine dell'esecuzione della pipeline, puoi cercare i metadati importati in Dataplex Catalog.
(Facoltativo) Se vuoi eseguire la pipeline in base a una pianificazione, crea una pianificazione utilizzando Cloud Scheduler. Fornisci le seguenti informazioni:
- Frequenza: un'espressione unix-cron che definisce la pianificazione per per eseguire la pipeline.
- Argomento flusso di lavoro: gli argomenti di runtime per il connettore, come descritto nel passaggio precedente.
- Account di servizio: l'account di servizio. Account di servizio gestisce lo scheduler.
gcloud
Salva la seguente definizione del carico di lavoro come file YAML:
main: params: [args] steps: - init: assign: - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")} - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")} - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])} - NETWORK_TYPE: "networkUri" - check_networking: switch: - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""} raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one." - condition: ${NETWORK_URI != ""} steps: - submit_extract_job_with_network_uri: assign: - NETWORKING: ${NETWORK_URI} - NETWORK_TYPE: "networkUri" - condition: ${SUBNETWORK_URI != ""} steps: - submit_extract_job_with_subnetwork_uri: assign: - NETWORKING: ${SUBNETWORK_URI} - NETWORK_TYPE: "subnetworkUri" next: check_create_target_entry_group - check_create_target_entry_group: switch: - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true} next: create_target_entry_group - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false} next: generate_extract_job_link - create_target_entry_group: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: generate_extract_job_link - generate_extract_job_link: call: sys.log args: data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID} severity: "INFO" next: submit_pyspark_extract_job - submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py args: - ${"--target_project_id=" + args.TARGET_PROJECT_ID} - ${"--target_location_id=" + args.CLOUD_REGION} - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--output_folder=" + WORKFLOW_ID} - ${args.ADDITIONAL_CONNECTOR_ARGS} runtimeConfig: containerImage: ${args.CUSTOM_CONTAINER_IMAGE} environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID} ${NETWORK_TYPE}: ${NETWORKING} networkTags: ${NETWORK_TAGS} result: RESPONSE_MESSAGE next: check_pyspark_extract_job - check_pyspark_extract_job: call: http.get args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: PYSPARK_EXTRACT_JOB_STATUS next: check_pyspark_extract_job_done - check_pyspark_extract_job_done: switch: - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"} next: generate_import_logs_link - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"} raise: ${PYSPARK_EXTRACT_JOB_STATUS} next: pyspark_extract_job_wait - pyspark_extract_job_wait: call: sys.sleep args: seconds: 30 next: check_pyspark_extract_job - generate_import_logs_link: call: sys.log args: data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"} severity: "INFO" next: submit_import_job - submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")} scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID} entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES} aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES} result: IMPORT_JOB_RESPONSE next: get_job_start_time - get_job_start_time: assign: - importJobStartTime: ${sys.now()} next: import_job_startup_wait - import_job_startup_wait: call: sys.sleep args: seconds: 30 next: initial_get_import_job - initial_get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_status_available - check_import_job_status_available: switch: - condition: ${"status" in IMPORT_JOB_STATUS.body} next: check_import_job_done - condition: ${sys.now() - importJobStartTime > 300} # 5 minutes = 300 seconds next: kill_import_job next: import_job_status_wait - import_job_status_wait: call: sys.sleep args: seconds: 30 next: check_import_job_status_available - check_import_job_done: switch: - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"} next: the_end - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"} raise: ${IMPORT_JOB_STATUS} - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"} raise: ${IMPORT_JOB_STATUS} - condition: ${sys.now() - importJobStartTime > 43200} # 12 hours = 43200 seconds next: kill_import_job next: import_job_wait - get_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_done - import_job_wait: call: sys.sleep args: seconds: 30 next: get_import_job - kill_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: get_killed_import_job - get_killed_import_job: call: http.get args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: KILLED_IMPORT_JOB_STATUS next: killed - killed: raise: ${KILLED_IMPORT_JOB_STATUS} - the_end: return: ${IMPORT_JOB_STATUS}
Definisci le seguenti variabili Bash:
workflow_name="WORKFLOW_NAME" project_id="PROJECT_ID" location="LOCATION_ID" service_account="SERVICE_ACCOUNT" workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
Sostituisci quanto segue:
- WORKFLOW_NAME: il nome del flusso di lavoro.
- PROJECT_ID: il nome del target Progetto Google Cloud in cui importare i metadati.
- LOCATION_ID: la posizione Google Cloud di destinazione in cui verranno eseguiti i job di importazione dei metadati e di Dataproc Serverless e in cui verranno importati i metadati.
- SERVICE_ACCOUNT: l'account di servizio configurato nella sezione Ruoli richiesti di questo documento.
- WORKFLOW_DEFINITION_FILE: il flusso di lavoro del file YAML di definizione.
-
gcloud workflows deploy ${workflow_name} \ --project=${project_id} \ --location=${location} \ --source=${workflow_source} \ --service-account=${service_account}
Per eseguire la pipeline on demand, esegui il flusso di lavoro:
gcloud workflows run ${workflow_name} --project=${project_id} --location=${location} --data "$(cat << EOF { "TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT", "ADDITIONAL_CONNECTOR_ARGS": [ ADDITIONAL_CONNECTOR_ARGUMENTS ], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "SPARK_DRIVER_TYPE": "PYSPARK", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [NETWORK_TAGS], "NETWORK_URI": "NETWORK_URI", "SUBNETWORK_URI": "SUBNETWORK_URI" } EOF )"
Sostituisci quanto segue:
ENTRY_GROUP_ID: l'ID del gruppo di voci in cui eseguire l'importazione dei metadati. L'ID gruppo di voci può contenere lettere minuscole lettere, numeri e trattini.
Il nome completo della risorsa di questo gruppo di voci è
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
.CREATE_ENTRY_GROUP_BOOLEAN: se vuoi che la pipeline crei il gruppo di voci se non esiste già nel tuo progetto, imposta questo valore su
true
.BUCKET_ID: il nome dell'istanza di Cloud Storage per archiviare il file di importazione dei metadati generato di rete. Ogni esecuzione del flusso di lavoro crea una nuova cartella.
ADDITIONAL_CONNECTOR_ARGUMENTS: un l'elenco di argomenti aggiuntivi da passare al connettore. Per alcuni esempi, vedi Sviluppa un connettore personalizzato per l'importazione dei metadati. Racchiudi ogni argomento tra virgolette doppie e separa gli argomenti con virgole.
CONTAINER_IMAGE: l'immagine container personalizzata del connettore ospitato in Artifact Registry.
ENTRY_TYPES: un elenco dei tipi di voce che sono nell'ambito dell'importazione, nel formato
projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
.LOCATION_ID
deve essere la stessa posizione Google Cloud in cui importi i metadati oglobal
.ASPECT_TYPES: un elenco di tipi di aspetto che sono nell'ambito dell'importazione, nel formato
projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
.LOCATION_ID
deve essere lo stesso Località Google Cloud in cui importi i metadati, ovveroglobal
.NETWORK_TAGS (facoltativo): un elenco di tag di rete.
NETWORK_URI (facoltativo): l'URI della rete VPC che si connette all'origine dati. Se fornisci una rete, ometti l'argomento subnet.
SUBNETWORK_URI (facoltativo): l'URI della sottorete che si connette all'origine dati. Se fornisci un parametro subnet, ometti l'argomento di rete.
A seconda della quantità di metadati importati, l'esecuzione del flusso di lavoro potrebbe richiedere diversi minuti o più. Per ulteriori informazioni su come visualizzare l'avanzamento, consulta Accedere ai risultati di esecuzione del flusso di lavoro.
Al termine dell'esecuzione della pipeline, puoi cercare i metadati importati in Dataplex Catalog.
(Facoltativo) Se vuoi eseguire la pipeline in base a una pianificazione, crea una pianificazione utilizzando Cloud Scheduler:
gcloud scheduler jobs create http ${workflow_name}-scheduler \ --project=${project_id} \ --location=${region} \ --schedule="SCHEDULE" \ --time-zone="TIME_ZONE" \ --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \ --http-method="POST" \ --oauth-service-account-email=${service_account} \ --headers="Content-Type=application/json" \ --message-body="{\"argument\": \"DOUBLE_ESCAPED_JSON_STRING\"}" \
Sostituisci quanto segue:
- SCHEDULE: un'espressione cron che definisce per eseguire la pipeline.
- TIME_ZONE: il fuso orario, ad esempio
UTC
. - DOUBLE_ESCAPED_JSON_STRING: una codifica JSON
degli argomenti del flusso di lavoro. Le virgolette doppie all'interno della
stringa tra virgolette vengono indicate con le barre inverse (\). Ad esempio:
--message-body="{\"argument\": \"{\\\"foo\\\": \\\"bar\\\"}\"}"
Terraform
Salva i seguenti file:
Salva con nome
main.tf
:module "cloud_workflows" { source = "GoogleCloudPlatform/cloud-workflows/google" version = "0.1.1" workflow_name = var.workflow_name project_id = var.project_id region = var.region service_account_email = var.service_account workflow_trigger = { cloud_scheduler = { name = "${var.workflow_name}-scheduler" cron = "0 0 * * *" time_zone = "UTC" service_account_email = var.service_account deadline = "1800s" argument = jsonencode(var.workflow_args) } } workflow_source = var.workflow_source }
Salva come
variables.tf
:variable "project_id" { default = "" } variable "region" { default = "" } variable "service_account" { default = "" } variable "workflow_name" { default = "managed-orchestration-for-dataplex" } variable "description" { default = "Submits a Dataproc Serverless Job and then runs a Dataplex Import Job. Times out after 12 hours." } variable "workflow_args" { default = {} } variable "cron_schedule" { default = "0 0 * * *" } variable "workflow_source" { default = "" }
Salva il seguente file di definizioni delle variabili come file
.tfvars
. Sostituisci i segnaposto con le informazioni del connettore.project_id = "PROJECT_ID" region = "LOCATION_ID" service_account = "SERVICE_ACCOUNT" cron_schedule = "SCHEDULE" workflow_args = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT", "ADDITIONAL_CONNECTOR_ARGS": [ ADDITIONAL_CONNECTOR_ARGUMENTS ], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "SPARK_DRIVER_TYPE": "PYSPARK", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "NETWORK_TAGS": [NETWORK_TAGS], "NETWORK_URI": "NETWORK_URI", "SUBNETWORK_URI": "SUBNETWORK_URI" } workflow_source = <<EOF main: params: [args] steps: - init: assign: - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")} - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")} - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])} - NETWORK_TYPE: "networkUri" - check_networking: switch: - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""} raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one." - condition: $${NETWORK_URI != ""} steps: - submit_extract_job_with_network_uri: assign: - NETWORKING: $${NETWORK_URI} - NETWORK_TYPE: "networkUri" - condition: $${SUBNETWORK_URI != ""} steps: - submit_extract_job_with_subnetwork_uri: assign: - NETWORKING: $${SUBNETWORK_URI} - NETWORK_TYPE: "subnetworkUri" next: check_create_target_entry_group - check_create_target_entry_group: switch: - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true} next: create_target_entry_group - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false} next: generate_extract_job_link - create_target_entry_group: call: http.post args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: generate_extract_job_link - generate_extract_job_link: call: sys.log args: data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID} severity: "INFO" next: submit_pyspark_extract_job - submit_pyspark_extract_job: call: http.post args: url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: $${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py args: - $${"--target_project_id=" + args.TARGET_PROJECT_ID} - $${"--target_location_id=" + args.CLOUD_REGION} - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID} - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - $${"--output_folder=" + WORKFLOW_ID} - $${args.ADDITIONAL_CONNECTOR_ARGS} runtimeConfig: containerImage: $${args.CUSTOM_CONTAINER_IMAGE} environmentConfig: executionConfig: serviceAccount: $${args.SERVICE_ACCOUNT} stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID} $${NETWORK_TYPE}: $${NETWORKING} networkTags: $${NETWORK_TAGS} result: RESPONSE_MESSAGE next: check_pyspark_extract_job - check_pyspark_extract_job: call: http.get args: url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: PYSPARK_EXTRACT_JOB_STATUS next: check_pyspark_extract_job_done - check_pyspark_extract_job_done: switch: - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"} next: generate_import_logs_link - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"} raise: $${PYSPARK_EXTRACT_JOB_STATUS} - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"} raise: $${PYSPARK_EXTRACT_JOB_STATUS} next: pyspark_extract_job_wait - pyspark_extract_job_wait: call: sys.sleep args: seconds: 30 next: check_pyspark_extract_job - generate_import_logs_link: call: sys.log args: data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"} severity: "INFO" next: submit_import_job - submit_import_job: call: http.post args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")} scope: entry_groups: - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID} entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES} aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES} result: IMPORT_JOB_RESPONSE next: get_job_start_time - get_job_start_time: assign: - importJobStartTime: $${sys.now()} next: import_job_startup_wait - import_job_startup_wait: call: sys.sleep args: seconds: 30 next: initial_get_import_job - initial_get_import_job: call: http.get args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_status_available - check_import_job_status_available: switch: - condition: $${"status" in IMPORT_JOB_STATUS.body} next: check_import_job_done - condition: $${sys.now() - importJobStartTime > 300} # 5 minutes = 300 seconds next: kill_import_job next: import_job_status_wait - import_job_status_wait: call: sys.sleep args: seconds: 30 next: check_import_job_status_available - check_import_job_done: switch: - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"} next: the_end - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"} raise: $${IMPORT_JOB_STATUS} - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"} raise: $${IMPORT_JOB_STATUS} - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"} raise: $${IMPORT_JOB_STATUS} - condition: $${sys.now() - importJobStartTime > 43200} # 12 hours = 43200 seconds next: kill_import_job next: import_job_wait - get_import_job: call: http.get args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: IMPORT_JOB_STATUS next: check_import_job_done - import_job_wait: call: sys.sleep args: seconds: 30 next: get_import_job - kill_import_job: call: http.post args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" next: get_killed_import_job - get_killed_import_job: call: http.get args: url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" result: KILLED_IMPORT_JOB_STATUS next: killed - killed: raise: $${KILLED_IMPORT_JOB_STATUS} - the_end: return: $${IMPORT_JOB_STATUS} EOF
Sostituisci quanto segue:
PROJECT_ID
: il nome del progetto Google Cloud di destinazione in cui eseguire l'importazione dei metadati.LOCATION_ID
: il target Google Cloud in cui Dataproc serverless e dei metadati verranno eseguiti job di importazione e verranno importati i metadati.SERVICE_ACCOUNT
: l'account di servizio che hai configurato nella Sezione Ruoli obbligatori di questo documento.SCHEDULE
: un'espressione cron che definisce la pianificazione da eseguire della pipeline.ENTRY_GROUP_ID
: l'ID del gruppo di voci in cui importare i metadati. L'ID gruppo di voci può contenere lettere minuscole, numeri e trattini.Il nome completo della risorsa di questo gruppo di voci è
projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID
.CREATE_ENTRY_GROUP_BOOLEAN
: se vuoi che la pipeline crei gruppo di voci, se non esiste già nel progetto, imposta questo valore sutrue
.BUCKET_ID
: il nome del bucket Cloud Storage in cui archiviare il file di importazione dei metadati generato dal connettore. Ogni esecuzione del flusso di lavoro crea un nuovo .ADDITIONAL_CONNECTOR_ARGUMENTS
: un elenco di argomenti aggiuntivi del connettore che utilizzi per definire il flusso di lavoro. Per alcuni esempi, vedi Sviluppa un connettore personalizzato per l'importazione dei metadati. Racchiudi ogni argomento tra virgolette doppie e separa gli argomenti con virgole.CONTAINER_IMAGE
: l'immagine container personalizzata del connettore ospitata in Artifact Registry.ENTRY_TYPES
: un elenco dei tipi di voce che rientrano nell'ambito dell'importazione, in il formatoprojects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
LOCATION_ID
deve essere la stessa località Google Cloud che importi oglobal
.ASPECT_TYPES
: un elenco di tipi di aspetti che rientrano nell'ambito dell'importazione, nel formatoprojects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID
.LOCATION_ID
deve essere la stessa località Google Cloud che importi oglobal
.NETWORK_TAGS
(facoltativo): un elenco di tag di rete.- (Facoltativo)
NETWORK_URI
: l'URI della rete VPC che si connette all'origine dati. Se fornisci una rete, ometti l'argomento subnet. SUBNETWORK_URI
(facoltativo): l'URI della sottorete che si connette all'origine dati. Se fornisci una sottorete, ometti l'argomento network.
Inizializza Terraform:
terraform init
Convalida Terraform con il file
.tfvars
:terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
Sostituisci
CONNECTOR_VARIABLES_FILE
con il nome del file delle definizioni delle variabili.Esegui il deployment di Terraform con il tuo file
.tfvars
:terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
Terraform crea un flusso di lavoro e un job Cloud Scheduler progetto specificato. Workflows esegue la pipeline la pianificazione specificata dall'utente.
A seconda della quantità di metadati importati, l'esecuzione del flusso di lavoro potrebbe richiedere diversi minuti o più. Per ulteriori informazioni su come visualizzare l'avanzamento, vedere Accedi ai risultati di esecuzione del flusso di lavoro.
Al termine dell'esecuzione della pipeline, puoi cercare i metadati importati in Dataplex Catalog.
Visualizza i log dei job
Utilizza Cloud Logging per visualizzare i log di una pipeline di connettività gestita. Il log Il payload include un link ai log per Dataproc Serverless dal job batch e dal job di importazione dei metadati, se pertinente. Per ulteriori informazioni, consulta Visualizzare i log del flusso di lavoro.
Risoluzione dei problemi
Segui questi suggerimenti per la risoluzione dei problemi:
- Configura il livello di log del job di importazione per il job di metadati in modo da utilizzare il logging a livello di debug anziché il logging a livello di informazioni.
- Esamina i log del job batch Dataproc Serverless (per le esecuzioni del connettore) e del job di importazione dei metadati. Per ulteriori informazioni, vedi Eseguire query su Dataproc Serverless per i log Spark e Esegui query sui log dei job di metadati.
- Se una voce non può essere importata utilizzando la pipeline e il messaggio di errore non informazioni sufficienti, prova a creare una voce personalizzata con gli stessi dettagli in un gruppo di voci di test. Per ulteriori informazioni, consulta Creare una voce personalizzata.
Passaggi successivi
- Panoramica di Dataplex Catalog
- Sviluppare un connettore personalizzato per l'importazione dei metadati