Monitora lo stato della pipeline

Questa pagina descrive come pubblicare eventi della pipeline Cloud Data Fusion, come lo stato della pipeline, gli argomenti Pub/Sub. Descrive inoltre come creare le funzioni di Cloud Run che elaborano i messaggi Pub/Sub eseguire azioni come identificare e riprovare le pipeline non riuscite.

Prima di iniziare

  • Crea un argomento in cui Pub/Sub può pubblicare eventi della pipeline di Cloud Data Fusion.

Ruoli obbligatori

Per garantire che l'account di servizio Cloud Data Fusion disponga dei necessari autorizzazioni per pubblicare eventi della pipeline in un argomento Pub/Sub, chiedi all'amministratore di concedere all'account di servizio Cloud Data Fusion la Ruolo IAM Pub/Sub Publisher (roles/pubsub.publisher) nel progetto in cui crei l'argomento Pub/Sub. Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

L'amministratore potrebbe anche essere in grado di fornire l'account di servizio Cloud Data Fusion le autorizzazioni richieste tramite la ruoli o altri ruoli predefiniti ruoli.

Gestisci la pubblicazione di eventi in un'istanza Cloud Data Fusion

Puoi gestire la pubblicazione di eventi in Cloud Data Fusion nuovi ed esistenti che utilizzano l'API REST nelle versioni 6.7.0 e successive.

Pubblica eventi in una nuova istanza

Crea una nuova istanza e includi il campo EventPublishConfig. Per ulteriori informazioni informazioni sui campi obbligatori per le nuove istanze, consulta Risorsa istanze riferimento.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud
  • LOCATION: la località del progetto
  • INSTANCE_ID: l'ID della tua istanza Cloud Data Fusion
  • VERSION_NUMBER: la versione di Cloud Data Fusion in cui crei l'istanza, ad esempio 6.10.1
  • TOPIC_ID: l'ID dell'argomento Pub/Sub

Abilita la pubblicazione di eventi in un'istanza Cloud Data Fusion esistente

Aggiorna il EventPublishConfig campo in un'istanza Cloud Data Fusion esistente:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud
  • LOCATION: il valore località del progetto
  • INSTANCE_ID: l'ID di Cloud Data Fusion istanza
  • TOPIC_ID: l'ID dell'argomento Pub/Sub

Rimuovi la pubblicazione di eventi da un'istanza

Per rimuovere la pubblicazione di eventi da un'istanza, aggiorna il valore enabled della pubblicazione di eventi su false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Creare funzioni per leggere i messaggi Pub/Sub

Le funzioni Cloud Run possono leggere i messaggi Pub/Sub e intervenire su di essi, ad esempio ripetere le pipeline non riuscite. Per creare una funzione Cloud Run, procedi nel seguente modo:

  1. Nella console Google Cloud, vai alla pagina Funzioni Cloud Run.

    Vai alle funzioni Cloud Run

  2. Fai clic su Crea funzione.

  3. Inserisci un nome di funzione e una regione.

  4. Nel campo Tipo di trigger, seleziona Cloud Pub/Sub.

  5. Inserisci l'ID argomento Pub/Sub.

  6. Fai clic su Avanti.

  7. Aggiungi funzioni per leggere i messaggi Pub/Sub e utilizzare altri azioni. Ad esempio, puoi aggiungere funzioni per i seguenti casi d'uso:

    • Invia avvisi per gli errori della pipeline.
    • Invia avvisi per i KPI, come il conteggio dei record o le informazioni sulle esecuzioni.
    • Riavvia una pipeline non riuscita che non è stata eseguita nuovamente.

    Per esempi di funzioni Cloud Run, consulta la sezione Caso d'uso.

  8. Fai clic su Esegui il deployment. Per ulteriori informazioni, consulta Eseguire il deployment di una funzione Cloud Run.

Caso d'uso: documentare lo stato della pipeline e riprovare le pipeline non riuscite

Le funzioni di Cloud Run di esempio seguenti leggono i messaggi Pub/Sub sullo stato di esecuzione della pipeline e poi riprova di pipeline non riuscite in Cloud Data Fusion.

Queste funzioni di esempio fanno riferimento ai seguenti componenti di Google Cloud:

  • Progetto Google Cloud: il progetto in cui Le funzioni Cloud Run e gli argomenti Pub/Sub sono creato
  • Argomento Pub/Sub: l'argomento Pub/Sub collegato la tua istanza Cloud Data Fusion
  • Istanza Cloud Data Fusion: Cloud Data Fusion in cui progetti ed esegui pipeline
  • Tabella BigQuery: la tabella BigQuery che acquisisce lo stato della pipeline e i dettagli di esecuzione e riesecuzione
  • Funzione Cloud Run: la funzione Cloud Run in cui esegui il deployment il codice che tenta di nuovo le pipeline non riuscite
  1. Il seguente esempio di funzione Cloud Run legge i messaggi Pub/Sub relativi agli eventi di stato di Cloud Data Fusion.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. La funzione di esempio seguente crea e salva BigQuery ed esegue una query sui dettagli dell'esecuzione della pipeline.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. La funzione di esempio seguente controlla le pipeline che non hanno avuto esito positivo se sono state eseguite nuovamente nell'ultima ora.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Se la pipeline in errore non è stata eseguita di recente, la funzione di esempio seguente esegue nuovamente la pipeline in errore.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

Passaggi successivi