Gravar dados do Kafka no BigQuery com o Dataflow

Neste documento, forneceremos orientações detalhadas sobre como criar e implantar um pipeline do Dataflow que é transmitido do Apache Kafka para o BigQuery.

O Apache Kafka é uma plataforma de código aberto para eventos de streaming. O Kafka é usado com frequência em arquiteturas distribuídas para permitir a comunicação entre componentes acoplados com flexibilidade. É possível usá-lo para ler eventos do Kafka, processá-los e gravar os resultados em uma tabela do BigQuery para uma análise mais aprofundada.

Como ler eventos do Kafka no BigQuery

O Google fornece um modelo do Dataflow que configura um pipeline do Kafka para o BigQuery. O modelo usa o conector do BigQueryIO fornecido no SDK do Apache Beam.

Para usar esse modelo, siga estas etapas:

  1. Implante o Kafka no Google Cloud ou em outro lugar.
  2. Configure a rede.
  3. Defina permissões do Identity and Access Management (IAM).
  4. Escreva uma função para transformar os dados do evento.
  5. Crie a tabela de saída do BigQuery.
  6. Implante o modelo do Dataflow.

Implantar o Kafka

No Google Cloud, é possível implantar um cluster do Kafka em instâncias de máquina virtual (VM) do Compute Engine ou usar um serviço de terceiros do Kafka gerenciado. Para saber mais sobre opções de implantação no Google Cloud, consulte O que é o Apache Kafka?. Encontre soluções de terceiros do Kafka no Google Cloud Marketplace.

Você também pode ter um cluster atual do Kafka que reside fora do Google Cloud. Por exemplo, é possível ter uma carga de trabalho atual implantada no local ou em outra nuvem pública.

Configurar rede

Por padrão, o Dataflow inicia instâncias na rede padrão de nuvem privada virtual (VPC). Dependendo da configuração do Kafka, talvez seja necessário configurar uma rede e uma sub-rede diferentes para o Dataflow. Para mais informações, consulte Especificar uma rede e uma sub-rede. Ao configurar a rede, crie regras de firewall que permitam que as máquinas de worker do Dataflow alcancem os agentes do Kafka.

Se você estiver usando o VPC Service Controls, coloque o cluster do Kafka dentro do perímetro do VPC Service Controls ou estenda os perímetros para a VPN autorizada ou o Cloud Interconnect.

Se o cluster do Kafka for implantado fora do Google Cloud, crie uma conexão de rede entre o Dataflow e o cluster do Kafka. Existem várias opções de rede com diferentes vantagens e desvantagens:

A Interconexão dedicada é a melhor opção para desempenho e confiabilidade previsíveis, mas pode levar mais tempo para configurar porque os novos circuitos precisam ser provisionados por terceiros. Com uma topologia baseada em IP público e pouco trabalho em rede, é possível dar os primeiros passos rapidamente.

As próximas duas seções descrevem essas opções em mais detalhes.

Espaço de endereço do RFC 1918 compartilhado

A Interconexão dedicada e a VPN IPsec oferecem acesso direto aos endereços IP RFC 1918 na sua nuvem privada virtual (VPC), o que pode simplificar a configuração do Kafka. Se você estiver usando uma topologia baseada em VPN, configure uma VPN de alta capacidade.

Por padrão, o Dataflow inicia instâncias na rede VPC padrão. Em uma topologia de rede privada com rotas explicitamente definidas no Cloud Router, que conectam sub-redes do Google Cloud ao cluster do Kafka, você precisa de mais controle sobre onde localizar as instâncias do Dataflow. É possível usar o Dataflow para configurar os parâmetros de execução de network e subnetwork.

Verifique se a sub-rede correspondente tem endereços IP suficientes disponíveis para que o Dataflow possa iniciar instâncias enquanto tenta escalonar horizontalmente. Além disso, ao criar uma rede separada para iniciar as instâncias do Dataflow, verifique se você tem uma regra de firewall que ativa o tráfego TCP entre todas as máquinas virtuais no projeto. A rede padrão já tem essa regra de firewall configurada.

Espaço de endereço IP público

Essa arquitetura usa o Transport Layer Security (TLS, na sigla em inglês) para proteger o tráfego entre clientes externos e o Kafka e usa tráfego não criptografado para comunicação entre agentes. Quando o listener do Kafka se vincula a uma interface de rede usada para comunicação interna e externa, a configuração dele é simples. No entanto, em muitos cenários, os endereços anunciados externamente dos agentes do Kafka no cluster são diferentes das interfaces de rede internas usadas pelo Kafka. Nesses cenários, é possível usar a propriedade advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Os clientes externos se conectam pela porta 9093 por um canal "SSL", e os clientes internos se conectam pela porta 9092 por um canal de texto simples. Ao especificar um endereço em advertised.listeners, use nomes de DNS (kafkabroker-n.mydomain.com, nesta amostra) que resolvem para a mesma instância para tráfego externo e interno. O uso de endereços IP públicos pode não funcionar porque os endereços podem apresentar falha ao resolver o tráfego interno.

Definir permissões do IAM

