Modelo do Apache Kafka para Kafka

O modelo Apache Kafka para Apache Kafka cria um pipeline de streaming que ingere dados como bytes de uma origem do Apache Kafka e, em seguida, grava os bytes em um coletor do Apache Kafka.

Requisitos de pipeline

  • O tópico de origem do Apache Kafka precisa existir.
  • Os servidores do agente de origem e coletor do Apache Kafka precisam estar em execução e poder ser acessados nas máquinas de worker do Dataflow.
  • Se você estiver usando o Google Cloud Managed Service para Apache Kafka como origem ou coletor, o tópico precisará existir antes de iniciar o modelo.

Formato de mensagem do Kafka

As mensagens de origem do Apache Kafka são lidas como bytes, e os bytes são gravados no coletor do Apache Kafka.

Autenticação

O modelo Apache Kafka para Apache Kafka aceita a autenticação SASL/PLAIN e TLS nos agentes do Kafka.

Parâmetros do modelo

Parâmetros obrigatórios

  • readBootstrapServerAndTopic: o tópico do Kafka a ser usado para ler a entrada.
  • kafkaReadAuthenticationMode: o modo de autenticação a ser usado com o cluster do Kafka. Use NONE para nenhuma autenticação ou SASL_PLAIN para o nome de usuário e senha SASL/PLAIN e TLS para autenticação com base no certificado. O Apache Kafka para BigQuery só oferece suporte ao modo de autenticação SASL_PLAIN. O padrão é: SASL_PLAIN.
  • writeBootstrapServerAndTopic: tópico do Kafka em que a saída será gravada.
  • kafkaWriteAuthenticationMethod: o modo de autenticação a ser usado com o cluster do Kafka. Use NONE para nenhuma autenticação ou SASL_PLAIN para o nome de usuário e senha SASL/PLAIN e TLS para autenticação com base no certificado. O padrão é: NENHUM.

Parâmetros opcionais

  • enableCommitOffsets: deslocamentos de confirmação de mensagens processadas para o Kafka. Se ativado, isso minimizará as lacunas ou o processamento duplicado de mensagens ao reiniciar o pipeline. Exige especificar o ID do grupo de consumidores. O padrão é: falso.
  • consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esse pipeline pertence. Obrigatório se os deslocamentos de confirmação para Kafka estiverem ativados. O padrão é vazio.
  • kafkaReadOffset: o ponto de partida para ler mensagens quando não há deslocamentos confirmados. O mais antigo começa no início, o mais recente a partir da mensagem mais recente. O padrão é: mais recente.
  • kafkaReadUsernameSecretId: o ID do secret do Secret Manager do Google Cloud que contém o nome de usuário do Kafka a ser usado com a autenticação SASL_PLAIN. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
  • kafkaReadPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha do Kafka a ser usada com a autenticação SASL_PLAIN. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
  • kafkaReadKeystoreLocation: o caminho do Google Cloud Storage para o arquivo Java KeyStore (JKS) que contém o certificado TLS e a chave privada a serem usados na autenticação com o cluster do Kafka. Exemplo: gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka.
  • kafkaReadTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha a ser usada para acessar o arquivo Java TrustStore (JKS) para autenticação TLS do Kafka (exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeystorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha usada para acessar a chave privada no arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteUsernameSecretId: o ID do secret do Secret Manager do Google Cloud que contém o nome de usuário do Kafka para autenticação SASL_PLAIN com o cluster do Kafka de destino. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
  • kafkaWritePasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha do Kafka a ser usada para a autenticação SASL_PLAIN com o cluster do Kafka de destino. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
  • kafkaWriteKeystoreLocation: o caminho do Google Cloud Storage para o arquivo Java KeyStore (JKS) que contém o certificado TLS e a chave privada para autenticação com o cluster Kafka de destino. (Exemplo: gs://
  • kafkaWriteTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka de destino.
  • kafkaWriteTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java TrustStore (JKS) para autenticação TLS com o cluster do Kafka de destino. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeystorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha para acessar o arquivo Java KeyStore (JKS) a ser usado para a autenticação TLS com o cluster Kafka de destino. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeyPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha usada para acessar a chave privada no arquivo Java KeyStore (JKS) para autenticação TLS com o cluster de destino do Kafka. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Kafka to Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
  8. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • BIGQUERY_TABLE: nome da tabela do Cloud Storage
  • KAFKA_TOPICS: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, use vírgulas. Consulte gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número da porta que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, use vírgulas para separá-los. Consulte gcloud topic escaping.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • BIGQUERY_TABLE: nome da tabela do Cloud Storage
  • KAFKA_TOPICS: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, use vírgulas. Consulte gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número da porta que o servidor pode acessar. Por exemplo, 35.70.252.199:9092. Se vários endereços forem fornecidos, use vírgulas para separá-los. Consulte gcloud topic escaping.

Para mais informações, consulte Gravar dados do Kafka no Cloud Storage com Dataflow.

A seguir