Supervisa el estado de la canalización

En esta página, se describe cómo publicar eventos de canalización de Cloud Data Fusion, como del estado de la canalización a los temas de Pub/Sub. También se describe cómo crear funciones de Cloud Run que procesen los mensajes de Pub/Sub y tomen medidas, como identificar y volver a intentar canalización fallidas.

Antes de comenzar

  • Crea un tema en el que Pub/Sub pueda publicar eventos de canalización de Cloud Data Fusion.

Roles obligatorios

Para garantizar que la cuenta de servicio de Cloud Data Fusion cuente con los permisos necesarios permisos para publicar eventos de canalización en un tema de Pub/Sub, pedirle a tu administrador que otorgue el acceso a la cuenta de servicio de Cloud Data Fusion Rol de IAM de Publicador de Pub/Sub (roles/pubsub.publisher) en el proyecto en el que creas el tema de Pub/Sub. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Es posible que tu administrador también pueda otorgar a la cuenta de servicio de Cloud Data Fusion los permisos necesarios mediante roles personalizados o con otros roles predefinidos.

Administrar la publicación de eventos en una instancia de Cloud Data Fusion

Puedes administrar la publicación de eventos en instancias existentes y nuevas de Cloud Data Fusion con la API de REST en las versiones 6.7.0 y posteriores.

Publica eventos en una instancia nueva

Crea una instancia nueva y, luego, incluye el campo EventPublishConfig. Para obtener más información sobre los campos obligatorios para instancias nuevas, consulta la referencia del recurso de instancias.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud
  • LOCATION: Es la ubicación de tu proyecto.
  • INSTANCE_ID: El ID de tu instancia de Cloud Data Fusion
  • VERSION_NUMBER: Es la versión de Cloud Data Fusion. en la que creas la instancia, por ejemplo, 6.10.1
  • TOPIC_ID: Es el ID del tema de Pub/Sub.

Habilita la publicación de eventos en una instancia existente de Cloud Data Fusion

Actualiza el EventPublishConfig en una instancia existente de Cloud Data Fusion:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud
  • LOCATION: el location de tu proyecto
  • INSTANCE_ID: Es el ID de Cloud Data Fusion. instancia
  • TOPIC_ID: Es el ID del tema de Pub/Sub.

Quita la publicación de eventos de una instancia

Para quitar la publicación de eventos de una instancia, actualiza el el valor de enabled de publicación del evento en false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Crear funciones para leer mensajes de Pub/Sub

Las funciones de Cloud Run pueden leer mensajes de Pub/Sub y realizar acciones por ejemplo, reintentar las canalizaciones con errores. Para crear una función de Cloud Run, haz lo siguiente:

  1. En la consola de Google Cloud, ve a la página Funciones de Cloud Run.

    Ve a las funciones de Cloud Run

  2. Haz clic en Crear función.

  3. Ingresa un nombre de función y una región.

  4. En el campo Tipo de activador, selecciona Cloud Pub/Sub.

  5. Ingresa el ID del tema de Pub/Sub.

  6. Haz clic en Siguiente.

  7. Agrega funciones para leer los mensajes de Pub/Sub y tomar otras acciones. Por ejemplo, puedes agregar funciones para los siguientes casos de uso:

    • Envía alertas de fallas en las canalizaciones.
    • Enviar alertas de KPI, como el recuento de registros o la información de ejecución
    • Reinicia una canalización con errores que no se haya vuelto a ejecutar.

    Para ver ejemplos de funciones de Cloud Run, consulta la caso de uso.

  8. Haz clic en Implementar. Para obtener más información, consulta Implementa una función de Cloud Run.

Caso de uso: Documenta el estado de la canalización y vuelve a intentar las canalizaciones que fallaron

El siguiente ejemplo de lectura de Cloud Run Functions de Pub/Sub sobre el estado de ejecución de la canalización y, luego, reintentar canalizaciones con errores en Cloud Data Fusion.

Estas funciones de ejemplo hacen referencia a los siguientes componentes de Google Cloud:

  • Proyecto de Google Cloud: Es el proyecto en el que se crean las funciones de Cloud Run y los temas de Pub/Sub.
  • Tema de Pub/Sub: el tema de Pub/Sub vinculado al tu instancia de Cloud Data Fusion
  • Instancia de Cloud Data Fusion: Cloud Data Fusion en la que diseñas y ejecutas canalizaciones
  • Tabla de BigQuery: Es la tabla de BigQuery que captura el estado de la canalización y los detalles de la ejecución y la nueva ejecución.
  • Función de Cloud Run: la función de Cloud Run en la que realizas la implementación el código que reintenta canalizaciones con errores
  1. En el siguiente ejemplo de función de Cloud Run, se leen los mensajes de Pub/Sub sobre los eventos de estado de Cloud Data Fusion.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. En la siguiente función de ejemplo, se crea y guarda una tabla de BigQuery, y se consultan los detalles de la ejecución de la canalización.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. En la siguiente función de ejemplo, se verifican los flujos de trabajo que fallaron y si se volvieron a ejecutar en la última hora.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Si la canalización con errores no se ejecutó recientemente, la siguiente función de ejemplo vuelve a ejecutar la canalización con errores.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

¿Qué sigue?