Cette page explique comment publier des événements de pipeline Cloud Data Fusion, tels que l'état du pipeline, dans des sujets Pub/Sub. Il décrit également comment créer Les fonctions Cloud Run qui traitent les messages Pub/Sub prendre des mesures, comme identifier et relancer les pipelines en échec.
Avant de commencer
- Créez un sujet sur lequel Pub/Sub peut publier des événements de pipeline Cloud Data Fusion.
Rôles requis
Pour vous assurer que le compte de service Cloud Data Fusion dispose des autorisations
les autorisations permettant de publier des événements de pipeline dans un sujet Pub/Sub ;
demandez à votre administrateur d'accorder au compte de service Cloud Data Fusion le
Éditeur Pub/Sub (roles/pubsub.publisher
) sur le projet dans lequel vous créez le sujet Pub/Sub.
Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.
Votre administrateur peut aussi attribuer au compte de service Cloud Data Fusion les autorisations requises à l'aide d'outils personnalisés rôles ou autres prédéfinis rôles.
Gérer la publication d'événements dans une instance Cloud Data Fusion
Vous pouvez gérer la publication d'événements dans Cloud Data Fusion nouveau et existant à l'aide de l'API REST dans les versions 6.7.0 et ultérieures.
Publier des événements dans une nouvelle instance
Créez une instance et incluez le champ EventPublishConfig
. Pour en savoir plus sur les champs obligatoires pour les nouvelles instances, consultez la documentation de référence sur la ressource Instances.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Remplacez les éléments suivants :
PROJECT_ID
: ID de projet Google CloudLOCATION
: emplacement de votre projetINSTANCE_ID
: ID de votre instance Cloud Data FusionVERSION_NUMBER
: version de Cloud Data Fusion dans laquelle vous créez l'instance (par exemple,6.10.1
)TOPIC_ID
: ID du sujet Pub/Sub
Activer la publication d'événements dans une instance Cloud Data Fusion existante
Mettez à jour le champ EventPublishConfig
dans une instance Cloud Data Fusion existante :
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Remplacez les éléments suivants :
PROJECT_ID
: ID de projet Google CloudLOCATION
: emplacement de votre projetINSTANCE_ID
: ID de votre instance Cloud Data FusionTOPIC_ID
: ID du sujet Pub/Sub
Supprimer la publication d'événements d'une instance
Pour supprimer la publication d'événements d'une instance, mettez à jour
Publication de l'événement enabled
sur la valeur false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Créer des fonctions pour lire les messages Pub/Sub
Les fonctions Cloud Run peuvent lire les messages Pub/Sub et y réagir, par exemple en réessayant les pipelines qui ont échoué. Pour créer des fonctions Cloud Run, effectuer les opérations suivantes:
Dans la console Google Cloud, accédez à la page Fonctions Cloud Run.
Cliquez sur Créer une fonction.
Saisissez un nom de fonction et une région.
Dans le champ Type de déclencheur, sélectionnez Cloud Pub/Sub.
Saisissez l'ID du sujet Pub/Sub.
Cliquez sur Suivant.
Ajoutez des fonctions pour lire les messages Pub/Sub et prendre d'autres actions. Par exemple, vous pouvez ajouter des fonctions pour les cas d'utilisation suivants :
- Envoyer des alertes en cas de défaillance du pipeline
- Envoyez des alertes pour des indicateurs clés de performance (KPI, par exemple) concernant le nombre d'enregistrements ou les informations d'exécution.
- Redémarrer un pipeline ayant échoué et qui n'a pas été réexécuté
Pour obtenir des exemples de fonctions Cloud Run, consultez la section Cas d'utilisation.
Cliquez sur Déployer. Pour en savoir plus, consultez Déployer une fonction Cloud Run.
Cas d'utilisation : État du pipeline de documents et nouvelle tentative de pipelines ayant échoué
Les exemples de fonctions Cloud Run suivants indiquent des messages Pub/Sub sur l'état d'exécution du pipeline, puis relancez en cas d'échec des pipelines dans Cloud Data Fusion.
Ces exemples de fonctions font référence aux composants Google Cloud suivants:
- Projet Google Cloud : projet dans lequel les fonctions Cloud Run et les sujets Pub/Sub sont créés
- Sujet Pub/Sub : sujet Pub/Sub associé à votre instance Cloud Data Fusion
- Instance Cloud Data Fusion : instance Cloud Data Fusion dans laquelle vous concevez et exécutez des pipelines.
- Table BigQuery: il s'agit de la table BigQuery qui capture l'état du pipeline, ainsi que les détails de l'exécution et de la réexécution
- Fonction Cloud Run : fonction Cloud Run dans laquelle vous déployez le code qui réessaie les pipelines ayant échoué.
L'exemple de fonction Cloud Run suivant lit les messages Pub/Sub sur les événements d'état de Cloud Data Fusion.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
L'exemple de fonction suivant crée et enregistre une table BigQuery, puis interroge les détails de l'exécution du pipeline.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
L'exemple de fonction suivant vérifie les pipelines en échec et si elles ont été réexécutées au cours de la dernière heure.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Si le pipeline ayant échoué n'a pas été exécuté récemment, l'exemple de fonction suivant réexécute le pipeline ayant échoué.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
Étape suivante
- Découvrez comment écrire des fonctions Cloud Run.