Supervisa el estado de la canalización

En esta página, se describe cómo publicar eventos de canalización de Cloud Data Fusion, como el estado de la canalización, en temas de Pub/Sub. También se describe cómo crear funciones de Cloud Run que procesen los mensajes de Pub/Sub y tomen medidas, como identificar y volver a intentar canalización fallidas.

Antes de comenzar

  • Crea un tema en el que Pub/Sub pueda publicar eventos de canalización de Cloud Data Fusion.

Roles obligatorios

Para garantizar que la cuenta de servicio de Cloud Data Fusion tenga los permisos necesarios para publicar eventos de canalización en un tema de Pub/Sub, pídele a tu administrador que le otorgue a la cuenta de servicio de Cloud Data Fusion el rol de IAM Publicador de Pub/Sub (roles/pubsub.publisher) en el proyecto en el que creas el tema de Pub/Sub.

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Es posible que tu administrador también pueda otorgar a la cuenta de servicio de Cloud Data Fusion los permisos necesarios mediante roles personalizados o con otros roles predefinidos.

Administra la publicación de eventos en una instancia de Cloud Data Fusion

Puedes administrar la publicación de eventos en instancias existentes y nuevas de Cloud Data Fusion con la API de REST en las versiones 6.7.0 y posteriores.

Publica eventos en una instancia nueva

Crea una instancia nueva y, luego, incluye el campo EventPublishConfig. Para obtener más información sobre los campos obligatorios para instancias nuevas, consulta la referencia del recurso de instancias.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Reemplaza lo siguiente:

  • PROJECT_ID: El Google Cloud ID del proyecto
  • LOCATION: Es la ubicación de tu proyecto.
  • INSTANCE_ID: El ID de tu instancia de Cloud Data Fusion
  • VERSION_NUMBER: La versión de Cloud Data Fusion en la que creas la instancia (por ejemplo, 6.10.1)
  • TOPIC_ID: Es el ID del tema de Pub/Sub.

Habilita la publicación de eventos en una instancia existente de Cloud Data Fusion

Actualiza el campo EventPublishConfig en una instancia existente de Cloud Data Fusion:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Reemplaza lo siguiente:

  • PROJECT_ID: El Google Cloud ID del proyecto
  • LOCATION: Es la ubicación de tu proyecto.
  • INSTANCE_ID: El ID de tu instancia de Cloud Data Fusion
  • TOPIC_ID: Es el ID del tema de Pub/Sub.

Cómo quitar la publicación de eventos de una instancia

Para quitar la publicación de eventos de una instancia, actualiza el valor enabled de la publicación de eventos a false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Crea funciones para leer mensajes de Pub/Sub

Las funciones de Cloud Run pueden leer mensajes de Pub/Sub y realizar acciones en ellos, como volver a intentar canalizar errores. Para crear una función de Cloud Run, haz lo siguiente:

  1. En la consola de Google Cloud, ve a la página Funciones de Cloud Run.

    Ve a las funciones de Cloud Run

  2. Haz clic en Crear función.

  3. Ingresa un nombre y una región para la función.

  4. En el campo Tipo de activador, selecciona Cloud Pub/Sub.

  5. Ingresa el ID del tema de Pub/Sub.

  6. Haz clic en Siguiente.

  7. Agrega funciones para leer los mensajes de Pub/Sub y realizar otras acciones. Por ejemplo, puedes agregar funciones para los siguientes casos de uso:

    • Envía alertas de fallas de canalización.
    • Enviar alertas de KPI, como el recuento de registros o la información de ejecución
    • Reinicia una canalización con errores que no se volvió a ejecutar.

    Para ver ejemplos de funciones de Cloud Run, consulta la sección caso de uso.

  8. Haz clic en Implementar. Para obtener más información, consulta Cómo implementar una función de Cloud Run.

Caso de uso: Documenta el estado de la canalización y vuelve a intentar las canalizaciones con errores

En el siguiente ejemplo, las funciones de Cloud Run leen mensajes de Pub/Sub sobre el estado de ejecución de la canalización y, luego, vuelven a intentar las canalizaciones que fallaron en Cloud Data Fusion.

Estas funciones de ejemplo se refieren a los siguientes componentes Google Cloud :

  • Google Cloud project: Es el proyecto en el que se crean las funciones de Cloud Run y los temas de Pub/Sub.
  • Tema de Pub/Sub: Es el tema de Pub/Sub vinculado a tu instancia de Cloud Data Fusion.
  • Instancia de Cloud Data Fusion: Es la instancia de Cloud Data Fusion en la que diseñas y ejecutas canalizaciones.
  • Tabla de BigQuery: Es la tabla de BigQuery que captura el estado de la canalización y los detalles de la ejecución y la nueva ejecución.
  • Función de Cloud Run: Es la función de Cloud Run en la que implementas el código que vuelve a intentar las canalizaciones fallidas.
  1. En el siguiente ejemplo de función de Cloud Run, se leen los mensajes de Pub/Sub sobre los eventos de estado de Cloud Data Fusion.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. En la siguiente función de ejemplo, se crea y guarda una tabla de BigQuery, y se consultan los detalles de la ejecución de la canalización.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. En la siguiente función de ejemplo, se verifican los flujos de trabajo que fallaron y si se volvieron a ejecutar en la última hora.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Si la canalización con errores no se ejecutó recientemente, la siguiente función de ejemplo la vuelve a ejecutar.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

¿Qué sigue?