Importa metadatos de una fuente personalizada con flujos de trabajo

En este documento, se describe cómo importar metadatos de una fuente externa a Dataplex mediante la ejecución de una canalización de conectividad administrada en Workflows.

Para configurar una canalización de conectividad administrada, compilas un conector para tu fuente de datos. Luego, ejecutas la canalización en Workflows. La canalización extrae metadatos de tu fuente de datos y, luego, los importa a Dataplex. Si es necesario, la canalización también crea grupos de entradas de Dataplex Catalog en tu Google Cloud proyecto.

Para obtener más información sobre la conectividad administrada, consulta la descripción general de la conectividad administrada.

Antes de comenzar

Antes de importar metadatos, completa las tareas de esta sección.

Compila un conector

Un conector extrae los metadatos de tu fuente de datos y genera un archivo de importación de metadatos que Dataplex puede importar. El conector es una imagen de Artifact Registry que se puede ejecutar en Dataproc Serverless.

Configura Google Cloud recursos

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Enable the APIs

    Si no planeas ejecutar la canalización según un programa, no necesitas habilitar la API de Cloud Scheduler.

  2. Crea secretos en Secret Manager para almacenar las credenciales de tu fuente de datos de terceros.

  3. Configura tu red de nube privada virtual (VPC) para ejecutar cargas de trabajo de Dataproc Serverless para Spark.

  4. Crea un bucket de Cloud Storage para almacenar los archivos de importación de metadatos.

  5. Crea los siguientes recursos de Dataplex Catalog:

    1. Crea tipos de aspectos personalizados para las entradas que deseas importar.

    2. Crea tipos de entradas personalizadas para las entradas que deseas importar.

Roles obligatorios

Una cuenta de servicio representa la identidad de un flujo de trabajo y determina qué permisos tiene y a qué Google Cloud recursos puede acceder. Necesitas una cuenta de servicio para Workflows (para ejecutar la canalización) y para Dataproc Serverless (para ejecutar el conector).

Puedes usar la cuenta de servicio predeterminada de Compute Engine (PROJECT_NUMBER-compute@developer.gserviceaccount.com) o crear tu propia cuenta de servicio (o cuentas) para ejecutar la canalización de conectividad administrada.

Console

  1. En la consola de Google Cloud, ve a la página IAM.

    Ir a IAM

  2. Selecciona el proyecto al que deseas importar metadatos.

  3. Haz clic en Otorgar acceso y, luego, ingresa la dirección de correo electrónico de la cuenta de servicio.

  4. Asigna los siguientes roles a la cuenta de servicio:

    • Escritor de registros
    • Propietario del grupo de entradas de Dataplex
    • Propietario de trabajos de metadatos de Dataplex
    • Editor del catálogo de Dataplex
    • Editor de Dataproc
    • Trabajador de Dataproc
    • Administrador y descriptor de acceso a secretos de Secret Manager: En el secreto que almacena las credenciales de tu fuente de datos
    • Usuario de objetos de almacenamiento: En el bucket de Cloud Storage
    • Artifact Registry Reader: En el repositorio de Artifact Registry que contiene la imagen del conector
    • Usuario de cuenta de servicio: Si usas cuentas de servicio diferentes, otorga este rol a la cuenta de servicio que ejecuta Workflows en la cuenta de servicio que ejecuta los trabajos por lotes sin servidor de Dataproc.
    • Invocador de flujos de trabajo: Si quieres programar la canalización
  5. Guarda los cambios.

