Como trabalhar com pipelines de dados

Visão geral

Use os pipelines de dados do Dataflow para criar programações de jobs recorrentes, entender como os recursos são gastos em várias execuções de jobs, definir e gerenciar objetivos de atualização de dados e detalhar as etapas do pipeline para corrigir e otimizar pipelines.

Recursos de pipeline de dados:

  • Crie um pipeline em lote recorrente para executar um job em lote com base em uma programação.
  • Crie um pipeline em lote incremental recorrente para executar um job em lote com a versão mais recente dos dados de entrada.
  • Use a visão geral de resumo do pipeline para visualizar o uso de capacidade agregada e o consumo de recursos de um pipeline.
  • Veja a atualização de dados de um pipeline de streaming. Essa métrica, que evolui com o tempo, pode ser vinculada a um alerta que notifica quando a atualização fica abaixo de um objetivo específico.
  • Use os gráficos de métricas do pipeline para comparar jobs de pipeline em lote e encontrar anomalias.

Restrições de uso do pipeline de dados:

  • Disponibilidade regional: como os pipelines de dados do Dataflow usam o Cloud Scheduler, um aplicativo do App Engine, os pipelines de dados estão disponíveis em regiões disponíveis do App Engine.

  • Limites de cotas:

    • Número máximo de pipelines por projeto: 500
    • Número máximo de pipelines por organização: 2.500

Documentos de referência de APIs:

Para a documentação da API, consulte a referência de Data pipelines.

Tipos de pipelines de dados

Há dois tipos de pipelines de dados do Dataflow: streaming e lote. Os dois tipos de pipelines executam jobs definidos nos modelos do Dataflow.

Pipeline de dados de streaming
Um pipeline de dados de streaming executa um job de streaming do Dataflow imediatamente após a criação.
Pipeline de dados em lote
Um pipeline de dados em lote executa um job em lote do Dataflow em uma programação definida pelo usuário. O nome de arquivo de entrada do pipeline em lote pode ser parametrizado para permitir o processamento incremental do pipeline em lote.

Pipelines em lote incrementais

É possível usar marcadores de data e hora para especificar um formato de arquivo de entrada incremental para um pipeline em lote.

  • Marcadores de ano, mês, data, hora, minuto e segundo podem ser usados e precisam seguir o formato strftime(). Os marcadores são precedidos pelo símbolo de porcentagem (%).
  • A formatação do parâmetro não é verificada durante a criação do pipeline.
    • Exemplo: se você especificar "gs://bucket/Y" como o caminho do arquivo de entrada parametrizado, ele será avaliado como "gs://bucket/Y", já que "Y" sem um "%" antes não é mapeado para o formato strftime().

Em cada tempo de execução do pipeline em lote programado, a parte do marcador do caminho do arquivo de entrada é avaliada para a data/hora atual (ou time-shifted). Os valores de data são avaliados usando a data atual no fuso horário do job programado. Se o caminho do arquivo avaliado corresponder ao caminho de um arquivo de entrada, o arquivo será selecionado para processamento pelo pipeline em lote no horário programado.

  • Exemplo: um pipeline em lote está programado para se repetir no início de cada hora PST. Se você parametrizar o caminho do arquivo de entrada como gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, em 15 de abril de 2021, às 18h PST, o caminho do arquivo de entrada será avaliado como gs://bucket-name/2021-04-15/prefix-18_00.csv.

Como usar parâmetros de time-shifting

É possível usar parâmetros de mudança de tempo de + ou - minuto ou hora, entre chaves com o formato, "{[+|-][0-9]+[m|h]}", para aceitar a correspondência de um caminho de arquivo de entrada com uma data e hora avaliada que é alterada antes ou depois da data e hora atual da programação do pipeline. O pipeline em lote continuará a ser repetido no horário programado, mas o caminho do arquivo de entrada será avaliado com o ajuste de tempo especificado.

  • Exemplo: um pipeline em lote está programado para se repetir no início de cada hora PST. Se você parametrizar o caminho do arquivo de entrada como gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, em 15 de abril de 2021, às 18h PST, o caminho do arquivo de entrada será avaliado como gs://bucket-name/2021-04-15/prefix-16_00.csv.

Papéis do pipeline de dados

Para que as operações de pipeline de dados funcionem, o usuário precisa receber os papéis necessários do IAM, da seguinte maneira:

  1. Um usuário precisa ter o papel apropriado para executar operações:

  2. Um usuário precisa ser capaz de atuar como a conta de serviço usada pelo Cloud Scheduler e pelo Dataflow ao receber o papel roles/iam.serviceAccountUser nessa conta. Se o usuário não selecionar uma conta de serviço para o Cloud Scheduler e o Dataflow, a conta de serviço padrão do Compute Engine será usada.

Como criar um pipeline de dados

Há duas maneiras para criar um pipeline de dados:

  1. Importar um job ou
  2. Criar um pipeline de dados

