Modelo do Apache Kafka para o Cloud Storage

O modelo Apache Kafka para Cloud Storage é um pipeline de streaming que carrega dados de texto do Google Cloud Managed Service para Apache Kafka e envia os registos para o Cloud Storage.

Também pode usar o modelo Apache Kafka para Cloud Storage com o Kafka autogerido ou externo.

Requisitos do pipeline

  • O contentor do Cloud Storage de saída tem de existir.
  • O servidor do agente do Apache Kafka tem de estar em execução e acessível a partir das máquinas de trabalho do Dataflow.
  • Os tópicos do Apache Kafka têm de existir.

Formato de mensagem do Kafka

Este modelo suporta a leitura de mensagens do Kafka nos seguintes formatos:

Formato JSON

Para ler mensagens JSON, defina o parâmetro de modelo messageFormat como "JSON".

Codificação binária Avro

Para ler mensagens Avro binárias, defina os seguintes parâmetros de modelo:

  • messageFormat: "AVRO_BINARY_ENCODING".
  • binaryAvroSchemaPath: a localização de um ficheiro de esquema Avro no Cloud Storage. Exemplo: gs://BUCKET_NAME/message-schema.avsc.

Para mais informações sobre o formato binário Avro, consulte a secção Codificação binária na documentação do Apache Avro.

Avro codificado no registo de esquemas da Confluent

Para ler mensagens em Avro codificado no Confluent Schema Registry, defina os seguintes parâmetros do modelo:

  • messageFormat: "AVRO_CONFLUENT_WIRE_FORMAT".

  • schemaFormat: um dos seguintes valores:
    • "SINGLE_SCHEMA_FILE": o esquema de mensagens está definido num ficheiro de esquema Avro. Especifique a localização do ficheiro de esquema no Cloud Storage no parâmetro confluentAvroSchemaPath.
    • "SCHEMA_REGISTRY": as mensagens são codificadas através do Confluent Schema Registry. Especifique o URL da instância do Confluent Schema Registry no parâmetro schemaRegistryConnectionUrl e especifique o modo de autenticação no parâmetro schemaRegistryAuthenticationMode.

Para mais informações sobre este formato, consulte Formato de transmissão na documentação da Confluent.

Formato do ficheiro de saída

O formato do ficheiro de saída é o mesmo formato da mensagem Kafka de entrada. Por exemplo, se selecionar JSON para o formato de mensagem do Kafka, os ficheiros JSON são escritos no contentor do Cloud Storage de saída.

Autenticação

O modelo Apache Kafka para Cloud Storage suporta a autenticação SASL/PLAIN para agentes Kafka.

Parâmetros de modelos

Parâmetros obrigatórios

  • readBootstrapServerAndTopic: tópico do Kafka a partir do qual ler a entrada.
  • outputDirectory: o caminho e o prefixo do nome de ficheiro para escrever ficheiros de saída. Tem de terminar com uma barra. Por exemplo, gs://your-bucket/your-path/.
  • kafkaReadAuthenticationMode: o modo de autenticação a usar com o cluster Kafka. Use KafkaAuthenticationMethod.NONE para sem autenticação, KafkaAuthenticationMethod.SASL_PLAIN para nome de utilizador e palavra-passe SASL/PLAIN, KafkaAuthenticationMethod.SASL_SCRAM_512 para autenticação SASL_SCRAM_512 e KafkaAuthenticationMethod.TLS para autenticação baseada em certificados. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS deve ser usado apenas para o cluster do Google Cloud Apache Kafka para BigQuery. Permite a autenticação através das credenciais predefinidas da aplicação.
  • messageFormat: o formato das mensagens Kafka a ler. Os valores suportados são AVRO_CONFLUENT_WIRE_FORMAT (Avro codificado no registo de esquemas da Confluent), AVRO_BINARY_ENCODING (Avro binário simples) e JSON. A predefinição é: AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: se for verdadeiro, as mensagens com falhas são escritas no BigQuery com informações de erro adicionais. A predefinição é: false.