Os jobs do Dataflow usam duas contas de serviço do IAM:

  • O serviço do Dataflow usa uma conta de serviço do Dataflow para manipular recursos do Google Cloud, como a criação de VMs.
  • As VMs de workers do Dataflow usam uma conta de serviço de worker para acessar arquivos e outros recursos do seu pipeline. Essa conta de serviço precisa de acesso de gravação à tabela de saída do BigQuery. Ela também precisa de acesso a todos os outros recursos que o job do pipeline referencia.

Verifique se essas duas contas de serviço têm os papéis apropriados. Para mais informações, consulte Segurança e permissões do Dataflow.

Transformar os dados do BigQuery

O modelo do Kafka para o BigQuery cria um pipeline que lê eventos de um ou mais tópicos do Kafka e os grava em uma tabela do BigQuery. Também é possível fornecer uma função JavaScript definida pelo usuário (UDF, na sigla em inglês) que transforma os dados de eventos antes que eles sejam gravados no BigQuery.

A saída do pipeline precisa ser de dados no formato JSON que correspondam ao esquema da tabela de saída. Se os dados do evento do Kafka já estiverem no formato JSON, crie uma tabela do BigQuery com um esquema correspondente e transmita os eventos diretamente para o BigQuery. Caso contrário, crie uma UDF que use os dados de eventos como entrada e retorne dados JSON que correspondem à sua tabela do BigQuery.

Por exemplo, suponha que os dados do evento contenham dois campos:

  • name (string)
  • customer_id (inteiro)

A saída do pipeline do Dataflow pode ser assim:

{ "name": "Alice", "customer_id": 1234 }

Supondo que os dados do evento ainda não estejam no formato JSON, grave um UDF que transforme os dados da seguinte maneira:

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}

A UDF pode executar outros processamentos nos dados de eventos, como filtrar eventos, remover informações de identificação pessoal (PII) ou enriquecer os dados com campos adicionais.

Para mais informações sobre como criar uma UDF para o modelo, consulte Estender seu modelo do Dataflow com UDFs. Fazer upload do arquivo JavaScript para o Cloud Storage.

Criar a tabela de saída do BigQuery

Crie a tabela de saída do BigQuery antes de executar o modelo. O esquema da tabela precisa ser compatível com a saída JSON do pipeline. O pipeline grava o valor na coluna da tabela do BigQuery com o mesmo nome para cada propriedade no payload JSON. Todas as propriedades ausentes no JSON são interpretadas como valores NULL.

Usando o exemplo anterior, a tabela do BigQuery teria as colunas a seguir:

Nome da coluna Tipo de dados
name STRING
customer_id INTEGER

É possível usar a instrução SQL CREATE TABLE para criar a tabela:

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

Como alternativa, é possível especificar o esquema da tabela usando um arquivo de definição JSON. Para mais informações, consulte Como especificar um esquema na documentação do BigQuery.

Executar o job do Dataflow

Depois de criar a tabela do BigQuery, execute o modelo do Dataflow.

Console

Para criar o job do Dataflow usando o Console do Google Cloud, execute as seguintes etapas:

  1. Acesse a página do Dataflow no Console do Google Cloud.
  2. Clique em Criar job usando um modelo.
  3. No campo Nome do job, insira um nome do job.
  4. Em Endpoint regional, selecione uma região.
  5. Selecione o modelo "Kafka para BigQuery".
  6. Em Parâmetros obrigatórios, insira o nome da tabela de saída do BigQuery. A tabela já precisa existir e ter um esquema válido.
  7. Clique em Mostrar parâmetros opcionais e insira valores para, pelo menos, os seguintes parâmetros:

    • O tópico do Kafka a ser usado para ler a entrada.
    • A lista de servidores de inicialização do Kafka, separados por vírgulas.
    • Um e-mail da conta de serviço.

    Insira parâmetros adicionais conforme necessário. Especificamente, pode ser necessário especificar o seguinte:

    • Rede: para usar uma rede VPC diferente da rede padrão, especifique a rede e a sub-rede.
    • UDF: para usar uma UDF em JavaScript, especifique o local do script do Cloud Storage e o nome da função JavaScript a ser invocada.

gcloud

Para criar o job do Dataflow usando a Google Cloud CLI, execute o seguinte comando:

gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters inputTopics=KAFKA_TOPICS \
--parameters bootstrapServers=BOOTSTRAP_SERVERS \
--parameters outputTableSpec=OUTPUT_TABLE \
--parameters serviceAccount=IAM_SERVICE_ACCOUNT \
--parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
--parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
--network VPC_NETWORK_NAME \
--subnetwork SUBNET_NAME

