Escreva dados do Kafka para o BigQuery através do Dataflow

Esta página mostra como usar o Dataflow para ler dados do Google Cloud Managed Service para Apache Kafka e escrever os registos numa tabela do BigQuery. Este tutorial usa o modelo Apache Kafka para BigQuery para criar a tarefa do Dataflow.

Vista geral

O Apache Kafka é uma plataforma de código aberto para eventos de streaming. O Kafka é usado frequentemente em arquiteturas distribuídas para permitir a comunicação entre componentes pouco acoplados. Pode usar o Dataflow para ler eventos do Kafka, processá-los e escrever os resultados numa tabela do BigQuery para análise adicional.

O Managed Service for Apache Kafka é um serviço da Google Cloud Platform que ajuda a executar clusters Kafka seguros e escaláveis.

Ler eventos do Kafka no BigQuery
Arquitetura orientada por eventos com o Apache Kafka

Autorizações necessárias

A conta de serviço do trabalhador do Dataflow tem de ter as seguintes funções de gestão de identidade e de acesso (IAM):

  • Cliente Kafka gerido (roles/managedkafka.client)
  • Editor de dados do BigQuery (roles/bigquery.dataEditor)

Para mais informações, consulte o artigo Segurança e autorizações do fluxo de dados.

Crie um cluster do Kafka

Neste passo, cria um cluster do Managed Service for Apache Kafka. Para mais informações, consulte o artigo Crie um cluster do Managed Service para Apache Kafka.

Consola

  1. Aceda à página Managed Service for Apache Kafka > Clusters.

    Aceda a Clusters

  2. Clique em Criar.

  3. Na caixa Nome do cluster, introduza um nome para o cluster.

  4. Na lista Região, selecione uma localização para o cluster.

  5. Clique em Criar.

gcloud

Use o comando managed-kafka clusters create.

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

Substitua o seguinte:

  • CLUSTER: um nome para o cluster
  • REGION: a região onde criou a sub-rede
  • PROJECT_ID: o ID do seu projeto
  • SUBNET_NAME: a sub-rede onde quer implementar o cluster

Normalmente, a criação de um cluster demora 20 a 30 minutos.

Crie um tópico do Kafka

Depois de criar o cluster do Managed Service for Apache Kafka, crie um tópico.

Consola

  1. Aceda à página Managed Service for Apache Kafka > Clusters.

    Aceda a Clusters

  2. Clique no nome do cluster.

  3. Na página de detalhes do cluster, clique em Criar tópico.

  4. Na caixa Nome do tópico, introduza um nome para o tópico.

  5. Clique em Criar.

gcloud

Use o comando managed-kafka topics create.

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

Substitua o seguinte:

  • TOPIC_NAME: o nome do tópico a criar

Crie uma tabela do BigQuery

Neste passo, cria uma tabela do BigQuery com o seguinte esquema:

Nome da coluna Tipo de dados
name STRING
customer_id INTEGER

Se ainda não tiver um conjunto de dados do BigQuery, crie um primeiro. Para mais informações, consulte o artigo Crie conjuntos de dados. Em seguida, crie uma nova tabela vazia:

Consola

  1. Aceda à página do BigQuery.

    Aceda ao BigQuery

  2. No painel Explorador, expanda o seu projeto e, de seguida, selecione um conjunto de dados.

  3. Na secção de informações do conjunto de dados, clique em Criar tabela.

  4. Na lista Criar tabela a partir de, selecione Tabela vazia.

  5. Na caixa Tabela, introduza o nome da tabela.

  6. Na secção Esquema, clique em Editar como texto.

  7. Cole a seguinte definição do esquema:

    name:STRING,
    customer_id:INTEGER
    
  8. Clique em Criar tabela.

gcloud

Use o comando bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Substitua o seguinte:

  • PROJECT_ID: o ID do seu projeto
  • DATASET_NAME: o nome do conjunto de dados
  • TABLE_NAME: o nome da tabela a criar

Execute a tarefa do Dataflow

Depois de criar o cluster do Kafka e a tabela do BigQuery, execute o modelo do Dataflow.

Consola

Primeiro, obtenha o endereço do servidor de arranque do cluster:

  1. Na Google Cloud consola, aceda à página Clusters.

    Aceda a Clusters

  2. Clique no nome do cluster.

  3. Clique no separador Configurações.

  4. Copie o endereço do servidor de arranque do URL de arranque.

Em seguida, execute o modelo para criar a tarefa do Dataflow:

  1. Aceda à página Dataflow > Tarefas.

    Aceda a Empregos

  2. Clique em Criar tarefa a partir de modelo.

  3. No campo Nome da tarefa, introduza kafka-to-bq.

  4. Para o ponto final regional, selecione a região onde se encontra o cluster do Managed Service for Apache Kafka.

  5. Selecione o modelo "Kafka para BigQuery".

  6. Introduza os seguintes parâmetros do modelo:

    • Servidor de arranque do Kafka: o endereço do servidor de arranque
    • Tópico do Kafka de origem: o nome do tópico a ler
    • Modo de autenticação da origem Kafka: APPLICATION_DEFAULT_CREDENTIALS
    • Formato de mensagem Kafka: JSON
    • Estratégia de nome das tabelas: SINGLE_TABLE_NAME
    • Tabela de saída do BigQuery: a tabela do BigQuery, formatada da seguinte forma: PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. Em Fila de mensagens rejeitadas, selecione Escrever erros no BigQuery.

  8. Introduza um nome de tabela do BigQuery para a fila de mensagens rejeitadas, formatado da seguinte forma: PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

    Não crie esta tabela antecipadamente. O pipeline cria-o.

  9. Clique em Executar tarefa.

