Monitorare lo stato della pipeline

Questa pagina descrive come pubblicare eventi della pipeline Cloud Data Fusion, ad esempio lo stato della pipeline, negli argomenti Pub/Sub. Descrive inoltre come creare funzioni Cloud Run che elaborano i messaggi Pub/Sub ed eseguono azioni, ad esempio identificare e riprovare le pipeline non riuscite.

Prima di iniziare

  • Crea un argomento in cui Pub/Sub può pubblicare gli eventi della pipeline Cloud Data Fusion.

Ruoli obbligatori

Per assicurarti che il service account Cloud Data Fusion disponga delle autorizzazioni necessarie per pubblicare eventi della pipeline in un argomento Pub/Sub, chiedi all'amministratore di concedere al service account Cloud Data Fusion il ruolo IAM Pub/Sub Publisher (roles/pubsub.publisher) nel progetto in cui crei l'argomento Pub/Sub.

Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

L'amministratore potrebbe anche assegnare al service account Cloud Data Fusion le autorizzazioni richieste tramite ruoli personalizzati o altri ruoli predefiniti.

Gestire la pubblicazione di eventi in un'istanza Cloud Data Fusion

Puoi gestire la pubblicazione di eventi nelle istanze Cloud Data Fusion nuove ed esistenti utilizzando l'API REST nelle versioni 6.7.0 e successive.

Pubblicare eventi in una nuova istanza

Crea una nuova istanza e includi il campo EventPublishConfig. Per saperne di più sui campi obbligatori per le nuove istanze, consulta il riferimento Risorsa Instances.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Sostituisci quanto segue:

  • PROJECT_ID: l' Google Cloud ID progetto
  • LOCATION: la posizione del progetto
  • INSTANCE_ID: l'ID della tua istanza Cloud Data Fusion
  • VERSION_NUMBER: la versione di Cloud Data Fusion in cui crei l'istanza, ad esempio 6.10.1
  • TOPIC_ID: l'ID dell'argomento Pub/Sub

Abilitare la pubblicazione di eventi in un'istanza Cloud Data Fusion esistente

Aggiorna il campo EventPublishConfig in un'istanza Cloud Data Fusion esistente:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Sostituisci quanto segue:

  • PROJECT_ID: l' Google Cloud ID progetto
  • LOCATION: la posizione del progetto
  • INSTANCE_ID: l'ID della tua istanza Cloud Data Fusion
  • TOPIC_ID: l'ID dell'argomento Pub/Sub

Rimuovere la pubblicazione di eventi da un'istanza

Per rimuovere la pubblicazione di eventi da un'istanza, aggiorna il valore di pubblicazione di eventi enabled a false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Crea funzioni per leggere i messaggi Pub/Sub

Le funzioni Cloud Run possono leggere i messaggi Pub/Sub e agire di conseguenza, ad esempio riprovando le pipeline non riuscite. Per creare una funzione Cloud Run, svolgi le seguenti operazioni:

  1. Nella console Google Cloud , vai alla pagina Cloud Run Functions.

    Vai alle funzioni Cloud Run

  2. Fai clic su Crea funzione.

  3. Inserisci un nome di funzione e una regione.

  4. Nel campo Tipo di trigger, seleziona Cloud Pub/Sub.

  5. Inserisci l'ID argomento Pub/Sub.

  6. Fai clic su Avanti.

  7. Aggiungi funzioni per leggere i messaggi Pub/Sub ed eseguire altre azioni. Ad esempio, puoi aggiungere funzioni per i seguenti casi d'uso:

    • Invia avvisi per gli errori della pipeline.
    • Invia avvisi per gli indicatori chiave di prestazione, ad esempio il conteggio dei record o le informazioni sulle corse.
    • Riavvia una pipeline non riuscita che non è stata eseguita di nuovo.

    Per esempi di funzioni Cloud Run, consulta la sezione casi d'uso.

  8. Fai clic su Esegui il deployment. Per saperne di più, consulta Eseguire il deployment di una funzione Cloud Run.

Caso d'uso: documentare lo stato della pipeline e riprovare le pipeline non riuscite

Le seguenti funzioni Cloud Run di esempio leggono i messaggi Pub/Sub sullo stato di esecuzione della pipeline e poi riprovano a eseguire le pipeline non riuscite in Cloud Data Fusion.

Queste funzioni di esempio fanno riferimento ai seguenti componenti Google Cloud :

  • Google Cloud project: il progetto in cui vengono creati gli argomenti Pub/Sub e le funzioni Cloud Run
  • Argomento Pub/Sub: l'argomento Pub/Sub collegato alla tua istanza Cloud Data Fusion
  • Istanza Cloud Data Fusion: l'istanza Cloud Data Fusion in cui progettare ed eseguire le pipeline
  • Tabella BigQuery: la tabella BigQuery che acquisisce lo stato della pipeline e i dettagli dell'esecuzione e della ripetizione
  • Funzione Cloud Run: la funzione Cloud Run in cui esegui il deployment del codice che riprova le pipeline non riuscite
  1. Il seguente esempio di funzione Cloud Run legge i messaggi Pub/Sub sugli eventi di stato di Cloud Data Fusion.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. La seguente funzione di esempio crea e salva una tabella BigQuery ed esegue query sui dettagli di esecuzione della pipeline.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. La seguente funzione di esempio controlla le pipeline non riuscite e se sono state eseguite di nuovo nell'ultima ora.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Se la pipeline non è stata eseguita di recente, la seguente funzione di esempio riesegue la pipeline non riuscita.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

Passaggi successivi