gcloud

  1. Otorga roles a la cuenta de servicio. Ejecuta los siguientes comandos:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el nombre del proyecto Google Cloud de destino al que se importarán los metadatos.
    • SERVICE_ACCOUNT_ID: La cuenta de servicio, como my-service-account@my-project.iam.gserviceaccount.com.
  2. Otorga a la cuenta de servicio los siguientes roles a nivel del recurso:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    Reemplaza lo siguiente:

    • SECRET_ID: Es el ID del secreto que almacena las credenciales de tu fuente de datos. Usa el formato projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: Es el nombre del bucket de Cloud Storage.
    • REPOSITORY: Es el repositorio de Artifact Registry que contiene la imagen del conector.
    • REPOSITORY_LOCATION: Es la ubicación Google Clouden la que se aloja el repositorio.
  3. Otorgar a la cuenta de servicio que ejecuta Workflows el rol roles/iam.serviceAccountUser en la cuenta de servicio que ejecuta los trabajos por lotes de Dataproc Serverless Debes otorgar este rol incluso si usas la misma cuenta de servicio para Workflows y Dataproc sin servidores.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    Si usas cuentas de servicio diferentes, el valor de la marca --member es la cuenta de servicio que ejecuta los trabajos por lotes sin servidor de Dataproc.

  4. Si deseas programar la canalización, otorga a la cuenta de servicio el siguiente rol:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

Importar metadatos

Para importar metadatos, crea y, luego, ejecuta un flujo de trabajo que ejecute la canalización de conectividad administrada. De manera opcional, también puedes crear un programa para ejecutar la canalización.

Console

  1. Crea el flujo de trabajo. Proporciona la siguiente información:

    • Cuenta de servicio: Es la cuenta de servicio que configuraste en la sección Roles necesarios de este documento.
    • Encriptación: Selecciona Google-managed encryption key.

    • Define el flujo de trabajo: Proporciona el siguiente archivo de definición:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                  steps:
                   - submit_extract_job_with_default_network_uri:
                        assign:
                          - NETWORK_TYPE: "networkUri"
                          - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: generate_extract_job_link
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
  2. Para ejecutar la canalización a pedido, ejecuta el flujo de trabajo.

    Proporciona los siguientes argumentos del entorno de ejecución:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el nombre del proyecto Google Cloud de destino al que se importarán los metadatos.
    • LOCATION_ID: Es la ubicación Google Cloud de destino en la que se ejecutarán los trabajos de Dataproc sin servidores y de importación de metadatos, y a la que se importarán los metadatos.
    • ENTRY_GROUP_ID: El ID del grupo de entradas al que se importarán los metadatos. El ID del grupo de entrada puede contener letras minúsculas, números y guiones.

      El nombre completo del recurso de este grupo de entradas es projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: Si deseas que la canalización cree el grupo de entrada si aún no existe en tu proyecto, establece este valor en true.
    • BUCKET_ID: Es el nombre del bucket de Cloud Storage para almacenar el archivo de importación de metadatos que genera el conector. Cada ejecución de flujo de trabajo crea una carpeta nueva.
    • SERVICE_ACCOUNT_ID: La cuenta de servicio que configuraste en la sección Roles necesarios de este documento. La cuenta de servicio ejecuta el conector en Dataproc Serverless.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: Es una lista de argumentos adicionales que se pasarán al conector. Para ver ejemplos, consulta Desarrolla un conector personalizado para la importación de metadatos. Encierra cada argumento entre comillas dobles y sepáralos con comas.
    • CONTAINER_IMAGE: Es la imagen de contenedor personalizada del conector alojada en Artifact Registry.
    • ENTRY_TYPES: Es una lista de tipos de entrada que están dentro del alcance para la importación, en el formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID debe ser la misma ubicaciónGoogle Cloud a la que importas los metadatos o global.
    • ASPECT_TYPES: Es una lista de tipos de aspectos que están dentro del alcance para la importación, en el formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID debe ser la misma ubicaciónGoogle Cloud a la que importas los metadatos o global.
    • Opcional: Para el argumento NETWORK_TAGS, proporciona una lista de etiquetas de red.
    • Opcional: Para el argumento NETWORK_URI, proporciona el URI de la red de VPC que se conecta a la fuente de datos. Si proporcionas una red, omite el argumento de subred.
    • Opcional: Para el argumento SUBNETWORK_URI, proporciona el URI de la subred que se conecta a la fuente de datos. Si proporcionas una subred, omite el argumento de red.

    Según la cantidad de metadatos que importes, la canalización puede tardar varios minutos o más en ejecutarse. Para obtener más información sobre cómo ver el progreso, consulta Cómo acceder a los resultados de la ejecución de flujos de trabajo.

    Una vez que se haya ejecutado la canalización, puedes buscar los metadatos importados en Dataplex Catalog.

  3. Opcional: Si deseas ejecutar la canalización según un programa, crea un programa con Cloud Scheduler. Proporciona la siguiente información:

    • Frecuencia: Es una expresión unix-cron que define el programa para ejecutar la canalización.
    • Argumento de flujo de trabajo: Los argumentos de entorno de ejecución del conector, como se describe en el paso anterior.
    • Cuenta de servicio: Es la cuenta de servicio. La cuenta de servicio administra el programador.