Substitua as seguintes variáveis:

  • JOB_NAME. Um nome de job de sua escolha
  • LOCATION. A região em que o job será executado. Para mais informações sobre regiões e locais, consulte Locais do Dataflow.
  • KAFKA_TOPICS. Uma lista separada por vírgulas de tópicos do Kafka para ler.
  • BOOTSTRAP_SERVERS Uma lista separada por vírgulas de servidores de inicialização Kafka. Exemplo: 127:9092,127.0.0.1:9093.
  • OUTPUT_TABLE. A tabela de saída do BigQuery, especificada como PROJECT_ID:DATASET_NAME.TABLE_NAME. Exemplo: my_project:dataset1.table1.
  • IAM_SERVICE_ACCOUNT: opcional. O endereço de e-mail da conta de serviço para executar o job.
  • UDF_SCRIPT_PATH: opcional. O caminho do Cloud Storage para um arquivo JavaScript que contém uma UDF. Por exemplo: gs://your-bucket/your-function.js.
  • UDF_FUNCTION_NAME: opcional. O nome da função JavaScript a ser chamada como UDF.
  • VPC_NETWORK_NAME: opcional. Rede à qual os workers serão atribuídos.
  • SUBNET_NAME: opcional. A sub-rede a que os workers serão atribuídos.

Tipos de dados

Veja nesta seção como lidar com vários tipos de dados no esquema da tabela do BigQuery.

Internamente, as mensagens JSON são convertidas em objetos TableRow, e os valores dos campos TableRow são convertidos em tipos do BigQuery.

Tipos de escalar

O exemplo a seguir cria uma tabela do BigQuery com diferentes tipos de dados escalares, incluindo tipos de string, numéricos, booleanos, data/hora, intervalo e geográficos.

CREATE TABLE  my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

Veja um payload JSON com campos compatíveis:

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

Observações:

  • Para uma coluna TIMESTAMP, use o método Date.toJSON do JavaScript para formatar o valor.
  • Para a coluna GEOGRAPHY, é possível especificar a região geográfica usando texto conhecido (WKT, na sigla em inglês) ou GeoJSON formatado como uma string. Para mais informações, consulte Como carregar dados geoespaciais.

Para mais informações sobre os tipos de dados no BigQuery, consulte Tipos de dados.

Matrizes

É possível armazenar uma matriz no BigQuery usando o tipo de dados ARRAY. No exemplo a seguir, o payload JSON contém uma propriedade denominada scores, em que o valor é uma matriz JSON:

{"name":"Emily","scores":[10,7,10,9]}

A instrução SQL CREATE TABLE a seguir cria uma tabela do BigQuery com um esquema compatível:

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

A tabela terá esta aparência:

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+

Estruturas

O tipo de dados STRUCT no BigQuery contém uma lista ordenada de campos nomeados. É possível usar um STRUCT para armazenar objetos JSON que seguem um esquema consistente.

No exemplo a seguir, o payload JSON contém uma propriedade denominada val, em que o valor é um objeto JSON:

{"name":"Emily","val":{"a":"yes","b":"no"}}

A instrução SQL CREATE TABLE a seguir cria uma tabela do BigQuery com um esquema compatível:

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

A tabela terá esta aparência:

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Dados de eventos semiestruturados

Se os dados de eventos do Kafka não seguirem um esquema rigoroso, armazene-os no BigQuery como um tipo de dados JSON (visualização). Ao armazenar dados JSON como um tipo de dados JSON, não é necessário definir antecipadamente o esquema de evento. Após a ingestão de dados, é possível consultar a tabela de saída usando os operadores de acesso a campo (notação por pontos) e de matriz.

Primeiro, crie uma tabela com uma coluna JSON:

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

Em seguida, defina uma UDF em JavaScript que envolve o payload de eventos em um objeto JSON:

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}

Depois que os dados forem gravados no BigQuery, use o operador de acesso a campos para consultar os campos individuais. Por exemplo, a consulta a seguir retorna o valor do campo name de cada registro:

SELECT event_data.name FROM my_dataset1.kafka_events;

Para mais informações sobre como usar JSON no BigQuery, consulte Como trabalhar com dados JSON no SQL padrão do Google.

Erros e geração de registros

Podem ocorrer erros ao executar o pipeline ou ao processar eventos individuais do Kafka.

Para mais informações sobre como lidar com erros de pipeline, consulte Solução de problemas e depuração de pipelines.

Se o job for executado com sucesso, mas ocorrer um erro ao processar um evento individual do Kafka, o job do pipeline gravará um registro de erros em uma tabela no BigQuery. O job em si não falha, e o erro no nível do evento não aparece como um erro no registro de jobs do Dataflow.

O job do pipeline cria automaticamente a tabela para armazenar registros de erros. Por padrão, o nome da tabela é "output_table_error_records", em que output_table é o nome da tabela de saída. Por exemplo, se a tabela de saída for denominada kafka_events, a tabela de erro será denominada kafka_events_error_records. É possível especificar um nome diferente definindo o parâmetro de modelo outputDeadletterTable:

outputDeadletterTable=my_project:dataset1.errors_table

Possíveis erros incluem:

  • Erros de serialização, incluindo JSON formatado incorretamente.
  • Digite erros de conversão, causados por uma incompatibilidade no esquema da tabela e nos dados JSON.
  • Campos extras nos dados JSON que não estão no esquema da tabela.

Exemplo de mensagens de erro:

Tipo de erro Dados do evento errorMessage
Erro de serialização "Hello world" Falha ao serializar json para a linha da tabela: "Hello world"
Erro de conversão de tipo {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Campo desconhecido {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Próximas etapas