Monitorize o estado da pipeline

Esta página descreve como publicar eventos de pipelines do Cloud Data Fusion, como o estado do pipeline, em tópicos do Pub/Sub. Também descreve como criar funções do Cloud Run que processam as mensagens do Pub/Sub e tomam medidas, como identificar e repetir pipelines com falhas.

Antes de começar

  • Crie um tópico onde o Pub/Sub possa publicar eventos de pipelines do Cloud Data Fusion.

Funções necessárias

Para garantir que a conta de serviço do Cloud Data Fusion tem as autorizações necessárias para publicar eventos de pipeline num tópico do Pub/Sub, peça ao seu administrador para conceder à conta de serviço do Cloud Data Fusion a função do IAM Publicador do Pub/Sub (roles/pubsub.publisher) no projeto onde cria o tópico do Pub/Sub.

Para mais informações sobre a concessão de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

O seu administrador também pode conceder à conta de serviço do Cloud Data Fusion as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.

Faça a gestão da publicação de eventos numa instância do Cloud Data Fusion

Pode gerir a publicação de eventos em instâncias novas e existentes do Cloud Data Fusion através da API REST nas versões 6.7.0 e posteriores.

Publicar eventos numa nova instância

Crie uma nova instância e inclua o campo EventPublishConfig. Para mais informações sobre os campos obrigatórios para novas instâncias, consulte a referência do recurso Instances.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Substitua o seguinte:

  • PROJECT_ID: o Google Cloud ID do projeto
  • LOCATION: a localização do seu projeto
  • INSTANCE_ID: o ID da sua instância do Cloud Data Fusion
  • VERSION_NUMBER: a versão do Cloud Data Fusion onde cria a instância, por exemplo, 6.10.1
  • TOPIC_ID: o ID do tópico do Pub/Sub

Ative a publicação de eventos numa instância do Cloud Data Fusion existente

Atualize o campo EventPublishConfig numa instância existente do Cloud Data Fusion:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Substitua o seguinte:

  • PROJECT_ID: o Google Cloud ID do projeto
  • LOCATION: a localização do seu projeto
  • INSTANCE_ID: o ID da sua instância do Cloud Data Fusion
  • TOPIC_ID: o ID do tópico do Pub/Sub

Remova a publicação de eventos de uma instância

Para remover a publicação de eventos de uma instância, atualize o valor de enabledpublicação de eventosfalse para false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Crie funções para ler mensagens do Pub/Sub

As funções do Cloud Run podem ler mensagens do Pub/Sub e agir em conformidade, como repetir pipelines com falhas. Para criar funções do Cloud Run, faça o seguinte:

  1. Na Google Cloud consola, aceda à página Funções do Cloud Run.

    Aceder às funções do Cloud Run

  2. Clique em Criar função.

  3. Introduza um nome de função e uma região.

  4. No campo Tipo de acionador, selecione Cloud Pub/Sub.

  5. Introduza o ID do tópico Pub/Sub.

  6. Clicar em Seguinte.

  7. Adicione funções para ler as mensagens do Pub/Sub e realizar outras ações. Por exemplo, pode adicionar funções para os seguintes exemplos de utilização:

    • Enviar alertas de falhas de pipelines.
    • Enviar alertas para KPIs, como a contagem de registos ou informações de execução.
    • Reinicie um pipeline com falhas que não tenha sido executado novamente.

    Para ver exemplos de funções do Cloud Run, consulte a secção Exemplo de utilização.

  8. Clique em Implementar. Para mais informações, consulte o artigo Implemente uma função do Cloud Run.

Exemplo de utilização: documentar o estado do pipeline e repetir pipelines com falhas

As seguintes funções do Cloud Run leem mensagens do Pub/Sub sobre o estado de execução do pipeline e, em seguida, repetem os pipelines com falhas no Cloud Data Fusion.

Estas funções de exemplo referem-se aos seguintes Google Cloud componentes:

  • Google Cloud project: o projeto onde as funções do Cloud Run e os tópicos do Pub/Sub são criados
  • Tópico do Pub/Sub: o tópico do Pub/Sub associado à sua instância do Cloud Data Fusion
  • Instância do Cloud Data Fusion: a instância do Cloud Data Fusion onde cria e executa pipelines
  • Tabela do BigQuery: a tabela do BigQuery que captura o estado do pipeline e os detalhes de execução e nova execução
  • Função do Cloud Run: a função do Cloud Run onde implementa o código que tenta novamente os pipelines com falhas
  1. O exemplo de função do Cloud Run seguinte lê as mensagens do Pub/Sub sobre eventos de estado do Cloud Data Fusion.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. A função de exemplo seguinte cria e guarda uma tabela do BigQuery e consulta os detalhes da execução do pipeline.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. A função de exemplo seguinte verifica se existem pipelines que falharam e se foram executados novamente na última hora.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Se o pipeline com falhas não tiver sido executado recentemente, a função de exemplo seguinte volta a executar o pipeline com falhas.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

O que se segue?