Parâmetros opcionais

  • windowDuration: a duração/tamanho da janela em que os dados vão ser escritos no Cloud Storage. Os formatos permitidos são: Ns (para segundos, exemplo: 5s), Nm (para minutos, exemplo: 12m), Nh (para horas, exemplo: 2h). Por exemplo, 5m. A predefinição é: 5 m.
  • outputFilenamePrefix: o prefixo a colocar em cada ficheiro dividido em janelas. Por exemplo, output-. A predefinição é: output.
  • numShards: o número máximo de fragmentos de saída produzidos durante a escrita. Um número mais elevado de fragmentos significa um débito mais elevado para a escrita no Cloud Storage, mas um custo de agregação de dados potencialmente mais elevado entre fragmentos ao processar ficheiros do Cloud Storage de saída. O valor predefinido é decidido pelo Dataflow.
  • enableCommitOffsets: confirma os desvios das mensagens processadas no Kafka. Se estiver ativada, esta opção minimiza as lacunas ou o processamento duplicado de mensagens quando reinicia o pipeline. Requer a especificação do ID do grupo de consumidores. A predefinição é: false.
  • consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esta conduta pertence. Obrigatório se a opção Commit Offsets to Kafka estiver ativada. A predefinição é vazio.
  • kafkaReadOffset: o ponto de partida para ler mensagens quando não existem deslocamentos comprometidos. A mais antiga começa desde o início e a mais recente começa a partir da mensagem mais recente. A predefinição é: latest.
  • kafkaReadUsernameSecretId: o ID secreto do Google Cloud Secret Manager que contém o nome de utilizador do Kafka a usar com a autenticação SASL_PLAIN. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. A predefinição é vazio.
  • kafkaReadPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe do Kafka a usar com a autenticação SASL_PLAIN. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. A predefinição é vazio.
  • kafkaReadKeystoreLocation: o caminho do Google Cloud Storage para o ficheiro Java KeyStore (JKS) que contém o certificado TLS e a chave privada a usar na autenticação com o cluster Kafka. Por exemplo, gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: o caminho do Google Cloud Storage para o ficheiro Java TrustStore (JKS) que contém os certificados fidedignos a usar para validar a identidade do agente Kafka.
  • kafkaReadTruststorePasswordSecretId: o ID do segredo do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java TrustStore (JKS) para autenticação TLS do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java KeyStore (JKS) para a autenticação TLS do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder à chave privada no ficheiro Java KeyStore (JKS) para autenticação TLS do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramUsernameSecretId: o ID secreto do Google Cloud Secret Manager que contém o nome de utilizador do Kafka a usar com a autenticação SASL_SCRAM. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe do Kafka a usar com a autenticação SASL_SCRAM. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramTruststoreLocation: o caminho do Google Cloud Storage para o ficheiro Java TrustStore (JKS) que contém os certificados fidedignos a usar para validar a identidade do agente Kafka.
  • kafkaReadSaslScramTruststorePasswordSecretId: o ID do segredo do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java TrustStore (JKS) para autenticação SASL_SCRAM do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaFormat: o formato do esquema do Kafka. Pode ser fornecido como SINGLE_SCHEMA_FILE ou SCHEMA_REGISTRY. Se SINGLE_SCHEMA_FILE for especificado, use o esquema mencionado no ficheiro de esquema avro para todas as mensagens. Se SCHEMA_REGISTRY for especificado, as mensagens podem ter um único esquema ou vários esquemas. A predefinição é: SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath: o caminho do Google Cloud Storage para o ficheiro de esquema Avro único usado para descodificar todas as mensagens num tópico. A predefinição é vazio.
  • schemaRegistryConnectionUrl: o URL da instância do Confluent Schema Registry usado para gerir esquemas Avro para descodificação de mensagens. A predefinição é vazio.
  • binaryAvroSchemaPath: o caminho do Google Cloud Storage para o ficheiro de esquema Avro usado para descodificar mensagens Avro codificadas em binário. A predefinição é vazio.
  • schemaRegistryAuthenticationMode: modo de autenticação do registo de esquemas. Pode ser NONE, TLS ou OAUTH. A predefinição é: NENHUM.
  • schemaRegistryTruststoreLocation: localização do certificado SSL onde o repositório fidedigno para autenticação no Schema Registry está armazenado. Por exemplo, /your-bucket/truststore.jks.
  • schemaRegistryTruststorePasswordSecretId: SecretId no gestor de segredos onde a palavra-passe para aceder ao segredo no repositório de confiança está armazenada. Por exemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeystoreLocation: localização do keystore que contém o certificado SSL e a chave privada. Por exemplo, /your-bucket/keystore.jks.
  • schemaRegistryKeystorePasswordSecretId: SecretId no Secret Manager onde se encontra a palavra-passe para aceder ao ficheiro keystore. Por exemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeyPasswordSecretId: SecretId da palavra-passe necessária para aceder à chave privada do cliente armazenada no keystore. Por exemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryOauthClientId: ID de cliente usado para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: o ID do segredo do Google Cloud Secret Manager que contém o segredo do cliente a usar para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaRegistryOauthScope: o âmbito do token de acesso usado para autenticar o cliente do Schema Registry no modo OAUTH. Este campo é opcional, uma vez que o pedido pode ser feito sem um parâmetro de âmbito transmitido. Por exemplo, openid.
  • schemaRegistryOauthTokenEndpointUrl: o URL baseado em HTTP(S) para o fornecedor de identidade OAuth/OIDC usado para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: nome da tabela do BigQuery totalmente qualificado para mensagens com falhas. As mensagens que não conseguiram alcançar a tabela de saída por diferentes motivos (por exemplo, esquema em conflito, JSON com formato incorreto) são escritas nesta tabela. A tabela é criada pelo modelo. Por exemplo, your-project-id:your-dataset.your-table-name.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Kafka to Cloud Storage template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Opcional: para mudar do processamento exatamente uma vez para o modo de streaming pelo menos uma vez, selecione Pelo menos uma vez.
  8. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \
    --parameters \
readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputDirectory=gs://STORAGE_BUCKET_NAME,\
useBigQueryDLQ=false
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • BOOTSTRAP_SERVER_AND_TOPIC: o endereço do servidor de arranque e o tópico do Apache Kafka

    O formato do endereço do servidor de arranque e do tópico depende do tipo de cluster:

    • Cluster do Managed Service for Apache Kafka: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Cluster Kafka externo: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • STORAGE_BUCKET_NAME: o contentor do Cloud Storage onde a saída é escrita

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "messageFormat": "JSON",
          "outputDirectory": "gs://STORAGE_BUCKET_NAME",
          "useBigQueryDLQ": "false"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex",
   }
}
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • BOOTSTRAP_SERVER_AND_TOPIC: o endereço do servidor de arranque e o tópico do Apache Kafka

    O formato do endereço do servidor de arranque e do tópico depende do tipo de cluster:

    • Cluster do Managed Service for Apache Kafka: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Cluster Kafka externo: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • STORAGE_BUCKET_NAME: o contentor do Cloud Storage onde a saída é escrita

O que se segue?