Modelo Apache Kafka para Kafka

O modelo Apache Kafka para Apache Kafka cria um pipeline de streaming que carrega dados como bytes a partir de uma origem do Apache Kafka e, em seguida, escreve os bytes num destino do Apache Kafka.

Requisitos do pipeline

  • O tópico de origem do Apache Kafka tem de existir.
  • Os servidores de agente de origem e destino do Apache Kafka têm de estar em execução e acessíveis a partir das máquinas de trabalho do Dataflow.
  • Se estiver a usar o Google Cloud Managed Service para Apache Kafka como origem ou destino, o tópico tem de existir antes de iniciar o modelo.

Formato de mensagem do Kafka

As mensagens de origem do Apache Kafka são lidas como bytes e os bytes são escritos no destino do Apache Kafka.

Autenticação

O modelo Apache Kafka para Apache Kafka suporta a autenticação SASL/PLAIN e TLS para agentes Kafka.

Parâmetros de modelos

Parâmetros obrigatórios

  • readBootstrapServerAndTopic: servidor de arranque e tópico do Kafka para ler a entrada. Por exemplo, localhost:9092;topic1,topic2.
  • kafkaReadAuthenticationMode: o modo de autenticação a usar com o cluster Kafka. Use KafkaAuthenticationMethod.NONE para sem autenticação, KafkaAuthenticationMethod.SASL_PLAIN para nome de utilizador e palavra-passe SASL/PLAIN, KafkaAuthenticationMethod.SASL_SCRAM_512 para autenticação SASL_SCRAM_512 e KafkaAuthenticationMethod.TLS para autenticação baseada em certificados. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS deve ser usado apenas para o cluster do Google Cloud Apache Kafka para BigQuery. Permite a autenticação através das credenciais predefinidas da aplicação.
  • writeBootstrapServerAndTopic: tópico do Kafka para o qual escrever o resultado.
  • kafkaWriteAuthenticationMethod: o modo de autenticação a usar com o cluster Kafka. Use NONE para nenhuma autenticação, SASL_PLAIN para nome de utilizador e palavra-passe SASL/PLAIN, SASL_SCRAM_512 para autenticação baseada em SASL_SCRAM_512 e TLS para autenticação baseada em certificado. A predefinição é: APPLICATION_DEFAULT_CREDENTIALS.

Parâmetros opcionais

  • enableCommitOffsets: confirma os desvios das mensagens processadas no Kafka. Se estiver ativada, esta opção minimiza as lacunas ou o processamento duplicado de mensagens quando reinicia o pipeline. Requer a especificação do ID do grupo de consumidores. A predefinição é: false.
  • consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esta conduta pertence. Obrigatório se a opção Commit Offsets to Kafka estiver ativada. A predefinição é vazio.
  • kafkaReadOffset: o ponto de partida para ler mensagens quando não existem deslocamentos comprometidos. A mais antiga começa desde o início e a mais recente começa a partir da mensagem mais recente. A predefinição é: latest.
  • kafkaReadUsernameSecretId: o ID secreto do Google Cloud Secret Manager que contém o nome de utilizador do Kafka a usar com a autenticação SASL_PLAIN. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. A predefinição é vazio.
  • kafkaReadPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe do Kafka a usar com a autenticação SASL_PLAIN. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. A predefinição é vazio.
  • kafkaReadKeystoreLocation: o caminho do Google Cloud Storage para o ficheiro Java KeyStore (JKS) que contém o certificado TLS e a chave privada a usar na autenticação com o cluster Kafka. Por exemplo, gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: o caminho do Google Cloud Storage para o ficheiro Java TrustStore (JKS) que contém os certificados fidedignos a usar para validar a identidade do agente Kafka.
  • kafkaReadTruststorePasswordSecretId: o ID do segredo do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java TrustStore (JKS) para autenticação TLS do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java KeyStore (JKS) para a autenticação TLS do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder à chave privada no ficheiro Java KeyStore (JKS) para autenticação TLS do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramUsernameSecretId: o ID secreto do Google Cloud Secret Manager que contém o nome de utilizador do Kafka a usar com a autenticação SASL_SCRAM. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe do Kafka a usar com a autenticação SASL_SCRAM. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramTruststoreLocation: o caminho do Google Cloud Storage para o ficheiro Java TrustStore (JKS) que contém os certificados fidedignos a usar para validar a identidade do agente Kafka.
  • kafkaReadSaslScramTruststorePasswordSecretId: o ID do segredo do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java TrustStore (JKS) para autenticação SASL_SCRAM do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteUsernameSecretId: o ID secreto do Google Cloud Secret Manager que contém o nome de utilizador do Kafka para autenticação SASL_PLAIN com o cluster Kafka de destino. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. A predefinição é vazio.
  • kafkaWritePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe do Kafka a usar para a autenticação SASL_PLAIN com o cluster Kafka de destino. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. A predefinição é vazio.
  • kafkaWriteKeystoreLocation: o caminho do Google Cloud Storage para o ficheiro Java KeyStore (JKS) que contém o certificado TLS e a chave privada para autenticação com o cluster Kafka de destino. Por exemplo, gs://<BUCKET>/<KEYSTORE>.jks.
  • kafkaWriteTruststoreLocation: o caminho do Google Cloud Storage para o ficheiro Java TrustStore (JKS) que contém os certificados fidedignos a usar para validar a identidade do agente Kafka de destino.
  • kafkaWriteTruststorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java TrustStore (JKS) para autenticação TLS com o cluster Kafka de destino. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeystorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe para aceder ao ficheiro Java KeyStore (JKS) a usar para a autenticação TLS com o cluster Kafka de destino. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeyPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder à chave privada no ficheiro Java KeyStore (JKS) para autenticação TLS com o cluster Kafka de destino. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Kafka to Cloud Storage template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Opcional: para mudar do processamento exatamente uma vez para o modo de streaming pelo menos uma vez, selecione Pelo menos uma vez.
  8. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \
    --parameters \
readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • READ_BOOTSTRAP_SERVER_AND_TOPIC: o endereço do servidor de arranque do Apache Kafka e o tópico a partir do qual ler
  • WRITE_BOOTSTRAP_SERVER_AND_TOPIC: o endereço do servidor de arranque do Apache Kafka e o tópico para o qual escrever

    O formato do endereço do servidor de arranque e do tópico depende do tipo de cluster:

    • Cluster do Managed Service for Apache Kafka: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Cluster Kafka externo: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka",
   }
}
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • READ_BOOTSTRAP_SERVER_AND_TOPIC: o endereço do servidor de arranque do Apache Kafka e o tópico a partir do qual ler
  • WRITE_BOOTSTRAP_SERVER_AND_TOPIC: o endereço do servidor de arranque do Apache Kafka e o tópico para o qual escrever

    O formato do endereço do servidor de arranque e do tópico depende do tipo de cluster:

    • Cluster do Managed Service for Apache Kafka: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Cluster Kafka externo: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME

O que se segue?