Importa metadatos desde una fuente personalizada con Workflows

Este documento describe cómo importar metadatos de una fuente externa a Dataplex ejecutando una canalización de conectividad administrada en Workflows.

Para configurar una canalización de conectividad administrada, compilas un conector para tu fuente de datos. Luego, ejecuta la canalización en Workflows. El extrae los metadatos de tu fuente de datos y, luego, los importa en Dataplex. Si es necesario, la canalización también crea grupos de entradas de Dataplex Catalog en tu proyecto de Google Cloud.

Para obtener más información sobre la conectividad administrada, consulta Descripción general de la conectividad administrada.

Antes de comenzar

Antes de importar metadatos, completa las tareas de esta sección.

Compila un conector

Un conector extrae los metadatos de tus datos fuente y genera un archivo de importación de metadatos que puede importar Dataplex El conector es una imagen de Artifact Registry en la que se puede ejecutar Dataproc sin servidores.

Configurar recursos de Google Cloud

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    Si no planeas ejecutar la canalización según un programa, no es necesario habilitar la API de Cloud Scheduler

  2. Crea secretos en Secret Manager para almacenar las credenciales de tu fuente de datos de terceros.

  3. Configura tu red de nube privada virtual (VPC) para ejecutar cargas de trabajo de Dataproc Serverless para Spark.

  4. Crea un bucket de Cloud Storage para almacena los archivos de importación de metadatos.

  5. Crea los siguientes recursos de Dataplex Catalog:

    1. Cómo crear tipos de aspectos personalizados para las entradas que quieres importar.

    2. Cómo crear tipos de entradas personalizados para las entradas que quieres importar.

Roles obligatorios

Una cuenta de servicio representa la identidad de un flujo de trabajo y determina los permisos que tiene y a qué recursos de Google Cloud puede acceder. Necesitas una cuenta de servicio para Workflows (para ejecutar canalización) y Dataproc Serverless (para ejecutar el conector).

Puedes usar la cuenta de servicio predeterminada de Compute Engine (PROJECT_NUMBER-compute@developer.gserviceaccount.com) o crea tu propia cuenta de servicio (o cuentas) para ejecutar la canalización de conectividad administrada.

Console

  1. En la consola de Google Cloud, ve a la página IAM.

    Ir a IAM

  2. Selecciona el proyecto al que deseas importar los metadatos.

  3. Haz clic en Otorgar acceso. y, luego, ingresa la dirección de correo electrónico de la cuenta de servicio.

  4. Asigna los siguientes roles a la cuenta de servicio:

    • Escritor de registros
    • Propietario del grupo de entradas de Dataplex
    • Propietario de trabajos de metadatos de Dataplex
    • Editor del catálogo de Dataplex
    • Editor de Dataproc
    • Trabajador de Dataproc
    • Administrador y descriptor de acceso a secretos de Secret Manager: En el secreto que almacena las credenciales de tu fuente de datos
    • Usuario de objetos de Storage: En el bucket de Cloud Storage
    • Artifact Registry Reader: En el repositorio de Artifact Registry que contiene la imagen del conector
    • Usuario de cuenta de servicio: Si usas cuentas de servicio diferentes, otorgar este rol a la cuenta de servicio que ejecuta Workflows en la cuenta de servicio que ejecuta Dataproc Serverless trabajos por lotes
    • Invocador de flujos de trabajo: si deseas programar la canalización
  5. Guarda los cambios.