Página de configuração de pipelines de dados: quando você acessa o recurso de pipelines do Dataflow no Console do Cloud pela primeira vez, uma página de configuração é aberta.

  1. Ativar as APIs listadas
  2. Selecione uma região para o aplicativo do App Engine que o Cloud Scheduler usará para programar seus pipelines.

Importar um job

É possível importar um job em lote ou de streaming do Dataflow baseado em um modelo clássico ou flexível e torná-lo um pipeline de dados.

  1. Acesse a página Jobs do Dataflow no Console do Cloud, selecione um job concluído e, na página Detalhes do job, selecione "+IMPORTAR COMO PIPELINE".

  2. Na página Criar pipeline a partir do modelo, a opção de pipeline "pipeline de dados" é selecionada. Outros parâmetros são preenchidos com as opções do job importado.

    1. Para um job em lote, forneça uma programação de recorrência na seção "Programar seu pipeline" em "Parâmetros de modelo". Fornecer um endereço de conta de e-mail para o Cloud Scheduler, que é usado para programar execuções em lote, é opcional. Se não for especificado, a conta de serviço padrão do Compute Engine será usada. Observação: o usuário precisa receber o papel roles/iam.serviceAccountUser na conta de serviço usada pelo Cloud Scheduler, seja uma especificada pelo usuário ou a conta padrão do Compute Engine (consulte Papéis do pipeline de dados).

Criar um pipeline de dados

  1. Acesse a página Pipelines do Dataflow no Console do Cloud e selecione "+ CRIAR PIPELINE DE DADOS".

  2. Na página Criar pipeline a partir do modelo em "Gerenciamento de jobs", selecione "Pipeline de dados", forneça um nome de pipeline e preencha os outros campos de seleção e parâmetro do modelo.

    1. Para um job em lote, forneça uma programação de recorrência na seção "Programar seu pipeline" em "Parâmetros de modelo". Fornecer um endereço de conta de e-mail para o Cloud Scheduler, que é usado para programar execuções em lote, é opcional. Se não for especificado, a conta de serviço padrão do Compute Engine será usada. Observação: o usuário precisa receber o papel roles/iam.serviceAccountUser na conta de serviço usada pelo Cloud Scheduler, seja uma especificada pelo usuário ou a conta padrão do Compute Engine (consulte Papéis do pipeline de dados).

Criar um pipeline de dados em lote

Para criar esse exemplo de pipeline de dados em lote, você precisa ter acesso aos seguintes recursos no seu projeto:

Este exemplo de pipeline usa o modelo de pipeline em lote Cloud Storage Text para BigQuery, que lê arquivos em formato CSV do Cloud Storage, executa uma transformação e insere valores em your-project-id:your-dataset-name.three_column_table.

  1. Crie os seguintes arquivos no seu drive local:
    1. Um arquivo bq_three_column_table.json que contém o seguinte esquema da tabela de destino do BigQuery.
{
  "BigQuery Schema": [
    {
      "name": "col1",
      "type": "STRING"
    },
    {
      "name": "col2",
      "type": "STRING"
    },
    {
      "name": "col3",
      "type": "INT64"
    }
  ]
}
  1. Um arquivo JavaScript split_csv_3cols.js, que implementa uma transformação simples nos dados de entrada antes da inserção no BigQuery.
