Esta página descreve como publicar eventos de pipelines do Cloud Data Fusion, como o estado do pipeline, em tópicos do Pub/Sub. Também descreve como criar funções do Cloud Run que processam as mensagens do Pub/Sub e tomam medidas, como identificar e repetir pipelines com falhas.
Antes de começar
- Crie um tópico onde o Pub/Sub possa publicar eventos de pipelines do Cloud Data Fusion.
Funções necessárias
Para garantir que a conta de serviço do Cloud Data Fusion tem as autorizações necessárias para publicar eventos de pipeline num tópico do Pub/Sub,
peça ao seu administrador para conceder à conta de serviço do Cloud Data Fusion a função do IAM
Publicador do Pub/Sub (roles/pubsub.publisher
)
no projeto onde cria o tópico do Pub/Sub.
O seu administrador também pode conceder à conta de serviço do Cloud Data Fusion as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.
Faça a gestão da publicação de eventos numa instância do Cloud Data Fusion
Pode gerir a publicação de eventos em instâncias novas e existentes do Cloud Data Fusion através da API REST nas versões 6.7.0 e posteriores.
Publicar eventos numa nova instância
Crie uma nova instância e inclua o campo EventPublishConfig
. Para mais
informações sobre os campos obrigatórios para novas instâncias, consulte a referência do
recurso Instances.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Substitua o seguinte:
PROJECT_ID
: o Google Cloud ID do projetoLOCATION
: a localização do seu projetoINSTANCE_ID
: o ID da sua instância do Cloud Data FusionVERSION_NUMBER
: a versão do Cloud Data Fusion onde cria a instância, por exemplo,6.10.1
TOPIC_ID
: o ID do tópico do Pub/Sub
Ative a publicação de eventos numa instância do Cloud Data Fusion existente
Atualize o campo
EventPublishConfig
numa instância existente do Cloud Data Fusion:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Substitua o seguinte:
PROJECT_ID
: o Google Cloud ID do projetoLOCATION
: a localização do seu projetoINSTANCE_ID
: o ID da sua instância do Cloud Data FusionTOPIC_ID
: o ID do tópico do Pub/Sub
Remova a publicação de eventos de uma instância
Para remover a publicação de eventos de uma instância, atualize o valor de enabled
publicação de eventosfalse
para false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Crie funções para ler mensagens do Pub/Sub
As funções do Cloud Run podem ler mensagens do Pub/Sub e agir em conformidade, como repetir pipelines com falhas. Para criar funções do Cloud Run, faça o seguinte:
Na Google Cloud consola, aceda à página Funções do Cloud Run.
Clique em Criar função.
Introduza um nome de função e uma região.
No campo Tipo de acionador, selecione Cloud Pub/Sub.
Introduza o ID do tópico Pub/Sub.
Clicar em Seguinte.
Adicione funções para ler as mensagens do Pub/Sub e realizar outras ações. Por exemplo, pode adicionar funções para os seguintes exemplos de utilização:
- Enviar alertas de falhas de pipelines.
- Enviar alertas para KPIs, como a contagem de registos ou informações de execução.
- Reinicie um pipeline com falhas que não tenha sido executado novamente.
Para ver exemplos de funções do Cloud Run, consulte a secção Exemplo de utilização.
Clique em Implementar. Para mais informações, consulte o artigo Implemente uma função do Cloud Run.
Exemplo de utilização: documentar o estado do pipeline e repetir pipelines com falhas
As seguintes funções do Cloud Run leem mensagens do Pub/Sub sobre o estado de execução do pipeline e, em seguida, repetem os pipelines com falhas no Cloud Data Fusion.
Estas funções de exemplo referem-se aos seguintes Google Cloud componentes:
- Google Cloud project: o projeto onde as funções do Cloud Run e os tópicos do Pub/Sub são criados
- Tópico do Pub/Sub: o tópico do Pub/Sub associado à sua instância do Cloud Data Fusion
- Instância do Cloud Data Fusion: a instância do Cloud Data Fusion onde cria e executa pipelines
- Tabela do BigQuery: a tabela do BigQuery que captura o estado do pipeline e os detalhes de execução e nova execução
- Função do Cloud Run: a função do Cloud Run onde implementa o código que tenta novamente os pipelines com falhas
O exemplo de função do Cloud Run seguinte lê as mensagens do Pub/Sub sobre eventos de estado do Cloud Data Fusion.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
A função de exemplo seguinte cria e guarda uma tabela do BigQuery e consulta os detalhes da execução do pipeline.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
A função de exemplo seguinte verifica se existem pipelines que falharam e se foram executados novamente na última hora.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Se o pipeline com falhas não tiver sido executado recentemente, a função de exemplo seguinte volta a executar o pipeline com falhas.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
O que se segue?
- Saiba como escrever funções do Cloud Run.