Esta página mostra como usar o Dataflow para ler dados do Google Cloud Managed Service para Apache Kafka e escrever os registos numa tabela do BigQuery. Este tutorial usa o modelo Apache Kafka para BigQuery para criar a tarefa do Dataflow.
Vista geral
O Apache Kafka é uma plataforma de código aberto para eventos de streaming. O Kafka é usado frequentemente em arquiteturas distribuídas para permitir a comunicação entre componentes pouco acoplados. Pode usar o Dataflow para ler eventos do Kafka, processá-los e escrever os resultados numa tabela do BigQuery para análise adicional.
O Managed Service for Apache Kafka é um serviço da Google Cloud Platform que ajuda a executar clusters Kafka seguros e escaláveis.

Autorizações necessárias
A conta de serviço do trabalhador do Dataflow tem de ter as seguintes funções de gestão de identidade e de acesso (IAM):
- Cliente Kafka gerido (
roles/managedkafka.client
) - Editor de dados do BigQuery (
roles/bigquery.dataEditor
)
Para mais informações, consulte o artigo Segurança e autorizações do fluxo de dados.
Crie um cluster do Kafka
Neste passo, cria um cluster do Managed Service for Apache Kafka. Para mais informações, consulte o artigo Crie um cluster do Managed Service para Apache Kafka.
Consola
Aceda à página Managed Service for Apache Kafka > Clusters.
Clique em Criar.
Na caixa Nome do cluster, introduza um nome para o cluster.
Na lista Região, selecione uma localização para o cluster.
Clique em Criar.
gcloud
Use o comando
managed-kafka clusters create
.
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
Substitua o seguinte:
CLUSTER
: um nome para o clusterREGION
: a região onde criou a sub-redePROJECT_ID
: o ID do seu projetoSUBNET_NAME
: a sub-rede onde quer implementar o cluster
Normalmente, a criação de um cluster demora 20 a 30 minutos.
Crie um tópico do Kafka
Depois de criar o cluster do Managed Service for Apache Kafka, crie um tópico.
Consola
Aceda à página Managed Service for Apache Kafka > Clusters.
Clique no nome do cluster.
Na página de detalhes do cluster, clique em Criar tópico.
Na caixa Nome do tópico, introduza um nome para o tópico.
Clique em Criar.
gcloud
Use o comando
managed-kafka topics create
.
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
Substitua o seguinte:
TOPIC_NAME
: o nome do tópico a criar
Crie uma tabela do BigQuery
Neste passo, cria uma tabela do BigQuery com o seguinte esquema:
Nome da coluna | Tipo de dados |
---|---|
name |
STRING |
customer_id |
INTEGER |
Se ainda não tiver um conjunto de dados do BigQuery, crie um primeiro. Para mais informações, consulte o artigo Crie conjuntos de dados. Em seguida, crie uma nova tabela vazia:
Consola
Aceda à página do BigQuery.
No painel Explorador, expanda o seu projeto e, de seguida, selecione um conjunto de dados.
Na secção de informações do conjunto de dados, clique em
Criar tabela.Na lista Criar tabela a partir de, selecione Tabela vazia.
Na caixa Tabela, introduza o nome da tabela.
Na secção Esquema, clique em Editar como texto.
Cole a seguinte definição do esquema:
name:STRING, customer_id:INTEGER
Clique em Criar tabela.
gcloud
Use o comando bq mk
.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Substitua o seguinte:
PROJECT_ID
: o ID do seu projetoDATASET_NAME
: o nome do conjunto de dadosTABLE_NAME
: o nome da tabela a criar
Execute a tarefa do Dataflow
Depois de criar o cluster do Kafka e a tabela do BigQuery, execute o modelo do Dataflow.
Consola
Primeiro, obtenha o endereço do servidor de arranque do cluster:
Na Google Cloud consola, aceda à página Clusters.
Clique no nome do cluster.
Clique no separador Configurações.
Copie o endereço do servidor de arranque do URL de arranque.
Em seguida, execute o modelo para criar a tarefa do Dataflow:
Aceda à página Dataflow > Tarefas.
Clique em Criar tarefa a partir de modelo.
No campo Nome da tarefa, introduza
kafka-to-bq
.Para o ponto final regional, selecione a região onde se encontra o cluster do Managed Service for Apache Kafka.
Selecione o modelo "Kafka para BigQuery".
Introduza os seguintes parâmetros do modelo:
- Servidor de arranque do Kafka: o endereço do servidor de arranque
- Tópico do Kafka de origem: o nome do tópico a ler
- Modo de autenticação da origem Kafka:
APPLICATION_DEFAULT_CREDENTIALS
- Formato de mensagem Kafka:
JSON
- Estratégia de nome das tabelas:
SINGLE_TABLE_NAME
- Tabela de saída do BigQuery: a tabela do BigQuery, formatada da seguinte forma:
PROJECT_ID
:DATASET_NAME
.TABLE_NAME
Em Fila de mensagens rejeitadas, selecione Escrever erros no BigQuery.
Introduza um nome de tabela do BigQuery para a fila de mensagens rejeitadas, formatado da seguinte forma:
PROJECT_ID
:DATASET_NAME
.ERROR_TABLE_NAME
Não crie esta tabela antecipadamente. O pipeline cria-o.
Clique em Executar tarefa.
gcloud
Use o comando
dataflow flex-template run
.
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
Substitua as seguintes variáveis:
LOCATION
: a região onde o seu Managed Service for Apache Kafka está localizadoPROJECT_ID
: o nome do seu projeto da Google Cloud PlatformCLUSTER_ID
: o do clusterTOPIC
: o nome do tópico do KafkaDATASET_NAME
: o nome do conjunto de dadosTABLE_NAME
: o nome da tabelaERROR_TABLE_NAME
: um nome de tabela do BigQuery para a fila de mensagens rejeitadas
Não crie a tabela para a fila de mensagens rejeitadas antecipadamente. O pipeline cria-o.
Envie mensagens para o Kafka
Depois de a tarefa do Dataflow ser iniciada, pode enviar mensagens para o Kafka e o pipeline escreve-as no BigQuery.
Crie uma VM na mesma sub-rede que o cluster Kafka e instale as ferramentas de linha de comandos do Kafka. Para ver instruções detalhadas, consulte o artigo Configure um computador cliente em Publique e consuma mensagens com a CLI.
Execute o seguinte comando para escrever mensagens no tópico do Kafka:
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
Substitua as seguintes variáveis:
TOPIC
: o nome do tópico do KafkaCLUSTER_ID
: o nome do clusterLOCATION
: a região onde o cluster está localizadoPROJECT_ID
: o nome do seu projeto da Google Cloud Platform
No comando, introduza as seguintes linhas de texto para enviar mensagens para o Kafka:
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
Use uma fila de mensagens rejeitadas
Enquanto a tarefa está em execução, o pipeline pode não conseguir escrever mensagens individuais no BigQuery. Os possíveis erros incluem:
- Erros de serialização, incluindo JSON com formato incorreto.
- Erros de conversão de tipos, causados por uma incompatibilidade no esquema de tabela e nos dados JSON.
- Campos adicionais nos dados JSON que não estão presentes no esquema de tabela.
Estes erros não fazem com que a tarefa falhe e não aparecem como erros no registo de tarefas do Dataflow. Em alternativa, o pipeline usa uma fila de mensagens rejeitadas para processar estes tipos de erros.
Para ativar a fila de mensagens rejeitadas quando executar o modelo, defina os seguintes parâmetros do modelo:
useBigQueryDLQ
:true
outputDeadletterTable
: um nome de tabela do BigQuery totalmente qualificado; por exemplo,my-project:dataset1.errors
O pipeline cria automaticamente a tabela. Se ocorrer um erro durante o processamento de uma mensagem do Kafka, o pipeline escreve uma entrada de erro na tabela.
Exemplos de mensagens de erro:
Tipo de erro | Dados do evento | errorMessage |
---|---|---|
Erro de serialização | "Hello world" | Falha ao serializar o JSON para a linha da tabela: "Hello world" |
Erro de conversão de tipo | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Não é possível converter o valor em número inteiro (valor inválido): abc", "reason" : "invalid" } ], "index" : 0 } |
Campo desconhecido | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
Trabalhe com tipos de dados do BigQuery
Internamente, o conector de E/S do Kafka converte payloads de mensagens JSON em objetos TableRow
do Apache Beam e traduz os valores dos campos TableRow
em tipos do BigQuery.
A tabela seguinte mostra representações JSON dos tipos de dados do BigQuery.
Tipo do BigQuery | Representação JSON |
---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)" Especifique a geografia através de texto conhecido (WKT) ou GeoJSON, formatado como uma string. Para mais informações, consulte o artigo Carregar dados geoespaciais. |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z" Use o método |
Dados estruturados
Se as suas mensagens JSON seguirem um esquema consistente, pode representar objetos JSON
usando o tipo de dados STRUCT
no BigQuery.
No exemplo seguinte, o campo answers
é um objeto JSON com dois subcampos, a
e b
:
{"name":"Emily","answers":{"a":"yes","b":"no"}}
A seguinte declaração SQL cria uma tabela do BigQuery com um esquema compatível:
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
A tabela resultante tem o seguinte aspeto:
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
Dados semiestruturados
Se as suas mensagens JSON não seguirem um esquema rigoroso, considere armazená-las no
BigQuery como um tipo de dados JSON
.
Ao armazenar dados JSON como um tipo JSON
, não precisa de definir o esquema antecipadamente. Após o carregamento de dados, pode consultar os dados através dos operadores de acesso a campos (notação de pontos) e de acesso a matrizes no GoogleSQL. Para mais
informações, consulte o artigo
Trabalhar com dados JSON no GoogleSQL.
Use uma FDU para transformar os dados
Este tutorial pressupõe que as mensagens do Kafka estão formatadas como JSON e que o esquema da tabela do BigQuery corresponde aos dados JSON, sem transformações aplicadas aos dados.
Opcionalmente, pode fornecer uma função definida pelo utilizador (FDU) em JavaScript que transforma os dados antes de serem escritos no BigQuery. A FDU também pode realizar processamento adicional, como filtrar, remover informações de identificação pessoal (IIP) ou enriquecer os dados com campos adicionais.
Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.