gcloud

  1. Guarda la siguiente definición de carga de trabajo como un archivo YAML:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                steps:
                 - submit_extract_job_with_default_network_uri:
                      assign:
                        - NETWORK_TYPE: "networkUri"
                        - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
  2. Define las variables de Bash, crea el flujo de trabajo y, de manera opcional, crea un programa para ejecutar la canalización:

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el nombre del proyecto Google Cloud de destino al que se importarán los metadatos.
    • LOCATION_ID: Es la ubicación Google Cloud de destino en la que se ejecutarán los trabajos de Dataproc sin servidores y de importación de metadatos, y a la que se importarán los metadatos.
    • SERVICE_ACCOUNT_ID: La cuenta de servicio que configuraste en la sección Roles necesarios de este documento.
    • WORKFLOW_DEFINITION_FILE: Es la ruta de acceso al archivo YAML de definición del flujo de trabajo.
    • WORKFLOW_NAME: El nombre del flujo de trabajo.
    • WORKFLOW_ARGUMENTS: Son los argumentos del entorno de ejecución que se pasarán al conector. Los argumentos están en formato JSON:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      En Cloud Scheduler, las comillas dobles dentro de la cadena con comillas se escapan con barras inversas (\). Por ejemplo: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

      Reemplaza lo siguiente:

      • ENTRY_GROUP_ID: El ID del grupo de entradas al que se importarán los metadatos. El ID del grupo de entrada puede contener letras minúsculas, números y guiones.

        El nombre completo del recurso de este grupo de entradas es projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN: Si deseas que la canalización cree el grupo de entrada si aún no existe en tu proyecto, establece este valor en true.
      • BUCKET_ID: Es el nombre del bucket de Cloud Storage para almacenar el archivo de importación de metadatos que genera el conector. Cada ejecución de flujo de trabajo crea una carpeta nueva.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: Es una lista de argumentos adicionales que se pasarán al conector. Para ver ejemplos, consulta Desarrolla un conector personalizado para la importación de metadatos.
      • CONTAINER_IMAGE: Es la imagen de contenedor personalizada del conector alojada en Artifact Registry.
      • ENTRY_TYPES: Es una lista de tipos de entrada que están dentro del alcance para la importación, en el formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID debe ser la misma ubicaciónGoogle Cloud a la que importas los metadatos o global.
      • ASPECT_TYPES: Es una lista de tipos de aspectos que están dentro del alcance para la importación, en el formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID debe ser la misma ubicaciónGoogle Cloud a la que importas los metadatos o global.
      • Opcional: Para el argumento NETWORK_TAGS, proporciona una lista de etiquetas de red.
      • Opcional: Para el argumento NETWORK_URI, proporciona el URI de la red de VPC que se conecta a la fuente de datos. Si proporcionas una red, omite el argumento de subred.
      • Opcional: Para el argumento SUBNETWORK_URI, proporciona el URI de la subred que se conecta a la fuente de datos. Si proporcionas una subred, omite el argumento de red.
    • CRON_SCHEDULE_EXPRESSION: Es una expresión cron que define el programa para ejecutar la canalización. Por ejemplo, para ejecutar el programa a la media noche todos los días, usa la expresión 0 0 * * *.

  3. Para ejecutar la canalización a pedido, ejecuta el flujo de trabajo:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    Los argumentos del flujo de trabajo están en formato JSON, pero no se escapan.

    Según la cantidad de metadatos que importes, el flujo de trabajo podría tardar varios minutos o más en ejecutarse. Para obtener más información sobre cómo ver el progreso, consulta Cómo acceder a los resultados de la ejecución del flujo de trabajo.

    Una vez que se haya ejecutado la canalización, puedes buscar los metadatos importados en Dataplex Catalog.