gcloud

  1. Otorga roles a la cuenta de servicio. Ejecute los siguientes comandos:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el nombre del proyecto de Google Cloud de destino al que se importarán los metadatos.
    • SERVICE_ACCOUNT_ID: La cuenta de servicio, como my-service-account@my-project.iam.gserviceaccount.com.
  2. Otorga a la cuenta de servicio los siguientes roles a nivel del recurso:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    Reemplaza lo siguiente:

    • SECRET_ID: Es el ID del secreto que almacena las credenciales de tu fuente de datos. Usa el formato projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: Es el nombre del bucket de Cloud Storage.
    • REPOSITORY: El repositorio de Artifact Registry que contiene la imagen del conector.
    • REPOSITORY_LOCATION: Google Cloud y la ubicación en la que se aloja el repositorio.
  3. Otorgar a la cuenta de servicio que ejecuta Workflows el rol roles/iam.serviceAccountUser en la cuenta de servicio que ejecuta los trabajos por lotes de Dataproc Serverless Debes otorgar este rol incluso si usas la misma cuenta de servicio para Workflows y Dataproc sin servidores.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    Si usas cuentas de servicio diferentes, el valor de la marca --member es la cuenta de servicio que ejecuta Dataproc Serverless trabajos por lotes.

  4. Si deseas programar la canalización, otorga a la cuenta de servicio el siguiente rol:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

Importar metadatos

Para importar metadatos, crea y, luego, ejecuta un flujo de trabajo que ejecute la canalización de conectividad administrada. También puedes crear un programa para ejecutar la canalización.

Console

  1. Crea el flujo de trabajo. Proporcione la siguiente información:

    • Cuenta de servicio: La cuenta de servicio que configuraste en Roles obligatorios de este documento.
    • Encriptación: Selecciona Clave de encriptación administrada por Google.

    • Define el flujo de trabajo: Proporciona el siguiente archivo de definición:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: set_default_networking
      
          - set_default_networking:
              assign:
                - NETWORK_TYPE: "networkUri"
                - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. Para ejecutar la canalización a pedido, ejecutar el flujo de trabajo.

    Proporciona los siguientes argumentos del entorno de ejecución:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el nombre del proyecto de Google Cloud de destino al que se importarán los metadatos.
    • LOCATION_ID: Es la ubicación de Google Cloud de destino en la que se ejecutarán los trabajos de Dataproc sin servidor y de importación de metadatos, y a la que se importarán los metadatos.
    • ENTRY_GROUP_ID: El ID del grupo de entradas al que se importarán los metadatos. El ID del grupo de entrada puede contener letras minúsculas, números y guiones.

      El nombre completo del recurso de este grupo de entradas es projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN, si deseas que la canalización haga lo siguiente: crear el grupo de entrada, si aún no existe en tu proyecto, establece lo siguiente: valor en true.
    • BUCKET_ID: Es el nombre del bucket de Cloud Storage para almacenar el archivo de importación de metadatos que genera el conector. Cada ejecución del flujo de trabajo crea una carpeta nueva.
    • SERVICE_ACCOUNT_ID la cuenta de servicio que configuraste en Roles obligatorios de este documento. La cuenta de servicio ejecuta el conector en Dataproc Serverless.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: una lista de argumentos que se pasarán al conector. Para ver ejemplos, consulta Desarrolla una para la importación de metadatos. Encierra cada argumento en doble comillas y separar los argumentos con comas.
    • CONTAINER_IMAGE: Es la imagen de contenedor personalizada del conector alojada en Artifact Registry.
    • ENTRY_TYPES: Una lista de tipos de entradas que se encuentran dentro del alcance para importarlos, en el formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID LOCATION_ID debe ser la misma ubicación de Google Cloud a la que importas los metadatos o global.
    • ASPECT_TYPES: Una lista de los tipos de aspectos que se encuentran dentro del alcance para importarlos, en el formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID LOCATION_ID debe ser la misma ubicación de Google Cloud a la que importas los metadatos o global.
    • Opcional: Para el argumento NETWORK_TAGS, proporciona una lista de etiquetas de red.
    • Opcional: En el argumento NETWORK_URI, proporciona el URI de la VPC. red que se conecta a la fuente de datos. Si proporcionas una red, omite el argumento de subred.
    • Opcional: Para el argumento SUBNETWORK_URI, proporciona el URI de la subred que se conecta a la fuente de datos. Si proporcionas una subred, omite el argumento de red.

    Según la cantidad de metadatos que importes, la canalización puede tardar varios minutos o más en ejecutarse. Para obtener más información sobre cómo ver el progreso, consulta Cómo acceder a los resultados de la ejecución de flujos de trabajo.

    Una vez que se haya ejecutado la canalización, puedes buscar los metadatos importados en Dataplex Catalog.

  3. Opcional: Si deseas ejecutar la canalización según un programa, crea un programa con Cloud Scheduler. Proporcione la siguiente información:

    • Frecuencia: Una expresión cron de Unix que define el programa para para ejecutar la canalización.
    • Argumento de flujo de trabajo: Los argumentos de entorno de ejecución del conector, como se describe en el paso anterior.
    • Cuenta de servicio: La cuenta de servicio La cuenta de servicio administra el programador.

