Modelo do Apache Kafka para BigQuery

O modelo Apache Kafka para BigQuery é um pipeline de streaming que ingere dados de texto de clusters do Apache Kafka para BigQuery e gera os registros resultantes em tabelas do BigQuery. Todos os erros que ocorrem ao inserir dados na tabela de saída são inseridos em uma tabela de erros separada no BigQuery.

Também é possível usar o modelo Apache Kafka para BigQuery com o Kafka autogerenciado ou externo.

Requisitos de pipeline

  • O servidor do agente do Apache Kafka precisa estar em execução e acessível em máquinas de trabalho do Dataflow.
  • Os tópicos do Apache Kafka precisam existir.
  • Ative as APIs Dataflow, BigQuery e Cloud Storage. Se a autenticação é necessária, você também precisa ativar a API Secret Manager.
  • Criar um conjunto de dados e uma tabela do BigQuery com o esquema apropriado para seu tópico de entrada do Kafka Se você estiver usando vários esquemas no mesmo tópico e quiser gravar em várias tabelas, não será necessário criar a tabela antes de configurar o pipeline.
  • Quando a fila de mensagens inativas (mensagens não processadas) do modelo estiver ativada, crie uma tabela vazia que não tenha um esquema para a fila.

Formato de mensagem do Kafka

O modelo do Apache Kafka para BigQuery oferece suporte à leitura de mensagens do Kafka nos seguintes formatos: CONFLUENT_AVRO_WIRE_FORMAT, AVRO_BINARY_FORMAT e JSON.

Autenticação

O modelo Apache Kafka para BigQuery oferece suporte à autenticação SASL/PLAIN para agentes Kafka.

Parâmetros do modelo

Parâmetros obrigatórios

  • readBootstrapServerAndTopic: o tópico do Kafka a ser usado para ler a entrada.
  • kafkaReadAuthenticationMode: o modo de autenticação a ser usado com o cluster do Kafka. Use NONE para nenhuma autenticação ou SASL_PLAIN para o nome de usuário e senha SASL/PLAIN. O Apache Kafka para BigQuery só oferece suporte ao modo de autenticação SASL_PLAIN. O padrão é: SASL_PLAIN.
  • writeMode: modo de gravação: grava registros em uma ou várias tabelas (com base no esquema). O modo DYNAMIC_TABLE_NAMES é compatível apenas com o formato de mensagem de origem AVRO_CONFLUENT_WIRE_FORMAT e a origem do esquema SCHEMA_REGISTRY. O nome da tabela de destino será gerado automaticamente com base no nome do esquema Avro de cada mensagem. Ele pode ser um único esquema (criando uma única tabela) ou vários esquemas (criando várias tabelas). O modo SINGLE_TABLE_NAME grava em uma única tabela (esquema único) especificada pelo usuário. O padrão é SINGLE_TABLE_NAME.
  • useBigQueryDLQ: se verdadeiro, as mensagens com falha serão gravadas no BigQuery com informações extras sobre o erro. A tabela de mensagens inativas precisa ser criada sem esquema. O padrão é: falso.
  • messageFormat: o formato das mensagens do Kafka a serem lidas. Os valores aceitos são AVRO_CONFLUENT_WIRE_FORMAT (Avro codificado do Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro binário simples) e JSON. O padrão é: AVRO_CONFLUENT_WIRE_FORMAT.