Terraform

  1. Clona el repositorio cloud-dataplex.

    El repositorio incluye los siguientes archivos de Terraform:

  2. Edita el archivo .tfvars para reemplazar los marcadores de posición por la información de tu conector.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el nombre del proyecto Google Cloud de destino al que se importarán los metadatos.
    • LOCATION_ID: Es la ubicación Google Cloud de destino en la que se ejecutarán los trabajos de Dataproc sin servidores y de importación de metadatos, y a la que se importarán los metadatos.
    • SERVICE_ACCOUNT_ID: La cuenta de servicio que configuraste en la sección Roles necesarios de este documento.
    • CRON_SCHEDULE_EXPRESSION: Es una expresión cron que define el programa para ejecutar la canalización. Por ejemplo, para ejecutar el programa a la media noche todos los días, usa la expresión 0 0 * * *.
    • ENTRY_GROUP_ID: El ID del grupo de entradas al que se importarán los metadatos. El ID del grupo de entrada puede contener letras minúsculas, números y guiones.

      El nombre completo del recurso de este grupo de entradas es projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: Si deseas que la canalización cree el grupo de entrada si aún no existe en tu proyecto, establece este valor en true.
    • BUCKET_ID: Es el nombre del bucket de Cloud Storage para almacenar el archivo de importación de metadatos que genera el conector. Cada ejecución de flujo de trabajo crea una carpeta nueva.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: Es una lista de argumentos adicionales que se pasarán al conector. Para ver ejemplos, consulta Desarrolla un conector personalizado para la importación de metadatos. Encierra cada argumento entre comillas dobles y sepáralos con comas.
    • CONTAINER_IMAGE: Es la imagen de contenedor personalizada del conector alojada en Artifact Registry.
    • ENTRY_TYPES: Es una lista de tipos de entrada que están dentro del alcance para la importación, en el formato projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. LOCATION_ID debe ser la misma ubicaciónGoogle Cloud a la que importas los metadatos o global.
    • ASPECT_TYPES: Es una lista de tipos de aspectos que están dentro del alcance para la importación, en el formato projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. LOCATION_ID debe ser la misma ubicaciónGoogle Cloud a la que importas los metadatos o global.
    • Opcional: Para el argumento NETWORK_TAGS, proporciona una lista de etiquetas de red.
    • Opcional: Para el argumento NETWORK_URI, proporciona el URI de la red de VPC que se conecta a la fuente de datos. Si proporcionas una red, omite el argumento de subred.
    • Opcional: Para el argumento SUBNETWORK_URI, proporciona el URI de la subred que se conecta a la fuente de datos. Si proporcionas una subred, omite el argumento de red.
  3. Inicializa Terraform mediante este comando:

    terraform init
    
  4. Valida Terraform con tu archivo .tfvars:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Reemplaza CONNECTOR_VARIABLES_FILE por el nombre de tu archivo de definiciones de variables.

  5. Implementa Terraform con tu archivo .tfvars:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    Terraform crea un flujo de trabajo y un trabajo de Cloud Scheduler en el proyecto especificado. Workflows ejecuta la canalización en el programa que especifiques.

    Según la cantidad de metadatos que importes, el flujo de trabajo podría tardar varios minutos o más en ejecutarse. Para obtener más información sobre cómo ver el progreso, consulta Cómo acceder a los resultados de la ejecución del flujo de trabajo.

    Una vez que se haya ejecutado la canalización, puedes buscar los metadatos importados en Dataplex Catalog.

Cómo ver registros de trabajos

Usa Cloud Logging para ver los registros de una canalización de conectividad administrada. La carga útil de registro incluye un vínculo a los registros del trabajo por lotes de Dataproc sin servidores y del trabajo de importación de metadatos, según corresponda. Para obtener más información, consulta Ver registros de flujo de trabajo.

Soluciona problemas

Usa las siguientes sugerencias para solucionar problemas:

¿Qué sigue?