gcloud

  1. Guarda la siguiente definición de carga de trabajo como un archivo YAML:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Definir las variables de Bash crear el flujo de trabajo De manera opcional, puedes crear un programa para ejecuta la canalización:

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el nombre del proyecto de Google Cloud de destino al que se importarán los metadatos.
    • LOCATION_ID: la ubicación de destino de Google Cloud en la que se ejecutarán Dataproc Serverless y los trabajos de importación de metadatos donde se importarán los metadatos.
    • SERVICE_ACCOUNT_ID: La cuenta de servicio que configuraste en la sección Roles necesarios de este documento.
    • WORKFLOW_DEFINITION_FILE: Es la ruta de acceso al archivo YAML de definición del flujo de trabajo.
    • WORKFLOW_NAME: El nombre del flujo de trabajo.
    • WORKFLOW_ARGUMENTS: Los argumentos del entorno de ejecución que se pasarán al conector. Los argumentos están en formato JSON:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      En Cloud Scheduler, las comillas dobles dentro de la cadena con comillas se escapan con barras inversas (\). Por ejemplo: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

      Reemplaza lo siguiente:

      • ENTRY_GROUP_ID: El ID del grupo de entradas al que se importarán los metadatos. El ID del grupo de entrada puede contener letras minúsculas, números y guiones.

        El nombre completo del recurso de este grupo de entradas es projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN, si deseas que la canalización haga lo siguiente: crear el grupo de entrada, si aún no existe en tu proyecto, establece lo siguiente: valor en true.
      • BUCKET_ID: Es el nombre de Cloud Storage. bucket para almacenar el archivo de importación de metadatos que genera el conector. Cada ejecución del flujo de trabajo crea una carpeta nueva.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: Es una lista de argumentos adicionales que se pasarán al conector. Para ver ejemplos, consulta Desarrolla un conector personalizado para la importación de metadatos.
      • CONTAINER_IMAGE: Es la imagen de contenedor personalizada del objeto. de Google Cloud alojado en Artifact Registry.
      • ENTRY_TYPES: Una lista de tipos de entradas que se encuentran dentro del alcance para importarlos, en el formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID LOCATION_ID debe ser la misma ubicación de Google Cloud a la que importas los metadatos o global.
      • ASPECT_TYPES: Una lista de los tipos de aspectos que se encuentran dentro del alcance para importarlos, en el formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID LOCATION_ID debe ser la misma ubicación de Google Cloud a la que importas los metadatos o global.
      • Opcional: Para el argumento NETWORK_TAGS, proporciona una lista de etiquetas de red.
      • Opcional: En el argumento NETWORK_URI, proporciona el URI de la VPC. red que se conecta a la fuente de datos. Si proporcionas una red, omite el argumento de subred.
      • Opcional: En el argumento SUBNETWORK_URI, proporciona el URI de la subred que se conecta a la fuente de datos. Si proporcionas una subred, omite el argumento de red.
    • CRON_SCHEDULE_EXPRESSION: una expresión cron que define el programa para ejecutar la canalización. Por ejemplo, para ejecutar el programa a la media noche todos los días, usa la expresión 0 0 * * *.

  3. Para ejecutar la canalización a pedido, ejecuta el flujo de trabajo:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    Los argumentos del flujo de trabajo están en formato JSON, pero no tienen escape.

    Según la cantidad de metadatos que importes, el flujo de trabajo podría tardar varios minutos o más en ejecutarse. Para obtener más información sobre cómo ver el progreso, consulta Cómo acceder a los resultados de la ejecución del flujo de trabajo.

    Cuando la canalización haya terminado de ejecutarse, puedes buscar los metadatos importados en Dataplex Catalog.

