Neste documento, forneceremos orientações detalhadas sobre como criar e implantar um pipeline do Dataflow que é transmitido do Apache Kafka para o BigQuery.
O Apache Kafka é uma plataforma de código aberto para eventos de streaming. O Kafka é usado com frequência em arquiteturas distribuídas para permitir a comunicação entre componentes acoplados com flexibilidade. É possível usá-lo para ler eventos do Kafka, processá-los e gravar os resultados em uma tabela do BigQuery para uma análise mais aprofundada.
O Google fornece um modelo do Dataflow que configura um pipeline do Kafka para o BigQuery. O modelo usa o conector do BigQueryIO fornecido no SDK do Apache Beam.
Para usar esse modelo, siga estas etapas:
- Implante o Kafka no Google Cloud ou em outro lugar.
- Configure a rede.
- Defina permissões do Identity and Access Management (IAM).
- Escreva uma função para transformar os dados do evento.
- Crie a tabela de saída do BigQuery.
- Implante o modelo do Dataflow.
Implantar o Kafka
No Google Cloud, é possível implantar um cluster do Kafka em instâncias de máquina virtual (VM) do Compute Engine ou usar um serviço de terceiros do Kafka gerenciado. Para saber mais sobre opções de implantação no Google Cloud, consulte O que é o Apache Kafka?. Encontre soluções de terceiros do Kafka no Google Cloud Marketplace.
Você também pode ter um cluster atual do Kafka que reside fora do Google Cloud. Por exemplo, é possível ter uma carga de trabalho atual implantada no local ou em outra nuvem pública.
Configurar rede
Por padrão, o Dataflow inicia instâncias na rede padrão de nuvem privada virtual (VPC). Dependendo da configuração do Kafka, talvez seja necessário configurar uma rede e uma sub-rede diferentes para o Dataflow. Para mais informações, consulte Especificar uma rede e uma sub-rede na documentação do Dataflow. Ao configurar a rede, crie regras de firewall que permitam que as máquinas de worker do Dataflow alcancem os agentes do Kafka.
Se você estiver usando o VPC Service Controls, coloque o cluster do Kafka dentro do perímetro do VPC Service Controls ou estenda os perímetros para a VPN autorizada ou o Cloud Interconnect.
Conectar a um cluster externo
Se o cluster do Kafka for implantado fora do Google Cloud, crie uma conexão de rede entre o Dataflow e o cluster do Kafka. Existem várias opções de rede com diferentes vantagens e desvantagens:
- Conecte-se usando um espaço de endereço RFC 1918 compartilhado.
- Alcance seu cluster do Kafka hospedado externamente com endereços IP públicos.
- Internet pública
- Peering direto
- Peering de operadora
A Interconexão dedicada é a melhor opção para desempenho e confiabilidade previsíveis, mas pode levar mais tempo para configurar porque os novos circuitos precisam ser provisionados por terceiros. Com uma topologia baseada em IP público e pouco trabalho em rede, é possível dar os primeiros passos rapidamente.
As próximas duas seções descrevem essas opções em mais detalhes.
Espaço de endereço do RFC 1918 compartilhado
A Interconexão dedicada e a VPN IPsec oferecem acesso direto aos endereços IP RFC 1918 na sua nuvem privada virtual (VPC), o que pode simplificar a configuração do Kafka. Se você estiver usando uma topologia baseada em VPN, configure uma VPN de alta capacidade.
Por padrão, o Dataflow inicia instâncias na
rede VPC padrão. Em uma topologia de rede privada com rotas explicitamente definidas no Cloud Router, que conectam sub-redes do Google Cloud ao cluster do Kafka, você precisa de mais controle sobre onde localizar as instâncias do Dataflow. É possível
usar o Dataflow para configurar os parâmetros de execução
de network
e subnetwork
.
Verifique se a sub-rede correspondente tem endereços IP suficientes disponíveis para que o Dataflow possa iniciar instâncias enquanto tenta escalonar horizontalmente. Além disso, ao criar uma rede separada para iniciar as instâncias do Dataflow, verifique se você tem uma regra de firewall que ativa o tráfego TCP entre todas as máquinas virtuais no projeto. A rede padrão já tem essa regra de firewall configurada.
Espaço de endereço IP público
Essa arquitetura usa o Transport Layer Security (TLS, na sigla em inglês) para proteger o tráfego entre clientes externos e o Kafka e usa texto simples para comunicação entre agentes. Quando o listener do Kafka se vincula a uma interface de rede usada
para comunicação interna e externa,
a configuração dele é simples. No entanto, em muitos cenários, os endereços anunciados externamente dos agentes do Kafka
no cluster são diferentes das interfaces de rede internas
usadas pelo Kafka. Nesses cenários, é possível usar a propriedade
advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Os clientes externos se conectam pela porta 9093 por um canal "SSL", e os clientes internos
se conectam pela porta 9092 por um canal de texto simples. Ao especificar um endereço em advertised.listeners
,
use nomes de DNS (kafkabroker-n.mydomain.com
, nesta amostra) que resolvem para a mesma instância
para tráfego externo e interno. O uso de endereços IP públicos pode não funcionar porque os endereços podem apresentar falha ao resolver o tráfego interno.
Definir permissões do IAM
Os jobs do Dataflow usam duas contas de serviço do IAM:
- O serviço do Dataflow usa uma conta de serviço do Dataflow para manipular recursos do Google Cloud, como a criação de VMs.
- As VMs de workers do Dataflow usam uma conta de serviço de worker para acessar arquivos e outros recursos do seu pipeline. Essa conta de serviço precisa de acesso de gravação à tabela de saída do BigQuery. Ela também precisa de acesso a todos os outros recursos que o job do pipeline referencia.
Verifique se essas duas contas de serviço têm os papéis apropriados. Para mais informações, consulte Segurança e permissões do Dataflow.
Transformar os dados do BigQuery
O modelo do Kafka para o BigQuery cria um pipeline que lê eventos de um ou mais tópicos do Kafka e os grava em uma tabela do BigQuery. Também é possível fornecer uma função JavaScript definida pelo usuário (UDF, na sigla em inglês) que transforma os dados de eventos antes que eles sejam gravados no BigQuery.
A saída do pipeline precisa ser de dados no formato JSON que correspondam ao esquema da tabela de saída. Se os dados do evento do Kafka já estiverem no formato JSON, crie uma tabela do BigQuery com um esquema correspondente e transmita os eventos diretamente para o BigQuery. Caso contrário, crie uma UDF que use os dados de eventos como entrada e retorne dados JSON que correspondem à sua tabela do BigQuery.
Por exemplo, suponha que os dados do evento contenham dois campos:
name
(string)customer_id
(inteiro)
A saída do pipeline do Dataflow pode ser assim:
{ "name": "Alice", "customer_id": 1234 }
Supondo que os dados do evento ainda não estejam no formato JSON, grave um UDF que transforme os dados da seguinte maneira:
// UDF
function process(eventData) {
var name;
var customer_id;
// TODO Parse the event data to extract the name and customer_id fields.
// Return a JSON payload.
return JSON.stringify({ name: name, customer_id: customer_id });
}
A UDF pode executar outros processamentos nos dados de eventos, como filtrar eventos, remover informações de identificação pessoal (PII) ou enriquecer os dados com campos adicionais.
Para mais informações sobre como criar uma UDF para o modelo, consulte Estender seu modelo do Dataflow com UDFs. Fazer upload do arquivo JavaScript para o Cloud Storage.
Criar a tabela de saída do BigQuery
Crie a tabela de saída do BigQuery antes de executar o modelo. O esquema da tabela precisa ser compatível com a saída JSON do pipeline. O pipeline grava o valor na coluna da tabela do BigQuery com o mesmo nome para cada propriedade no payload JSON. Todas as propriedades ausentes no JSON são interpretadas como valores NULL.
Usando o exemplo anterior, a tabela do BigQuery teria as colunas a seguir:
Nome da coluna | Tipo de dados |
---|---|
name |
STRING |
customer_id |
INTEGER |
É possível usar a instrução SQL
CREATE TABLE
para criar a tabela:
CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);
Como alternativa, é possível especificar o esquema da tabela usando um arquivo de definição JSON. Para mais informações, consulte Como especificar um esquema na documentação do BigQuery.
Executar o job do Dataflow
Depois de criar a tabela do BigQuery, execute o modelo do Dataflow.
Console
Para criar o job do Dataflow usando o Console do Google Cloud, execute as seguintes etapas:
- Acesse a página do Dataflow no Console do Google Cloud.
- Clique em Criar job usando um modelo.
- No campo Nome do job, insira um nome do job.
- Em Endpoint regional, selecione uma região.
- Selecione o modelo "Kafka para BigQuery".
- Em Parâmetros obrigatórios, insira o nome da tabela de saída do BigQuery. A tabela já precisa existir e ter um esquema válido.
Clique em Mostrar parâmetros opcionais e insira valores para, pelo menos, os seguintes parâmetros:
- O tópico do Kafka a ser usado para ler a entrada.
- A lista de servidores de inicialização do Kafka, separados por vírgulas.
- Um e-mail da conta de serviço.
Insira parâmetros adicionais conforme necessário. Especificamente, pode ser necessário especificar o seguinte:
- Rede: para usar uma rede VPC diferente da rede padrão, especifique a rede e a sub-rede.
- UDF: para usar uma UDF em JavaScript, especifique o local do script do Cloud Storage e o nome da função JavaScript a ser invocada.
gcloud
Para criar o job do Dataflow usando a Google Cloud CLI, execute o seguinte comando:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters inputTopics=KAFKA_TOPICS \ --parameters bootstrapServers=BOOTSTRAP_SERVERS \ --parameters outputTableSpec=OUTPUT_TABLE \ --parameters serviceAccount=IAM_SERVICE_ACCOUNT \ --parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \ --parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \ --network VPC_NETWORK_NAME \ --subnetwork SUBNET_NAME
Substitua as seguintes variáveis:
- JOB_NAME. Um nome de job de sua escolha
- LOCATION. A região em que o job será executado. Para mais informações sobre regiões e locais, consulte Locais do Dataflow.
- KAFKA_TOPICS. Uma lista separada por vírgulas de tópicos do Kafka para ler.
- BOOTSTRAP_SERVERS Uma lista separada por vírgulas de servidores de inicialização
Kafka. Exemplo:
127:9092,127.0.0.1:9093
. - OUTPUT_TABLE. A tabela de saída do BigQuery, especificada como PROJECT_ID:DATASET_NAME.TABLE_NAME. Exemplo:
my_project:dataset1.table1
. - IAM_SERVICE_ACCOUNT: opcional. O endereço de e-mail da conta de serviço para executar o job.
- UDF_SCRIPT_PATH: opcional. O caminho do Cloud Storage para um arquivo JavaScript que contém uma UDF. Por exemplo:
gs://your-bucket/your-function.js
. - UDF_FUNCTION_NAME: opcional. O nome da função JavaScript a ser chamada como UDF.
- VPC_NETWORK_NAME: opcional. Rede à qual os workers serão atribuídos.
- SUBNET_NAME: opcional. A sub-rede a que os workers serão atribuídos.
Tipos de dados
Veja nesta seção como lidar com vários tipos de dados no esquema da tabela do BigQuery.
Internamente, as mensagens JSON são convertidas em objetos TableRow
, e os valores dos campos TableRow
são convertidos em tipos do BigQuery.
Tipos de escalar
O exemplo a seguir cria uma tabela do BigQuery com diferentes tipos de dados escalares, incluindo tipos de string, numéricos, booleanos, data/hora, intervalo e geográficos.
CREATE TABLE my_dataset.kafka_events (
string_col STRING,
integer_col INT64,
float_col FLOAT64,
decimal_col DECIMAL,
bool_col BOOL,
date_col DATE,
dt_col DATETIME,
ts_col TIMESTAMP,
interval_col INTERVAL,
geo_col GEOGRAPHY
);
Veja um payload JSON com campos compatíveis:
{
"string_col": "string_val",
"integer_col": 10,
"float_col": 3.142,
"decimal_col": 5.2E11,
"bool_col": true,
"date_col": "2022-07-01",
"dt_col": "2022-07-01 12:00:00.00",
"ts_col": "2022-07-01T12:00:00.00Z",
"interval_col": "0-13 370 48:61:61",
"geo_col": "POINT(1 2)"
}
Observações:
- Para uma coluna
TIMESTAMP
, use o métodoDate.toJSON
do JavaScript para formatar o valor. - Para a coluna
GEOGRAPHY
, é possível especificar a região geográfica usando texto conhecido (WKT, na sigla em inglês) ou GeoJSON formatado como uma string. Para mais informações, consulte Como carregar dados geoespaciais.
Para mais informações sobre os tipos de dados no BigQuery, consulte Tipos de dados.
Matrizes
É possível armazenar uma matriz no BigQuery usando o tipo de dados ARRAY
. No exemplo a seguir, o payload JSON contém uma propriedade denominada
scores
, em que o valor é uma matriz JSON:
{"name":"Emily","scores":[10,7,10,9]}
A instrução SQL CREATE TABLE
a seguir cria uma tabela do BigQuery com um esquema compatível:
CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);
A tabela terá esta aparência:
+-------+-------------+
| name | scores |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+
Estruturas
O tipo de dados STRUCT
no
BigQuery contém uma lista ordenada de campos nomeados. É possível usar um
STRUCT
para armazenar objetos JSON que seguem um esquema consistente.
No exemplo a seguir, o payload JSON contém uma propriedade denominada val
, em que
o valor é um objeto JSON:
{"name":"Emily","val":{"a":"yes","b":"no"}}
A instrução SQL CREATE TABLE
a seguir cria uma tabela do BigQuery com um esquema compatível:
CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);
A tabela terá esta aparência:
+-------+----------------------+
| name | val |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
Dados de eventos semiestruturados
Se os dados de eventos do Kafka não seguirem um esquema rigoroso, armazene-os no
BigQuery como um
tipo de dados JSON
(visualização). Ao armazenar dados JSON como um tipo de dados JSON
, não é necessário definir antecipadamente o esquema de evento. Após a ingestão de dados, é possível consultar a tabela de saída
usando os operadores de acesso a campo (notação por pontos) e de matriz.
Primeiro, crie uma tabela com uma coluna JSON
:
-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);
Em seguida, defina uma UDF em JavaScript que envolve o payload de eventos em um objeto JSON:
// UDF
function process(eventData) {
var json;
// TODO Convert the event data to JSON.
return JSON.stringify({ "event_data": json });
}
Depois que os dados forem gravados no BigQuery, use o operador de acesso a campos para consultar os campos individuais. Por exemplo, a consulta a seguir retorna o valor do campo name
de cada registro:
SELECT event_data.name FROM my_dataset1.kafka_events;
Para mais informações sobre como usar JSON no BigQuery, consulte Como trabalhar com dados JSON no SQL padrão do Google.
Erros e geração de registros
Podem ocorrer erros ao executar o pipeline ou ao processar eventos individuais do Kafka.
Para mais informações sobre como lidar com erros de pipeline, consulte Solução de problemas e depuração de pipelines.
Se o job for executado com sucesso, mas ocorrer um erro ao processar um evento individual do Kafka, o job do pipeline gravará um registro de erros em uma tabela no BigQuery. O job em si não falha, e o erro no nível do evento não aparece como um erro no registro de jobs do Dataflow.
O job do pipeline cria automaticamente a tabela para armazenar registros de erros. Por padrão, o nome da tabela é "output_table_error_records", em que
output_table é o nome da tabela de saída. Por exemplo, se a tabela de saída
for denominada kafka_events
, a tabela de erro será denominada kafka_events_error_records
.
É possível especificar um nome diferente definindo o parâmetro de modelo outputDeadletterTable
:
outputDeadletterTable=my_project:dataset1.errors_table
Possíveis erros incluem:
- Erros de serialização, incluindo JSON formatado incorretamente.
- Digite erros de conversão, causados por uma incompatibilidade no esquema da tabela e nos dados JSON.
- Campos extras nos dados JSON que não estão no esquema da tabela.
Exemplo de mensagens de erro:
Tipo de erro | Dados do evento | errorMessage |
---|---|---|
Erro de serialização | "Hello world" | Falha ao serializar json para a linha da tabela: "Hello world" |
Erro de conversão de tipo | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
Campo desconhecido | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
Próximas etapas
- Saiba mais sobre os modelos do Dataflow.
- Começar com o BigQuery