Monitorizar el estado de la canalización

En esta página se describe cómo publicar eventos de canalización de Cloud Data Fusion, como el estado de la canalización, en temas de Pub/Sub. También se describe cómo crear funciones de Cloud Run que procesen los mensajes de Pub/Sub y realicen acciones, como identificar y volver a intentar ejecutar las canalizaciones fallidas.

Antes de empezar

  • Crea un tema en el que Pub/Sub pueda publicar eventos de la canalización de Cloud Data Fusion.

Roles obligatorios

Para asegurarte de que la cuenta de servicio de Cloud Data Fusion tiene los permisos necesarios para publicar eventos de la canalización en un tema de Pub/Sub, pide a tu administrador que le conceda el rol de gestión de identidades y accesos Publicador de Pub/Sub (roles/pubsub.publisher) en el proyecto en el que crees el tema de Pub/Sub.

Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

Es posible que tu administrador también pueda conceder a la cuenta de servicio de Cloud Data Fusion los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Gestionar la publicación de eventos en una instancia de Cloud Data Fusion

Puede gestionar la publicación de eventos en instancias de Cloud Data Fusion nuevas y ya creadas mediante la API REST en las versiones 6.7.0 y posteriores.

Publicar eventos en una instancia nueva

Crea una instancia e incluye el campo EventPublishConfig. Para obtener más información sobre los campos obligatorios de las nuevas instancias, consulte la referencia del recurso Instances.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud
  • LOCATION: la ubicación de tu proyecto
  • INSTANCE_ID: el ID de tu instancia de Cloud Data Fusion
  • VERSION_NUMBER: la versión de Cloud Data Fusion en la que creas la instancia (por ejemplo, 6.10.1
  • TOPIC_ID: el ID del tema de Pub/Sub

Habilitar la publicación de eventos en una instancia de Cloud Data Fusion

Actualiza el campo EventPublishConfig de una instancia de Cloud Data Fusion:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud
  • LOCATION: la ubicación de tu proyecto
  • INSTANCE_ID: el ID de tu instancia de Cloud Data Fusion
  • TOPIC_ID: el ID del tema de Pub/Sub

Quitar la publicación de eventos de una instancia

Para quitar la publicación de eventos de una instancia, actualiza el valor de publicación de eventos enabled a false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Crear funciones para leer mensajes de Pub/Sub

Las funciones de Cloud Run pueden leer mensajes de Pub/Sub y actuar en consecuencia, como reintentar las canalizaciones fallidas. Para crear una función de Cloud Run, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Funciones de Cloud Run.

    Ir a Cloud Run Functions

  2. Haz clic en Crear función.

  3. Introduce el nombre de la función y la región.

  4. En el campo Tipo de activador, selecciona Cloud Pub/Sub.

  5. Introduce el ID del tema de Pub/Sub.

  6. Haz clic en Siguiente.

  7. Añade funciones para leer los mensajes de Pub/Sub y realizar otras acciones. Por ejemplo, puedes añadir funciones para los siguientes casos prácticos:

    • Enviar alertas de fallos en las canalizaciones.
    • Enviar alertas sobre KPIs, como el recuento de registros o la información de ejecución.
    • Reiniciar una canalización fallida que no se ha vuelto a ejecutar.

    Para ver ejemplos de funciones de Cloud Run, consulta la sección de casos prácticos.

  8. Haz clic en Desplegar. Para obtener más información, consulta Desplegar una función de Cloud Run.

Caso práctico: estado de la canalización de documentos y reintento de canalizaciones fallidas

Las siguientes funciones de Cloud Run leen mensajes de Pub/Sub sobre el estado de ejecución de la canalización y, a continuación, vuelven a intentar ejecutar las canalizaciones fallidas en Cloud Data Fusion.

Estas funciones de ejemplo hacen referencia a los siguientes componentes de Google Cloud :

  • Google Cloud project: el proyecto en el que se crean las funciones de Cloud Run y los temas de Pub/Sub
  • Tema de Pub/Sub: tema de Pub/Sub vinculado a tu instancia de Cloud Data Fusion
  • Instancia de Cloud Data Fusion: la instancia de Cloud Data Fusion en la que diseñas y ejecutas flujos de procesamiento.
  • Tabla de BigQuery: tabla de BigQuery que registra el estado de la canalización y los detalles de las ejecuciones y las repeticiones.
  • Función de Cloud Run: función de Cloud Run en la que se despliega el código que vuelve a intentar ejecutar las canalizaciones fallidas
  1. En el siguiente ejemplo de función de Cloud Run se leen los mensajes de Pub/Sub sobre eventos de estado de Cloud Data Fusion.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. La siguiente función de ejemplo crea y guarda una tabla de BigQuery, y consulta los detalles de la ejecución de la canalización.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. La siguiente función de ejemplo comprueba si se han producido errores en las canalizaciones y si se han vuelto a ejecutar en la última hora.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Si la canalización que ha fallado no se ha ejecutado recientemente, la siguiente función de ejemplo vuelve a ejecutarla.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

Siguientes pasos