Monitorar o status do pipeline

Esta página descreve como publicar eventos de pipeline do Cloud Data Fusion, como o status do pipeline, em tópicos do Pub/Sub. Também descreve como criar funções do Cloud Run que processam as mensagens do Pub/Sub e realizam ações, como identificar e tentar novamente pipelines com falhas.

Antes de começar

  • Crie um tópico em que O Pub/Sub pode publicar eventos de pipeline do Cloud Data Fusion.

Funções exigidas

Para que a conta de serviço do Cloud Data Fusion tenha as permissões para publicar eventos de pipeline em um tópico do Pub/Sub; peça ao administrador para conceder à conta de serviço do Cloud Data Fusion Papel do IAM de editor do Pub/Sub (roles/pubsub.publisher) no projeto em que você cria o tópico do Pub/Sub. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

O administrador também pode conceder à conta de serviço do Cloud Data Fusion as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Gerenciar a publicação de eventos em uma instância do Cloud Data Fusion

É possível gerenciar a publicação de eventos em instâncias novas e atuais do Cloud Data Fusion usando a API REST nas versões 6.7.0 e mais recentes.

Publicar eventos em uma nova instância

Crie uma nova instância e inclua o campo EventPublishConfig. Para mais informações sobre campos obrigatórios para novas instâncias, consulte a referência do recurso de instâncias.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud
  • LOCATION: o local do projeto.
  • INSTANCE_ID: o ID do Cloud Data Fusion instância
  • VERSION_NUMBER: a versão do Cloud Data Fusion em que você cria a instância, por exemplo, 6.10.1
  • TOPIC_ID: o ID do tópico do Pub/Sub

Ativar a publicação de eventos em uma instância atual do Cloud Data Fusion

Atualize o EventPublishConfig em uma instância do Cloud Data Fusion:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud
  • LOCATION: o local do seu projeto
  • INSTANCE_ID: o ID do Cloud Data Fusion instância
  • TOPIC_ID: o ID do tópico do Pub/Sub

Remover a publicação de eventos de uma instância

Para remover a publicação de eventos de uma instância, atualize o valor de enabled de publicação de eventos para false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Criar funções para ler mensagens do Pub/Sub

As funções do Cloud Run podem ler mensagens do Pub/Sub e agir de acordo com elas, como tentar novamente pipelines com falhas. Para criar uma função do Cloud Run, faça o seguinte:

  1. No console do Google Cloud, acesse a página das funções do Cloud Run.

    Acessar as funções do Cloud Run

  2. Clique em Criar função.

  3. Insira o nome e a região da função.

  4. No campo Acionador, selecione Cloud Pub/Sub.

  5. Insira o ID do tópico do Pub/Sub.

  6. Clique em Próxima.

  7. Adicione funções para ler as mensagens do Pub/Sub e realizar outras ações. Por exemplo, é possível adicionar funções para os seguintes casos de uso:

    • Enviar alertas para falhas em pipelines.
    • Enviar alertas para KPIs, como contagem de registros ou informações de execução.
    • Reinicie um pipeline com falha que não foi executado novamente.

    Para exemplos de função do Cloud Run, consulte a caso de uso.

  8. Clique em Implantar. Para mais informações, consulte Implantar uma função do Cloud Run.

Caso de uso: documentar o status do pipeline e repetir pipelines com falha

O exemplo de funções do Cloud Run a seguir lê mensagens do Pub/Sub sobre o status de execução do pipeline e tenta novamente os pipelines com falha no Cloud Data Fusion.

Essas funções de exemplo se referem aos seguintes componentes do Google Cloud:

  • Projeto do Google Cloud: o projeto em que As funções do Cloud Run e os tópicos do Pub/Sub estão criados
  • Tópico do Pub/Sub: o tópico do Pub/Sub vinculado sua instância do Cloud Data Fusion
  • Instância do Cloud Data Fusion: a instância do Cloud Data Fusion em que você projeta e executa pipelines.
  • Tabela do BigQuery: a tabela do BigQuery que captura o status do pipeline e os detalhes de execução e reexecução.
  • Função do Cloud Run: a função do Cloud Run em que você implanta o código que tenta novamente pipelines com falha.
  1. O exemplo de função do Cloud Run a seguir lê Mensagens do Pub/Sub sobre o status do Cloud Data Fusion eventos.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. A função de exemplo a seguir cria e salva uma tabela do BigQuery e consulta os detalhes da execução do pipeline.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. A função de exemplo a seguir verifica se há pipelines com falha e se foram executados novamente na última hora.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Se o pipeline com falha não tiver sido executado recentemente, o exemplo de função a seguir executa novamente o pipeline com falha.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

A seguir