Esta página descreve como publicar eventos de pipeline do Cloud Data Fusion, como o status do pipeline, em tópicos do Pub/Sub. Também descreve como criar funções do Cloud Run que processam as mensagens do Pub/Sub e realizam ações, como identificar e tentar novamente pipelines com falhas.
Antes de começar
- Crie um tópico em que O Pub/Sub pode publicar eventos de pipeline do Cloud Data Fusion.
Funções exigidas
Para que a conta de serviço do Cloud Data Fusion tenha as
permissões para publicar eventos de pipeline em um tópico do Pub/Sub;
peça ao administrador para conceder à conta de serviço do Cloud Data Fusion
Papel do IAM de editor do Pub/Sub (roles/pubsub.publisher
) no projeto em que você cria o tópico do Pub/Sub.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
O administrador também pode conceder à conta de serviço do Cloud Data Fusion as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.
Gerenciar a publicação de eventos em uma instância do Cloud Data Fusion
É possível gerenciar a publicação de eventos em instâncias novas e atuais do Cloud Data Fusion usando a API REST nas versões 6.7.0 e mais recentes.
Publicar eventos em uma nova instância
Crie uma nova instância e inclua o campo EventPublishConfig
. Para mais
informações sobre campos obrigatórios para novas instâncias, consulte a
referência do recurso de instâncias.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Substitua:
PROJECT_ID
: o ID do projeto do Google CloudLOCATION
: o local do projeto.INSTANCE_ID
: o ID do Cloud Data Fusion instânciaVERSION_NUMBER
: a versão do Cloud Data Fusion em que você cria a instância, por exemplo,6.10.1
TOPIC_ID
: o ID do tópico do Pub/Sub
Ativar a publicação de eventos em uma instância atual do Cloud Data Fusion
Atualize o
EventPublishConfig
em uma instância do Cloud Data Fusion:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Substitua:
PROJECT_ID
: o ID do projeto do Google CloudLOCATION
: o local do seu projetoINSTANCE_ID
: o ID do Cloud Data Fusion instânciaTOPIC_ID
: o ID do tópico do Pub/Sub
Remover a publicação de eventos de uma instância
Para remover a publicação de eventos de uma instância, atualize o
valor de enabled
de publicação de eventos para false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Criar funções para ler mensagens do Pub/Sub
As funções do Cloud Run podem ler mensagens do Pub/Sub e agir de acordo com elas, como tentar novamente pipelines com falhas. Para criar uma função do Cloud Run, faça o seguinte:
No console do Google Cloud, acesse a página das funções do Cloud Run.
Clique em Criar função.
Insira o nome e a região da função.
No campo Acionador, selecione Cloud Pub/Sub.
Insira o ID do tópico do Pub/Sub.
Clique em Próxima.
Adicione funções para ler as mensagens do Pub/Sub e realizar outras ações. Por exemplo, é possível adicionar funções para os seguintes casos de uso:
- Enviar alertas para falhas em pipelines.
- Enviar alertas para KPIs, como contagem de registros ou informações de execução.
- Reinicie um pipeline com falha que não foi executado novamente.
Para exemplos de função do Cloud Run, consulte a caso de uso.
Clique em Implantar. Para mais informações, consulte Implantar uma função do Cloud Run.
Caso de uso: documentar o status do pipeline e repetir pipelines com falha
O exemplo de funções do Cloud Run a seguir lê mensagens do Pub/Sub sobre o status de execução do pipeline e tenta novamente os pipelines com falha no Cloud Data Fusion.
Essas funções de exemplo se referem aos seguintes componentes do Google Cloud:
- Projeto do Google Cloud: o projeto em que As funções do Cloud Run e os tópicos do Pub/Sub estão criados
- Tópico do Pub/Sub: o tópico do Pub/Sub vinculado sua instância do Cloud Data Fusion
- Instância do Cloud Data Fusion: a instância do Cloud Data Fusion em que você projeta e executa pipelines.
- Tabela do BigQuery: a tabela do BigQuery que captura o status do pipeline e os detalhes de execução e reexecução.
- Função do Cloud Run: a função do Cloud Run em que você implanta o código que tenta novamente pipelines com falha.
O exemplo de função do Cloud Run a seguir lê Mensagens do Pub/Sub sobre o status do Cloud Data Fusion eventos.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
A função de exemplo a seguir cria e salva uma tabela do BigQuery e consulta os detalhes da execução do pipeline.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
A função de exemplo a seguir verifica se há pipelines com falha e se foram executados novamente na última hora.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Se o pipeline com falha não tiver sido executado recentemente, o exemplo de função a seguir executa novamente o pipeline com falha.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
A seguir
- Saiba como escrever funções do Cloud Run.