En esta página, se describe cómo publicar eventos de canalizaciones de Cloud Data Fusion, como el estado de la canalización, en temas de Pub/Sub. También se describe cómo crear funciones de Cloud Run que procesan los mensajes de Pub/Sub y realizan acciones, como identificar y volver a intentar ejecutar las canalizaciones con errores.
Antes de comenzar
- Crea un tema en el que Pub/Sub pueda publicar eventos de la canalización de Cloud Data Fusion.
Roles requeridos
Para garantizar que la cuenta de servicio de Cloud Data Fusion tenga los permisos necesarios para publicar eventos de canalización en un tema de Pub/Sub, pídele a tu administrador que le otorgue a la cuenta de servicio de Cloud Data Fusion el rol de IAM de publicador de Pub/Sub (roles/pubsub.publisher
) en el proyecto en el que crees el tema de Pub/Sub.
Es posible que tu administrador también pueda otorgar a la cuenta de servicio de Cloud Data Fusion los permisos necesarios a través de roles personalizados o de otros roles predefinidos.
Administra la publicación de eventos en una instancia de Cloud Data Fusion
Puedes administrar la publicación de eventos en instancias nuevas y existentes de Cloud Data Fusion con la API de REST en las versiones 6.7.0 y posteriores.
Publica eventos en una instancia nueva
Crea una instancia nueva y, luego, incluye el campo EventPublishConfig
. Si quieres obtener más información sobre los campos obligatorios para las instancias nuevas, consulta la referencia del recurso Instances.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google CloudLOCATION
: La ubicación de tu proyectoINSTANCE_ID
: Es el ID de tu instancia de Cloud Data Fusion.VERSION_NUMBER
: La versión de Cloud Data Fusion en la que creas la instancia, por ejemplo,6.10.1
TOPIC_ID
: Es el ID del tema de Pub/Sub.
Habilita la publicación de eventos en una instancia existente de Cloud Data Fusion
Actualiza el campo EventPublishConfig
en una instancia existente de Cloud Data Fusion:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google CloudLOCATION
: La ubicación de tu proyectoINSTANCE_ID
: Es el ID de tu instancia de Cloud Data Fusion.TOPIC_ID
: Es el ID del tema de Pub/Sub.
Cómo quitar la publicación de eventos de una instancia
Para quitar la publicación de eventos de una instancia, actualiza el valor enabled
de publicación de eventos a false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Crea funciones para leer mensajes de Pub/Sub
Las funciones de Cloud Run pueden leer mensajes de Pub/Sub y actuar en consecuencia, por ejemplo, reintentar canalizaciones con errores. Para crear una Cloud Run Function, haz lo siguiente:
En la consola de Google Cloud , ve a la página de Cloud Run functions.
Haz clic en Crear función.
Ingresa un nombre de función y una región.
En el campo Tipo de activador, selecciona Cloud Pub/Sub.
Ingresa el ID del tema de Pub/Sub.
Haz clic en Siguiente.
Agrega funciones para leer los mensajes de Pub/Sub y realizar otras acciones. Por ejemplo, puedes agregar funciones para los siguientes casos de uso:
- Envía alertas sobre errores de canalización.
- Envía alertas sobre los KPI, como el recuento de registros o la información de ejecución.
- Reinicia una canalización fallida que no se volvió a ejecutar.
Para ver ejemplos de funciones de Cloud Run, consulta la sección de casos de uso.
Haz clic en Implementar. Para obtener más información, consulta Cómo implementar una función de Cloud Run Functions.
Caso de uso: Estado de la canalización de documentos y reintento de canalizaciones con errores
Las siguientes funciones de Cloud Run de ejemplo leen mensajes de Pub/Sub sobre el estado de ejecución de la canalización y, luego, vuelven a intentar ejecutar las canalizaciones con errores en Cloud Data Fusion.
Estas funciones de ejemplo hacen referencia a los siguientes componentes Google Cloud :
- Google Cloud project: Es el proyecto en el que se crean las funciones de Cloud Run y los temas de Pub/Sub.
- Tema de Pub/Sub: Es el tema de Pub/Sub vinculado a tu instancia de Cloud Data Fusion.
- Instancia de Cloud Data Fusion: Es la instancia de Cloud Data Fusion en la que diseñas y ejecutas canalizaciones.
- Tabla de BigQuery: Es la tabla de BigQuery que captura el estado de la canalización y los detalles de las ejecuciones y las reejecuciones.
- Cloud Run Function: Es la Cloud Run Function en la que implementas el código que vuelve a intentar ejecutar las canalizaciones con errores.
En el siguiente ejemplo de función de Cloud Run, se leen los mensajes de Pub/Sub sobre los eventos de estado de Cloud Data Fusion.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
La siguiente función de ejemplo crea y guarda una tabla de BigQuery, y consulta los detalles de la ejecución de la canalización.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
La siguiente función de ejemplo verifica si hubo canalizaciones con errores y si se volvieron a ejecutar en la última hora.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Si la canalización con errores no se ejecutó recientemente, la siguiente función de ejemplo vuelve a ejecutarla.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
¿Qué sigue?
- Obtén información para escribir Cloud Run Functions.