Modelo do Apache Kafka para BigQuery

O modelo do Apache Kafka para BigQuery é um pipeline de streaming que ingere dados de texto do Apache Kafka, executa uma função definida pelo usuário (UDF) e gera os registros resultantes no BigQuery. Qualquer erro que ocorrer na transformação dos dados, na execução da UDF ou na inserção na tabela de respostas será inserido em uma tabela de erros separada no BigQuery. Se a tabela de erros não existir antes da execução, ela será criada.

Requisitos de pipeline

  • A tabela de respostas do BigQuery precisa existir.
  • O servidor do agente do Apache Kafka precisa estar em execução e acessível em máquinas de trabalho do Dataflow.
  • Os tópicos do Apache Kafka precisam existir, e as mensagens precisam estar codificadas em um formato JSON válido.

Parâmetros do modelo

Parâmetro Descrição
outputTableSpec O local da tabela de respostas do BigQuery para gravar as mensagens do Apache Kafka, no formato de my-project:dataset.table
inputTopics Os tópicos de entrada do Apache Kafka para leitura em uma lista separada por vírgulas. Exemplo: messages
bootstrapServers O endereço do host dos servidores do agente do Apache Kafka em execução em uma lista separada por vírgulas, cada endereço de host no formato 35.70.252.199:9092
javascriptTextTransformGcsPath Opcional: O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName Opcional: O nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.
javascriptTextTransformReloadIntervalMinutes Opcional: especifica a frequência de recarregamento da UDF em minutos. Se o valor for maior que 0, o Dataflow verificará periodicamente o arquivo da UDF no Cloud Storage e atualizará a UDF se o arquivo for modificado. Esse parâmetro permite atualizar a UDF enquanto o pipeline está em execução, sem precisar reiniciar o job. Se o valor for 0, o recarregamento da UDF será desativado. O valor padrão é 0.
outputDeadletterTable Opcional: a tabela do BigQuery para mensagens que não alcançaram a tabela de saída, no formato de my-project:dataset.my-deadletter-table. Se não existir, a tabela será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.
useStorageWriteApi Opcional: Se true, o pipeline usa a API BigQuery Storage Write. O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write.
useStorageWriteApiAtLeastOnce Opcional: Ao usar a API Storage Write, especifica a semântica de gravação. Para usar semântica pelo menos uma vez, defina esse parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
numStorageWriteApiStreams Opcional: Ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.
storageWriteApiTriggeringFrequencySec Opcional: Ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.

Função definida pelo usuário

Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: o valor de registro Kafka, serializado como uma string JSON.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Kafka to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
  8. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • KAFKA_TOPICS: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, siga as instruções sobre como evitar vírgulas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número de portas que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, siga instruções sobre como evitar vírgulas.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • KAFKA_TOPICS: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, siga as instruções sobre como evitar vírgulas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número de portas que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, siga instruções sobre como evitar vírgulas.

Para mais informações, consulte Gravar dados do Kafka no BigQuery com o Dataflow.

A seguir