Terraform

  1. Clona el Repositorio cloud-dataplex.

    El repositorio incluye los siguientes archivos de Terraform:

  2. Edita el archivo .tfvars para reemplazar los marcadores de posición por la información. para el conector.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el nombre del proyecto de Google Cloud de destino al que se importarán los metadatos.
    • LOCATION_ID: la ubicación de destino de Google Cloud en la que se ejecutarán Dataproc Serverless y los trabajos de importación de metadatos donde se importarán los metadatos.
    • SERVICE_ACCOUNT_ID: La cuenta de servicio que configuraste en la sección Roles necesarios de este documento.
    • CRON_SCHEDULE_EXPRESSION: Es una expresión cron que define el programa para ejecutar la canalización. Por ejemplo, para ejecutar el programa en la medianoche todos los días, usa la expresión 0 0 * * *.
    • ENTRY_GROUP_ID: El ID del grupo de entradas al que se importarán los metadatos. El ID del grupo de entradas puede contener letras minúsculas, números y guiones.

      El nombre completo del recurso de este grupo de entradas es projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN, si deseas que la canalización haga lo siguiente: crear el grupo de entrada, si aún no existe en tu proyecto, establece lo siguiente: valor en true.
    • BUCKET_ID: Es el nombre de Cloud Storage. bucket para almacenar el archivo de importación de metadatos que genera el conector. Cada ejecución del flujo de trabajo crea una carpeta nueva.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: Es una lista de argumentos adicionales que se pasarán al conector. Para ver ejemplos, consulta Desarrolla una para la importación de metadatos. Encierra cada argumento entre comillas dobles y sepáralos con comas.
    • CONTAINER_IMAGE: Es la imagen de contenedor personalizada del conector alojada en Artifact Registry.
    • ENTRY_TYPES: Una lista de tipos de entradas que se encuentran dentro del alcance para importarlos, en el formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID LOCATION_ID debe ser la misma ubicación de Google Cloud a la que importas los metadatos o global.
    • ASPECT_TYPES: Una lista de los tipos de aspectos que se encuentran dentro del alcance para importarlos, en el formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID LOCATION_ID debe ser la misma ubicación de Google Cloud a la que importas los metadatos o global.
    • Opcional: Para el argumento NETWORK_TAGS, proporciona una lista de etiquetas de red.
    • Opcional: En el argumento NETWORK_URI, proporciona el URI de la VPC. red que se conecta a la fuente de datos. Si proporcionas una red, omite el argumento de subred.
    • Opcional: En el argumento SUBNETWORK_URI, proporciona el URI de la subred que se conecta a la fuente de datos. Si proporcionas una subred, omite el argumento de red.
  3. Inicializa Terraform mediante este comando:

    terraform init
    
  4. Valida Terraform con tu archivo .tfvars:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Reemplaza CONNECTOR_VARIABLES_FILE por el nombre. del archivo de definiciones de variables.

  5. Implementa Terraform con tu archivo .tfvars:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform crea un flujo de trabajo y un trabajo de Cloud Scheduler en la proyecto especificado. Workflows ejecuta la canalización en el programa que especifiques.

    Según la cantidad de metadatos que importes, el flujo de trabajo podría tardar varios minutos o más en ejecutarse. Para obtener más información sobre cómo ver el progreso, consulta Cómo acceder a los resultados de la ejecución del flujo de trabajo.

    Cuando la canalización haya terminado de ejecutarse, puedes buscar los metadatos importados en Dataplex Catalog.

Ver registros de trabajos

Usar Cloud Logging para ver los registros de una canalización de conectividad administrada El registro incluye un vínculo a los registros de la instancia el trabajo por lotes y el trabajo de importación de metadatos, según corresponda. Para obtener más información, consulta Visualiza los registros del flujo de trabajo.

Soluciona problemas

Usa las siguientes sugerencias para solucionar problemas:

¿Qué sigue?