Surveiller l'état du pipeline

Cette page explique comment publier des événements de pipeline Cloud Data Fusion, tels que l'état du pipeline, dans des sujets Pub/Sub. Il décrit également comment créer Les fonctions Cloud Run qui traitent les messages Pub/Sub prendre des mesures, comme identifier et relancer les pipelines en échec.

Avant de commencer

  • Créez un sujet sur lequel Pub/Sub peut publier des événements de pipeline Cloud Data Fusion.

Rôles requis

Pour vous assurer que le compte de service Cloud Data Fusion dispose des autorisations les autorisations permettant de publier des événements de pipeline dans un sujet Pub/Sub ; demandez à votre administrateur d'accorder au compte de service Cloud Data Fusion le Éditeur Pub/Sub (roles/pubsub.publisher) sur le projet dans lequel vous créez le sujet Pub/Sub. Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

Votre administrateur peut aussi attribuer au compte de service Cloud Data Fusion les autorisations requises à l'aide d'outils personnalisés rôles ou autres prédéfinis rôles.

Gérer la publication d'événements dans une instance Cloud Data Fusion

Vous pouvez gérer la publication d'événements dans Cloud Data Fusion nouveau et existant à l'aide de l'API REST dans les versions 6.7.0 et ultérieures.

Publier des événements dans une nouvelle instance

Créez une instance et incluez le champ EventPublishConfig. Pour en savoir plus sur les champs obligatoires pour les nouvelles instances, consultez la documentation de référence sur la ressource Instances.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Remplacez les éléments suivants :

  • PROJECT_ID : ID de projet Google Cloud
  • LOCATION : emplacement de votre projet
  • INSTANCE_ID : ID de votre instance Cloud Data Fusion
  • VERSION_NUMBER : version de Cloud Data Fusion dans laquelle vous créez l'instance (par exemple, 6.10.1)
  • TOPIC_ID : ID du sujet Pub/Sub

Activer la publication d'événements dans une instance Cloud Data Fusion existante

Mettez à jour le champ EventPublishConfig dans une instance Cloud Data Fusion existante :

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Remplacez les éléments suivants :

  • PROJECT_ID : ID de projet Google Cloud
  • LOCATION : emplacement de votre projet
  • INSTANCE_ID : ID de votre instance Cloud Data Fusion
  • TOPIC_ID : ID du sujet Pub/Sub

Supprimer la publication d'événements d'une instance

Pour supprimer la publication d'événements d'une instance, mettez à jour Publication de l'événement enabled sur la valeur false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Créer des fonctions pour lire les messages Pub/Sub

Les fonctions Cloud Run peuvent lire les messages Pub/Sub et y réagir, par exemple en réessayant les pipelines qui ont échoué. Pour créer des fonctions Cloud Run, effectuer les opérations suivantes:

  1. Dans la console Google Cloud, accédez à la page Fonctions Cloud Run.

    Accéder aux fonctions Cloud Run

  2. Cliquez sur Créer une fonction.

  3. Saisissez un nom de fonction et une région.

  4. Dans le champ Type de déclencheur, sélectionnez Cloud Pub/Sub.

  5. Saisissez l'ID du sujet Pub/Sub.

  6. Cliquez sur Suivant.

  7. Ajoutez des fonctions pour lire les messages Pub/Sub et prendre d'autres actions. Par exemple, vous pouvez ajouter des fonctions pour les cas d'utilisation suivants :

    • Envoyer des alertes en cas de défaillance du pipeline
    • Envoyez des alertes pour des indicateurs clés de performance (KPI, par exemple) concernant le nombre d'enregistrements ou les informations d'exécution.
    • Redémarrer un pipeline ayant échoué et qui n'a pas été réexécuté

    Pour obtenir des exemples de fonctions Cloud Run, consultez la section Cas d'utilisation.

  8. Cliquez sur Déployer. Pour en savoir plus, consultez Déployer une fonction Cloud Run.

Cas d'utilisation : État du pipeline de documents et nouvelle tentative de pipelines ayant échoué

Les exemples de fonctions Cloud Run suivants indiquent des messages Pub/Sub sur l'état d'exécution du pipeline, puis relancez en cas d'échec des pipelines dans Cloud Data Fusion.

Ces exemples de fonctions font référence aux composants Google Cloud suivants:

  • Projet Google Cloud : projet dans lequel les fonctions Cloud Run et les sujets Pub/Sub sont créés
  • Sujet Pub/Sub : sujet Pub/Sub associé à votre instance Cloud Data Fusion
  • Instance Cloud Data Fusion : instance Cloud Data Fusion dans laquelle vous concevez et exécutez des pipelines.
  • Table BigQuery: il s'agit de la table BigQuery qui capture l'état du pipeline, ainsi que les détails de l'exécution et de la réexécution
  • Fonction Cloud Run : fonction Cloud Run dans laquelle vous déployez le code qui réessaie les pipelines ayant échoué.
  1. L'exemple de fonction Cloud Run suivant lit les messages Pub/Sub sur les événements d'état de Cloud Data Fusion.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. L'exemple de fonction suivant crée et enregistre une table BigQuery, puis interroge les détails de l'exécution du pipeline.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. L'exemple de fonction suivant vérifie les pipelines en échec et si elles ont été réexécutées au cours de la dernière heure.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Si le pipeline ayant échoué n'a pas été exécuté récemment, l'exemple de fonction suivant réexécute le pipeline ayant échoué.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

Étape suivante