Parâmetros opcionais

  • outputTableSpec: o local da tabela do BigQuery em que a saída será gravada. O nome precisa estar no formato <project>:<dataset>.<table_name>. O esquema da tabela precisa corresponder aos objetos de entrada.
  • persistKafkaKey: se verdadeiro, o pipeline manterá a chave da mensagem do Kafka na tabela do BigQuery, em um campo _key do tipo BYTES. O padrão é falso (a chave é ignorada).
  • outputProject: projeto de saída do BigQuery em que o conjunto de dados reside. As tabelas serão criadas dinamicamente no conjunto de dados. O padrão é vazio.
  • outputDataset: conjunto de dados de saída do BigQuery para gravar a saída. As tabelas serão criadas dinamicamente no conjunto de dados. Se as tabelas forem criadas de antemão, os nomes das tabelas devem seguir a convenção de nomenclatura especificada. O nome precisa ser bqTableNamePrefix + Avro Schema FullName , e cada palavra será separada por um hífen "-". O padrão é vazio.
  • bqTableNamePrefix: prefixo de nomenclatura a ser usado na criação de tabelas de saída do BigQuery. Aplicável apenas ao usar o registro de esquemas. O padrão é vazio.
  • createDisposition : CreateDisposition do BigQuery. Por exemplo, CREATE_IF_NEEDED, CREATE_NEVER. O valor padrão é: CREATE_IF_NEEDED.
  • writeDisposition: WriteDisposition do BigQuery. Por exemplo, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. O valor padrão é: WRITE_APPEND.
  • useAutoSharding: se verdadeiro, o pipeline usa a fragmentação automática ao gravar no BigQuery. O valor padrão é true.
  • numStorageWriteApiStreams: especifica o número de streams de gravação. Esse parâmetro precisa ser definido. O padrão é 0.
  • storageWriteApiTriggeringFrequencySec: especifica a frequência de acionamento em segundos. Esse parâmetro precisa ser definido. O padrão é 5 segundos.
  • useStorageWriteApiAtLeastOnce : esse parâmetro só entra em vigor se a opção "Usar a API BigQuery Storage Write" está ativada. Se ativada, a semântica do tipo "pelo menos uma vez" será usada para a API Storage Write. Caso contrário, a semântica "exatamente uma" será usada. O padrão é: falso.
  • outputDeadletterTable: tabela do BigQuery para mensagens com falha. As mensagens não chegam à tabela de saída por diferentes motivos (por exemplo, esquema incompatível, json incorreto) são gravadas nesta tabela. (Exemplo: id-do-projeto:seu-conjunto-de-dados.nome-da-tabela).
  • enableCommitOffsets: deslocamentos de confirmação de mensagens processadas para o Kafka. Se ativado, isso minimizará as lacunas ou o processamento duplicado de mensagens ao reiniciar o pipeline. Exige especificar o ID do grupo de consumidores. O padrão é: falso.
  • consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esse pipeline pertence. Obrigatório se os deslocamentos de confirmação para Kafka estiverem ativados. O padrão é vazio.
  • kafkaReadOffset: o ponto de partida para ler mensagens quando não há deslocamentos confirmados. O mais antigo começa no início, o mais recente a partir da mensagem mais recente. O padrão é: mais recente.
  • kafkaReadUsernameSecretId: o ID do secret do Secret Manager do Google Cloud que contém o nome de usuário do Kafka a ser usado com a autenticação SASL_PLAIN. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
  • kafkaReadPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha do Kafka a ser usada com a autenticação SASL_PLAIN. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
  • schemaFormat: o formato de esquema do Kafka. Pode ser fornecido como SINGLE_SCHEMA_FILE ou SCHEMA_REGISTRY. Se SINGLE_SCHEMA_FILE for especificado, todas as mensagens vão precisar ter o esquema mencionado no arquivo de esquema avro. Se SCHEMA_REGISTRY for especificado, as mensagens poderão ter um único esquema ou vários esquemas. O padrão é SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath: o caminho do Google Cloud Storage para o único arquivo de esquema Avro usado para decodificar todas as mensagens em um tópico. O padrão é vazio.
  • schemaRegistryConnectionUrl: o URL da instância do Confluent Schema Registry usada para gerenciar esquemas Avro para decodificação de mensagens. O padrão é vazio.
  • binaryAvroSchemaPath: o caminho do Google Cloud Storage para o arquivo de esquema do Avro usado para decodificar mensagens Avro codificadas em binário. O padrão é vazio.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Kafka to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
  8. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • KAFKA_TOPICS: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, use vírgulas. Consulte gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa do número da porta que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, use vírgulas para separá-los. Consulte gcloud topic escaping.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • KAFKA_TOPICS: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, use vírgulas. Consulte gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa do número da porta que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, use vírgulas para separá-los. Consulte gcloud topic escaping.

Para mais informações, consulte Gravar dados do Kafka no BigQuery com o Dataflow.

A seguir