Questa pagina descrive come pubblicare eventi della pipeline Cloud Data Fusion, ad esempio lo stato della pipeline, negli argomenti Pub/Sub. Descrive inoltre come creare funzioni Cloud Run che elaborano i messaggi Pub/Sub e intraprendono azioni, ad esempio identificare e riprovare le pipeline non riuscite.
Prima di iniziare
- Crea un argomento in cui Pub/Sub può pubblicare gli eventi della pipeline Cloud Data Fusion.
Ruoli obbligatori
Per assicurarti che l'account di servizio Cloud Data Fusion disponga delle autorizzazioni necessarie per pubblicare eventi della pipeline in un argomento Pub/Sub,
chiedi all'amministratore di concedere all'account di servizio Cloud Data Fusion il ruolo IAM Publisher Pub/Sub (roles/pubsub.publisher
) nel progetto in cui crei l'argomento Pub/Sub.
L'amministratore potrebbe anche essere in grado di concedere all'account di servizio Cloud Data Fusion le autorizzazioni richieste tramite ruoli personalizzati o altri ruoli predefiniti.
Gestire la pubblicazione di eventi in un'istanza Cloud Data Fusion
Puoi gestire la pubblicazione di eventi nelle istanze Cloud Data Fusion nuove ed esistenti utilizzando l'API REST nelle versioni 6.7.0 e successive.
Pubblicare gli eventi in una nuova istanza
Crea una nuova istanza e includi il campo EventPublishConfig
. Per ulteriori informazioni sui campi obbligatori per le nuove istanze, consulta la documentazione di riferimento della risorsa Istanze.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Sostituisci quanto segue:
PROJECT_ID
: l' Google Cloud ID progettoLOCATION
: la posizione del progettoINSTANCE_ID
: l'ID della tua istanza Cloud Data FusionVERSION_NUMBER
: la versione di Cloud Data Fusion in cui crei l'istanza, ad esempio6.10.1
TOPIC_ID
: l'ID dell'argomento Pub/Sub
Attivare la pubblicazione di eventi in un'istanza Cloud Data Fusion esistente
Aggiorna il campo EventPublishConfig
in un'istanza Cloud Data Fusion esistente:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Sostituisci quanto segue:
PROJECT_ID
: l' Google Cloud ID progettoLOCATION
: la posizione del progettoINSTANCE_ID
: l'ID della tua istanza Cloud Data FusionTOPIC_ID
: l'ID dell'argomento Pub/Sub
Rimuovere la pubblicazione di eventi da un'istanza
Per rimuovere la pubblicazione di eventi da un'istanza, aggiorna il valore enabled
della pubblicazione di eventi su false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Creare funzioni per leggere i messaggi Pub/Sub
Le funzioni Cloud Run possono leggere i messaggi Pub/Sub e intervenire su di essi, ad esempio ripetere le pipeline non riuscite. Per creare una funzione Cloud Run:
Nella console Google Cloud, vai alla pagina Funzioni Cloud Run.
Fai clic su Crea funzione.
Inserisci un nome di funzione e una regione.
Nel campo Tipo di trigger, seleziona Cloud Pub/Sub.
Inserisci l'ID argomento Pub/Sub.
Fai clic su Avanti.
Aggiungi funzioni per leggere i messaggi Pub/Sub ed eseguire altre azioni. Ad esempio, puoi aggiungere funzioni per i seguenti casi d'uso:
- Invia avvisi per gli errori della pipeline.
- Invia avvisi per i KPI, ad esempio il conteggio dei record o le informazioni sulle esecuzioni.
- Riavviare una pipeline non riuscita che non è stata eseguita di nuovo.
Per esempi di funzioni Cloud Run, consulta la sezione relativa ai casi d'uso.
Fai clic su Esegui il deployment. Per saperne di più, consulta Eseguire il deployment di una funzione Cloud Run.
Caso d'uso: documentare lo stato della pipeline e riprovare le pipeline non riuscite
Le seguenti funzioni Cloud Run di esempio leggono i messaggi Pub/Sub relativi allo stato di esecuzione della pipeline e poi riprovano le pipeline non riuscite in Cloud Data Fusion.
Queste funzioni di esempio fanno riferimento ai seguenti Google Cloud componenti:
- Google Cloud project: il progetto in cui vengono create le funzioni Cloud Run e gli argomenti Pub/Sub
- Argomento Pub/Sub: l'argomento Pub/Sub collegato alla tua istanza Cloud Data Fusion
- Istanza Cloud Data Fusion: l'istanza Cloud Data Fusion in cui progetti ed esegui le pipeline
- Tabella BigQuery: la tabella BigQuery che acquisisce lo stato della pipeline e i dettagli di esecuzione e di nuova esecuzione
- Funzione Cloud Run: la funzione Cloud Run in cui esegui il deployment del codice che esegue nuovamente le pipeline non riuscite
Il seguente esempio di funzione Cloud Run legge i messaggi Pub/Sub relativi agli eventi di stato di Cloud Data Fusion.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
La seguente funzione di esempio crea e salva una tabella BigQuery ed esegue query sui dettagli dell'esecuzione della pipeline.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
La seguente funzione di esempio controlla le pipeline che non sono riuscite e se sono state eseguite di nuovo nell'ultima ora.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Se la pipeline non riuscita non è stata eseguita di recente, la seguente funzione di esempio la esegue di nuovo.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
Passaggi successivi
- Scopri come scrivere funzioni Cloud Run.