Surveiller l'état du pipeline

Cette page explique comment publier des événements de pipeline Cloud Data Fusion, tels que l'état du pipeline, dans des sujets Pub/Sub. Il explique également comment créer des fonctions Cloud Run qui traitent les messages Pub/Sub et effectuent des actions, telles que l'identification et la nouvelle tentative de pipelines ayant échoué.

Avant de commencer

  • Créez un sujet sur lequel Pub/Sub peut publier des événements de pipeline Cloud Data Fusion.

Rôles requis

Pour vous assurer que le compte de service Cloud Data Fusion dispose des autorisations nécessaires pour publier des événements de pipeline dans un sujet Pub/Sub, demandez à votre administrateur d'attribuer au compte de service Cloud Data Fusion le rôle IAM Éditeur Pub/Sub (roles/pubsub.publisher) sur le projet dans lequel vous créez le sujet Pub/Sub. Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

Votre administrateur peut également attribuer au compte de service Cloud Data Fusion les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Gérer la publication d'événements dans une instance Cloud Data Fusion

Vous pouvez gérer la publication d'événements dans les instances Cloud Data Fusion nouvelles et existantes à l'aide de l'API REST dans les versions 6.7.0 et ultérieures.

Publier des événements dans une nouvelle instance

Créez une instance et incluez le champ EventPublishConfig. Pour en savoir plus sur les champs obligatoires pour les nouvelles instances, consultez la documentation de référence sur la ressource Instances.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Remplacez les éléments suivants :

  • PROJECT_ID: ID du Google Cloud projet
  • LOCATION: emplacement de votre projet
  • INSTANCE_ID: ID de votre instance Cloud Data Fusion
  • VERSION_NUMBER: version de Cloud Data Fusion dans laquelle vous créez l'instance (par exemple, 6.10.1)
  • TOPIC_ID: ID du sujet Pub/Sub

Activer la publication d'événements dans une instance Cloud Data Fusion existante

Mettez à jour le champ EventPublishConfig dans une instance Cloud Data Fusion existante:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Remplacez les éléments suivants :

  • PROJECT_ID: ID du Google Cloud projet
  • LOCATION: emplacement de votre projet
  • INSTANCE_ID: ID de votre instance Cloud Data Fusion
  • TOPIC_ID: ID du sujet Pub/Sub

Supprimer la publication d'événements d'une instance

Pour supprimer la publication d'événements d'une instance, mettez à jour la valeur enabled de la publication d'événements sur false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Créer des fonctions pour lire les messages Pub/Sub

Les fonctions Cloud Run peuvent lire les messages Pub/Sub et y réagir, par exemple en réessayant les pipelines qui ont échoué. Pour créer des fonctions Cloud Run, procédez comme suit:

  1. Dans la console Google Cloud, accédez à la page Fonctions Cloud Run.

    Accéder aux fonctions Cloud Run

  2. Cliquez sur Créer une fonction.

  3. Saisissez un nom de fonction et une région.

  4. Dans le champ Type de déclencheur, sélectionnez Cloud Pub/Sub.

  5. Saisissez l'ID du sujet Pub/Sub.

  6. Cliquez sur Suivant.

  7. Ajoutez des fonctions pour lire les messages Pub/Sub et effectuer d'autres actions. Par exemple, vous pouvez ajouter des fonctions pour les cas d'utilisation suivants:

    • Envoyer des alertes en cas d'échecs du pipeline
    • Envoyez des alertes pour les KPI, comme le nombre d'enregistrements ou les informations d'exécution.
    • Redémarrer un pipeline ayant échoué et qui n'a pas été réexécuté

    Pour obtenir des exemples de fonctions Cloud Run, consultez la section Cas d'utilisation.

  8. Cliquez sur Déployer. Pour en savoir plus, consultez Déployer une fonction Cloud Run.

Cas d'utilisation: Documenter l'état du pipeline et réessayer les pipelines ayant échoué

L'exemple de fonctions Cloud Run suivant lit les messages Pub/Sub sur l'état d'exécution du pipeline, puis réessaie les pipelines ayant échoué dans Cloud Data Fusion.

Ces exemples de fonctions font référence aux composants Google Cloud suivants:

  • projetGoogle Cloud : projet dans lequel les fonctions Cloud Run et les sujets Pub/Sub sont créés
  • Sujet Pub/Sub: sujet Pub/Sub associé à votre instance Cloud Data Fusion
  • Instance Cloud Data Fusion: instance Cloud Data Fusion dans laquelle vous concevez et exécutez des pipelines.
  • Table BigQuery: table BigQuery qui capture l'état du pipeline, ainsi que les détails d'exécution et de nouvelle exécution
  • Fonction Cloud Run: fonction Cloud Run dans laquelle vous déployez le code qui réessaie les pipelines échoués.
  1. L'exemple de fonction Cloud Run suivant lit les messages Pub/Sub sur les événements d'état de Cloud Data Fusion.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. L'exemple de fonction suivant crée et enregistre une table BigQuery, puis interroge les détails de l'exécution du pipeline.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. L'exemple de fonction suivant recherche les pipelines ayant échoué et indique s'ils ont été réexécutés au cours de la dernière heure.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Si le pipeline ayant échoué n'a pas été exécuté récemment, l'exemple de fonction suivant réexécute le pipeline ayant échoué.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

Étape suivante