gcloud

Use o comando dataflow flex-template run.

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

Substitua as seguintes variáveis:

  • LOCATION: a região onde o seu Managed Service for Apache Kafka está localizado
  • PROJECT_ID: o nome do seu projeto da Google Cloud Platform
  • CLUSTER_ID: o do cluster
  • TOPIC: o nome do tópico do Kafka
  • DATASET_NAME: o nome do conjunto de dados
  • TABLE_NAME: o nome da tabela
  • ERROR_TABLE_NAME: um nome de tabela do BigQuery para a fila de mensagens rejeitadas

Não crie a tabela para a fila de mensagens rejeitadas antecipadamente. O pipeline cria-o.

Envie mensagens para o Kafka

Depois de a tarefa do Dataflow ser iniciada, pode enviar mensagens para o Kafka e o pipeline escreve-as no BigQuery.

  1. Crie uma VM na mesma sub-rede que o cluster Kafka e instale as ferramentas de linha de comandos do Kafka. Para ver instruções detalhadas, consulte o artigo Configure um computador cliente em Publique e consuma mensagens com a CLI.

  2. Execute o seguinte comando para escrever mensagens no tópico do Kafka:

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    Substitua as seguintes variáveis:

    • TOPIC: o nome do tópico do Kafka
    • CLUSTER_ID: o nome do cluster
    • LOCATION: a região onde o cluster está localizado
    • PROJECT_ID: o nome do seu projeto da Google Cloud Platform
  3. No comando, introduza as seguintes linhas de texto para enviar mensagens para o Kafka:

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

Use uma fila de mensagens rejeitadas

Enquanto a tarefa está em execução, o pipeline pode não conseguir escrever mensagens individuais no BigQuery. Os possíveis erros incluem:

  • Erros de serialização, incluindo JSON com formato incorreto.
  • Erros de conversão de tipos, causados por uma incompatibilidade no esquema de tabela e nos dados JSON.
  • Campos adicionais nos dados JSON que não estão presentes no esquema de tabela.

Estes erros não fazem com que a tarefa falhe e não aparecem como erros no registo de tarefas do Dataflow. Em alternativa, o pipeline usa uma fila de mensagens rejeitadas para processar estes tipos de erros.

Para ativar a fila de mensagens rejeitadas quando executar o modelo, defina os seguintes parâmetros do modelo:

  • useBigQueryDLQ: true
  • outputDeadletterTable: um nome de tabela do BigQuery totalmente qualificado; por exemplo, my-project:dataset1.errors

O pipeline cria automaticamente a tabela. Se ocorrer um erro durante o processamento de uma mensagem do Kafka, o pipeline escreve uma entrada de erro na tabela.

Exemplos de mensagens de erro:

Tipo de erro Dados do evento errorMessage
Erro de serialização "Hello world" Falha ao serializar o JSON para a linha da tabela: "Hello world"
Erro de conversão de tipo {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Não é possível converter o valor em número inteiro (valor inválido): abc", "reason" : "invalid" } ], "index" : 0 }
Campo desconhecido {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Trabalhe com tipos de dados do BigQuery

Internamente, o conector de E/S do Kafka converte payloads de mensagens JSON em objetos TableRow do Apache Beam e traduz os valores dos campos TableRow em tipos do BigQuery.

A tabela seguinte mostra representações JSON dos tipos de dados do BigQuery.

Tipo do BigQuery Representação JSON
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

Especifique a geografia através de texto conhecido (WKT) ou GeoJSON, formatado como uma string. Para mais informações, consulte o artigo Carregar dados geoespaciais.

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

Use o método Date.toJSON de JavaScript para formatar o valor.

Dados estruturados

Se as suas mensagens JSON seguirem um esquema consistente, pode representar objetos JSON usando o tipo de dados STRUCT no BigQuery.

No exemplo seguinte, o campo answers é um objeto JSON com dois subcampos, a e b:

{"name":"Emily","answers":{"a":"yes","b":"no"}}

A seguinte declaração SQL cria uma tabela do BigQuery com um esquema compatível:

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

A tabela resultante tem o seguinte aspeto:

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Dados semiestruturados

Se as suas mensagens JSON não seguirem um esquema rigoroso, considere armazená-las no BigQuery como um tipo de dados JSON. Ao armazenar dados JSON como um tipo JSON, não precisa de definir o esquema antecipadamente. Após o carregamento de dados, pode consultar os dados através dos operadores de acesso a campos (notação de pontos) e de acesso a matrizes no GoogleSQL. Para mais informações, consulte o artigo Trabalhar com dados JSON no GoogleSQL.

Use uma FDU para transformar os dados

Este tutorial pressupõe que as mensagens do Kafka estão formatadas como JSON e que o esquema da tabela do BigQuery corresponde aos dados JSON, sem transformações aplicadas aos dados.

Opcionalmente, pode fornecer uma função definida pelo utilizador (FDU) em JavaScript que transforma os dados antes de serem escritos no BigQuery. A FDU também pode realizar processamento adicional, como filtrar, remover informações de identificação pessoal (IIP) ou enriquecer os dados com campos adicionais.

Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

O que se segue?