function transform(line) {
    var values = line.split(',');
    var obj = new Object();
    obj.col1 = values[0];
    obj.col2 = values[1];
    obj.col3 = values[2];
    var jsonString = JSON.stringify(obj);
    return jsonString;
}
  1. Um arquivo CSV file01.csv com vários registros que serão inseridos na tabela do BigQuery.
    b8e5087a,74,27531
    7a52c051,4a,25846
    672de80f,cd,76981
    111b92bf,2e,104653
    ff658424,f0,149364
    e6c17c75,84,38840
    833f5a69,8f,76892
    d8c833ff,7d,201386
    7d3da7fb,d5,81919
    3836d29b,70,181524
    ca66e6e5,d7,172076
    c8475eb6,03,247282
    558294df,f3,155392
    737b82a8,c7,235523
    82c8f5dc,35,468039
    57ab17f9,5e,480350
    cbcdaf84,bd,354127
    52b55391,eb,423078
    825b8863,62,88160
    26f16d4f,fd,397783
      
  2. Use gsutil para copiar os arquivos para as pastas em um bucket do Cloud Storage no projeto, da seguinte maneira:
    1. Copie bq_three_column_table.json e split_csv_3cols.js para gs://your-bucket/text_to_bigquery/
      gsutil cp bq_three_column_table.json gs://your-bucket/text_to_bigquery/
        gsutil cp split_csv_3cols.js gs://your-bucket/text_to_bigquery/
      
    2. Copie file01.csv para gs://your-bucket/inputs/
      gsutil cp file01.csv gs://your-bucket/inputs/
      
  3. Crie uma pasta "tmp" em your-bucket no navegador do Cloud Storage. Selecione o nome da pasta para abrir a página de detalhes do bucket e clique em "CRIAR FOLDER" para criar uma pasta "tmp" no seu bucket.
  4. Acesse a página Pipelines do Dataflow e selecione "CRIAR PIPELINE DE DADOS". Insira ou selecione os seguintes itens na página Criar pipeline do modelo:

    1. Gerenciamento de jobs:
      1. Selecione "Pipeline de dados".
      2. Nome do pipeline: digite "text_to_bq_batch_data_pipeline".
      3. Clique em "CONTINUAR".
    2. Seleção de modelos:
      1. Endpoint regional: selecione uma região do Compute Engine.
      2. Lista de modelos: em "Processar dados em massa (lote)", selecione "Arquivo de texto no Cloud Storage para BigQuery". Descrição: pipeline em lote. Lê arquivos de texto armazenados no Cloud Storage, transforma-os usando uma função definida pelo usuário (UDF) do JavaScript e gera o resultado no BigQuery." Observação: não selecione o pipeline de streaming com o mesmo nome em "Processar dados continuamente (stream)".
      3. Clique em "CONTINUAR".
    3. Parâmetros do modelo:
      1. Programe seu pipeline: selecione uma programação, como por hora no minuto 25, no seu fuso horário. É possível editar a programação depois do envio do pipeline, conforme explicado abaixo.
    4. Parâmetros obrigatórios:
      1. Caminho da UDF em JavaScript no Cloud Storage:
        gs://your-bucket/text_to_bigquery/split_csv_3cols.js
        
      2. Caminho de JSON:
        gs://your-bucket/text_to_bigquery/bq_three_column_table.json
        
      3. Nome da UDF JavaScript: "transform"
      4. Tabela de saída do BigQuery (nome totalmente qualificado da tabela):
        your_project_id:your_dataset.three_column_table
        
      5. Caminho de entrada do Cloud Storage:
        gs://your_bucket/inputs/file*.csv
        
      6. _Diretório temporário do BigQuery:
        gs://your_bucket/tmp
        
      7. Local temporário:
        gs://your_bucket/tmp
        
    5. Clique em "ENVIAR".
  5. Confirme as informações do pipeline e do modelo e veja o histórico atual e anterior na página Detalhes do pipeline.

Também é possível executar um pipeline em lote sob demanda usando o botão Executar no console dos pipelines do Dataflow.

Criar um pipeline de dados de streaming de amostra

É possível criar um exemplo de pipeline de dados de streaming seguindo as instruções do pipeline de amostra em lote, com as seguintes diferenças:

  • Programação do pipeline. Não especifique uma programação para um pipeline de dados de streaming. O job de streaming do Dataflow é iniciado imediatamente.

  • Seleção de modelos: em "Processar dados continuamente (stream)", selecione "Arquivos de texto no Cloud Storage para BigQuery". Descrição: um pipeline de streaming que pode ler arquivos de texto armazenados no Cloud Storage, realizar uma transformação por uma função JavaScript definida pelo usuário e fazer streaming dos resultados para o BigQuery. Esse pipeline requer uma função JavaScript e uma representação JSON do TableSchema do BigQuery.

  • Tipo de máquina de worker: o pipeline processará o conjunto inicial de arquivos correspondentes ao padrão gs://<your_bucket>/inputs/file*.csv e todos os outros arquivos correspondentes a esse padrão que você enviar para a pasta inputs/. Se o tamanho dos arquivos CSV exceder vários GB, para evitar possíveis erros de falta de memória, selecione um tipo de máquina com mais memória do que o tipo n1-standard-4 padrão, como n1-highmem-8.

Como investigar violações dos objetivos do pipeline

Pipelines em lote recorrentes

Na página "Detalhes do pipeline" no Console do Cloud, use os gráficos "Status do job individual" e "Tempo de linha de execução por etapa" do painel de status do pipeline para uma análise inicial da integridade do seu pipeline.

Exemplo de investigação:

  1. Você tem um pipeline em lote recorrente que é executado a cada hora nos três minutos após a hora, cada job normalmente é executado por aproximadamente nove minutos e você tem um objetivo para todos os jobs serem concluídos em menos de 10 minutos.

  2. O gráfico "status do job" mostra que um job foi executado há mais de 10 minutos.

  3. Na tabela do histórico de atualização/execução, localize o job executado durante a hora de interesse e clique na página de detalhes do job do Dataflow. Nessa página, você encontrará a etapa de execução mais longa e, em seguida, procure nos registros possíveis erros para determinar a causa do atraso.

Pipelines de streaming

Na guia "INFORMAÇÕES DO PIPELINE" na página Detalhes do pipeline no Console do Cloud, use o gráfico de atualização de dados do painel de status do pipeline para uma análise inicial da integridade do pipeline.

Exemplo de investigação:

  1. Você tem um pipeline de streaming que normalmente produz uma saída com uma atualização de dados de 20 segundos.

  2. Você define um objetivo de ter uma garantia de atualização de dados de 30 segundos. Ao analisar o gráfico de atualização de dados, você percebe que entre 9h e 10h, a atualização de dados aumentou para quase 40 segundos.

  3. Alterne para a guia "MÉTRICAS DE PIPELINE" e veja os gráficos de capacidade, utilização da CPU e memória para uma análise mais detalhada.