Cette page explique comment publier des événements de pipeline Cloud Data Fusion, tels que l'état du pipeline, dans des sujets Pub/Sub. Il explique également comment créer des fonctions Cloud Run qui traitent les messages Pub/Sub et effectuent des actions, comme identifier et relancer les pipelines ayant échoué.
Avant de commencer
- Créez un sujet dans lequel Pub/Sub peut publier les événements de pipeline Cloud Data Fusion.
Rôles requis
Pour vous assurer que le compte de service Cloud Data Fusion dispose des autorisations nécessaires pour publier des événements de pipeline dans un sujet Pub/Sub, demandez à votre administrateur d'attribuer au compte de service Cloud Data Fusion le rôle IAM Éditeur Pub/Sub (roles/pubsub.publisher
) sur le projet dans lequel vous créez le sujet Pub/Sub.
Votre administrateur peut également attribuer au compte de service Cloud Data Fusion les autorisations requises à l'aide de rôles personnalisés ou d'autres rôles prédéfinis.
Gérer la publication d'événements dans une instance Cloud Data Fusion
Vous pouvez gérer la publication d'événements dans les instances Cloud Data Fusion nouvelles et existantes à l'aide de l'API REST dans les versions 6.7.0 et ultérieures.
Publier des événements dans une nouvelle instance
Créez une instance et incluez le champ EventPublishConfig
. Pour en savoir plus sur les champs obligatoires pour les nouvelles instances, consultez la documentation de référence sur la ressource Instances.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google CloudLOCATION
: emplacement de votre projetINSTANCE_ID
: ID de votre instance Cloud Data FusionVERSION_NUMBER
: version de Cloud Data Fusion dans laquelle vous créez l'instance (par exemple,6.10.1
)TOPIC_ID
: ID du sujet Pub/Sub
Activer la publication d'événements dans une instance Cloud Data Fusion existante
Mettez à jour le champ EventPublishConfig
dans une instance Cloud Data Fusion existante :
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google CloudLOCATION
: emplacement de votre projetINSTANCE_ID
: ID de votre instance Cloud Data FusionTOPIC_ID
: ID du sujet Pub/Sub
Supprimer la publication d'événements d'une instance
Pour supprimer la publication d'événements d'une instance, définissez la valeur enabled
de publication d'événements sur false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Créer des fonctions pour lire les messages Pub/Sub
Les fonctions Cloud Run peuvent lire les messages Pub/Sub et agir en conséquence, par exemple en relançant les pipelines ayant échoué. Pour créer une fonction Cloud Run :
Dans la console Google Cloud , accédez à la page Fonctions Cloud Run.
Cliquez sur Créer une fonction.
Saisissez un nom de fonction et une région.
Dans le champ Type de déclencheur, sélectionnez Cloud Pub/Sub.
Saisissez l'ID du sujet Pub/Sub.
Cliquez sur Suivant.
Ajoutez des fonctions pour lire les messages Pub/Sub et effectuer d'autres actions. Par exemple, vous pouvez ajouter des fonctions pour les cas d'utilisation suivants :
- Envoyez des alertes en cas d'échec de pipeline.
- Envoyer des alertes pour les KPI, comme le nombre d'enregistrements ou les informations sur l'exécution.
- Redémarrez un pipeline ayant échoué et qui n'a pas été réexécuté.
Pour obtenir des exemples de fonctions Cloud Run, consultez la section Cas d'utilisation.
Cliquez sur Déployer. Pour en savoir plus, consultez Déployer une fonction Cloud Run.
Cas d'utilisation : Vérifier l'état du pipeline de documents et relancer les pipelines ayant échoué
Les fonctions Cloud Run suivantes lisent les messages Pub/Sub sur l'état d'exécution du pipeline, puis relancent les pipelines ayant échoué dans Cloud Data Fusion.
Ces exemples de fonctions font référence aux composants Google Cloud suivants :
- Google Cloud project : projet dans lequel les fonctions Cloud Run et les sujets Pub/Sub sont créés
- Sujet Pub/Sub : sujet Pub/Sub associé à votre instance Cloud Data Fusion
- Instance Cloud Data Fusion : instance Cloud Data Fusion dans laquelle vous concevez et exécutez des pipelines
- Table BigQuery : table BigQuery qui capture l'état du pipeline, ainsi que les détails de l'exécution et de la réexécution
- Fonction Cloud Run : fonction Cloud Run dans laquelle vous déployez le code qui relance les pipelines ayant échoué
L'exemple de fonction Cloud Run suivant lit les messages Pub/Sub concernant les événements d'état Cloud Data Fusion.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
L'exemple de fonction suivant crée et enregistre une table BigQuery, puis interroge les détails de l'exécution du pipeline.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
L'exemple de fonction suivant vérifie si des pipelines ont échoué et s'ils ont été réexécutés au cours de la dernière heure.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Si le pipeline ayant échoué n'a pas été exécuté récemment, l'exemple de fonction suivant le réexécute.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
Étapes suivantes
- Découvrez comment écrire des fonctions Cloud Run.