Questa pagina descrive come pubblicare eventi della pipeline Cloud Data Fusion, come lo stato della pipeline, gli argomenti Pub/Sub. Descrive inoltre come creare le funzioni di Cloud Run che elaborano i messaggi Pub/Sub eseguire azioni come identificare e riprovare le pipeline non riuscite.
Prima di iniziare
- Crea un argomento in cui Pub/Sub può pubblicare eventi della pipeline di Cloud Data Fusion.
Ruoli obbligatori
Per garantire che l'account di servizio Cloud Data Fusion disponga dei necessari
autorizzazioni per pubblicare eventi
della pipeline in un argomento Pub/Sub,
chiedi all'amministratore di concedere all'account di servizio Cloud Data Fusion la
Ruolo IAM Pub/Sub Publisher (roles/pubsub.publisher
) nel progetto in cui crei l'argomento Pub/Sub.
Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.
L'amministratore potrebbe anche essere in grado di fornire l'account di servizio Cloud Data Fusion le autorizzazioni richieste tramite la ruoli o altri ruoli predefiniti ruoli.
Gestisci la pubblicazione di eventi in un'istanza Cloud Data Fusion
Puoi gestire la pubblicazione di eventi in Cloud Data Fusion nuovi ed esistenti che utilizzano l'API REST nelle versioni 6.7.0 e successive.
Pubblica eventi in una nuova istanza
Crea una nuova istanza e includi il campo EventPublishConfig
. Per ulteriori informazioni
informazioni sui campi obbligatori per le nuove istanze, consulta
Risorsa istanze
riferimento.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google CloudLOCATION
: la località del progettoINSTANCE_ID
: l'ID della tua istanza Cloud Data FusionVERSION_NUMBER
: la versione di Cloud Data Fusion in cui crei l'istanza, ad esempio6.10.1
TOPIC_ID
: l'ID dell'argomento Pub/Sub
Abilita la pubblicazione di eventi in un'istanza Cloud Data Fusion esistente
Aggiorna il
EventPublishConfig
campo in un'istanza Cloud Data Fusion esistente:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google CloudLOCATION
: il valore località del progettoINSTANCE_ID
: l'ID di Cloud Data Fusion istanzaTOPIC_ID
: l'ID dell'argomento Pub/Sub
Rimuovi la pubblicazione di eventi da un'istanza
Per rimuovere la pubblicazione di eventi da un'istanza, aggiorna il valore enabled
della pubblicazione di eventi su false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Creare funzioni per leggere i messaggi Pub/Sub
Le funzioni Cloud Run possono leggere i messaggi Pub/Sub e intervenire su di essi, ad esempio ripetere le pipeline non riuscite. Per creare una funzione Cloud Run, procedi nel seguente modo:
Nella console Google Cloud, vai alla pagina Funzioni Cloud Run.
Fai clic su Crea funzione.
Inserisci un nome di funzione e una regione.
Nel campo Tipo di trigger, seleziona Cloud Pub/Sub.
Inserisci l'ID argomento Pub/Sub.
Fai clic su Avanti.
Aggiungi funzioni per leggere i messaggi Pub/Sub e utilizzare altri azioni. Ad esempio, puoi aggiungere funzioni per i seguenti casi d'uso:
- Invia avvisi per gli errori della pipeline.
- Invia avvisi per i KPI, come il conteggio dei record o le informazioni sulle esecuzioni.
- Riavvia una pipeline non riuscita che non è stata eseguita nuovamente.
Per esempi di funzioni Cloud Run, consulta la sezione Caso d'uso.
Fai clic su Esegui il deployment. Per ulteriori informazioni, consulta Eseguire il deployment di una funzione Cloud Run.
Caso d'uso: documentare lo stato della pipeline e riprovare le pipeline non riuscite
Le funzioni di Cloud Run di esempio seguenti leggono i messaggi Pub/Sub sullo stato di esecuzione della pipeline e poi riprova di pipeline non riuscite in Cloud Data Fusion.
Queste funzioni di esempio fanno riferimento ai seguenti componenti di Google Cloud:
- Progetto Google Cloud: il progetto in cui Le funzioni Cloud Run e gli argomenti Pub/Sub sono creato
- Argomento Pub/Sub: l'argomento Pub/Sub collegato la tua istanza Cloud Data Fusion
- Istanza Cloud Data Fusion: Cloud Data Fusion in cui progetti ed esegui pipeline
- Tabella BigQuery: la tabella BigQuery che acquisisce lo stato della pipeline e i dettagli di esecuzione e riesecuzione
- Funzione Cloud Run: la funzione Cloud Run in cui esegui il deployment il codice che tenta di nuovo le pipeline non riuscite
Il seguente esempio di funzione Cloud Run legge i messaggi Pub/Sub relativi agli eventi di stato di Cloud Data Fusion.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
La funzione di esempio seguente crea e salva BigQuery ed esegue una query sui dettagli dell'esecuzione della pipeline.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
La funzione di esempio seguente controlla le pipeline che non hanno avuto esito positivo se sono state eseguite nuovamente nell'ultima ora.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Se la pipeline in errore non è stata eseguita di recente, la funzione di esempio seguente esegue nuovamente la pipeline in errore.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
Passaggi successivi
- Scopri come scrivere funzioni di Cloud Run.