Cette page explique comment publier des événements de pipeline Cloud Data Fusion, tels que l'état du pipeline, dans des sujets Pub/Sub. Il explique également comment créer des fonctions Cloud Run qui traitent les messages Pub/Sub et effectuent des actions, telles que l'identification et la nouvelle tentative de pipelines ayant échoué.
Avant de commencer
- Créez un sujet sur lequel Pub/Sub peut publier des événements de pipeline Cloud Data Fusion.
Rôles requis
Pour vous assurer que le compte de service Cloud Data Fusion dispose des autorisations nécessaires pour publier des événements de pipeline dans un sujet Pub/Sub, demandez à votre administrateur d'attribuer au compte de service Cloud Data Fusion le rôle IAM Éditeur Pub/Sub (roles/pubsub.publisher
) sur le projet dans lequel vous créez le sujet Pub/Sub.
Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.
Votre administrateur peut également attribuer au compte de service Cloud Data Fusion les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.
Gérer la publication d'événements dans une instance Cloud Data Fusion
Vous pouvez gérer la publication d'événements dans les instances Cloud Data Fusion nouvelles et existantes à l'aide de l'API REST dans les versions 6.7.0 et ultérieures.
Publier des événements dans une nouvelle instance
Créez une instance et incluez le champ EventPublishConfig
. Pour en savoir plus sur les champs obligatoires pour les nouvelles instances, consultez la documentation de référence sur la ressource Instances.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Remplacez les éléments suivants :
PROJECT_ID
: ID du Google Cloud projetLOCATION
: emplacement de votre projetINSTANCE_ID
: ID de votre instance Cloud Data FusionVERSION_NUMBER
: version de Cloud Data Fusion dans laquelle vous créez l'instance (par exemple,6.10.1
)TOPIC_ID
: ID du sujet Pub/Sub
Activer la publication d'événements dans une instance Cloud Data Fusion existante
Mettez à jour le champ EventPublishConfig
dans une instance Cloud Data Fusion existante:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Remplacez les éléments suivants :
PROJECT_ID
: ID du Google Cloud projetLOCATION
: emplacement de votre projetINSTANCE_ID
: ID de votre instance Cloud Data FusionTOPIC_ID
: ID du sujet Pub/Sub
Supprimer la publication d'événements d'une instance
Pour supprimer la publication d'événements d'une instance, mettez à jour la valeur enabled
de la publication d'événements sur false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Créer des fonctions pour lire les messages Pub/Sub
Les fonctions Cloud Run peuvent lire les messages Pub/Sub et y réagir, par exemple en réessayant les pipelines qui ont échoué. Pour créer des fonctions Cloud Run, procédez comme suit:
Dans la console Google Cloud, accédez à la page Fonctions Cloud Run.
Cliquez sur Créer une fonction.
Saisissez un nom de fonction et une région.
Dans le champ Type de déclencheur, sélectionnez Cloud Pub/Sub.
Saisissez l'ID du sujet Pub/Sub.
Cliquez sur Suivant.
Ajoutez des fonctions pour lire les messages Pub/Sub et effectuer d'autres actions. Par exemple, vous pouvez ajouter des fonctions pour les cas d'utilisation suivants:
- Envoyer des alertes en cas d'échecs du pipeline
- Envoyez des alertes pour les KPI, comme le nombre d'enregistrements ou les informations d'exécution.
- Redémarrer un pipeline ayant échoué et qui n'a pas été réexécuté
Pour obtenir des exemples de fonctions Cloud Run, consultez la section Cas d'utilisation.
Cliquez sur Déployer. Pour en savoir plus, consultez Déployer une fonction Cloud Run.
Cas d'utilisation: Documenter l'état du pipeline et réessayer les pipelines ayant échoué
L'exemple de fonctions Cloud Run suivant lit les messages Pub/Sub sur l'état d'exécution du pipeline, puis réessaie les pipelines ayant échoué dans Cloud Data Fusion.
Ces exemples de fonctions font référence aux composants Google Cloud suivants:
- projetGoogle Cloud : projet dans lequel les fonctions Cloud Run et les sujets Pub/Sub sont créés
- Sujet Pub/Sub: sujet Pub/Sub associé à votre instance Cloud Data Fusion
- Instance Cloud Data Fusion: instance Cloud Data Fusion dans laquelle vous concevez et exécutez des pipelines.
- Table BigQuery: table BigQuery qui capture l'état du pipeline, ainsi que les détails d'exécution et de nouvelle exécution
- Fonction Cloud Run: fonction Cloud Run dans laquelle vous déployez le code qui réessaie les pipelines échoués.
L'exemple de fonction Cloud Run suivant lit les messages Pub/Sub sur les événements d'état de Cloud Data Fusion.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
L'exemple de fonction suivant crée et enregistre une table BigQuery, puis interroge les détails de l'exécution du pipeline.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
L'exemple de fonction suivant recherche les pipelines ayant échoué et indique s'ils ont été réexécutés au cours de la dernière heure.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Si le pipeline ayant échoué n'a pas été exécuté récemment, l'exemple de fonction suivant réexécute le pipeline ayant échoué.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
Étape suivante
- Découvrez comment écrire des fonctions Cloud Run.