O modelo Apache Kafka para Cloud Storage é um pipeline de streaming que carrega dados de texto do Google Cloud Managed Service para Apache Kafka e envia os registos para o Cloud Storage.
Também pode usar o modelo Apache Kafka para Cloud Storage com o Kafka autogerido ou externo.
Requisitos do pipeline
- O contentor do Cloud Storage de saída tem de existir.
- O servidor do agente do Apache Kafka tem de estar em execução e acessível a partir das máquinas de trabalho do Dataflow.
- Os tópicos do Apache Kafka têm de existir.
Formato de mensagem do Kafka
Este modelo suporta a leitura de mensagens do Kafka nos seguintes formatos:
Formato JSON
Para ler mensagens JSON, defina o parâmetro de modelo messageFormat
como "JSON"
.
Codificação binária Avro
Para ler mensagens Avro binárias, defina os seguintes parâmetros de modelo:
messageFormat
:"AVRO_BINARY_ENCODING"
.binaryAvroSchemaPath
: a localização de um ficheiro de esquema Avro no Cloud Storage. Exemplo:gs://BUCKET_NAME/message-schema.avsc
.
Para mais informações sobre o formato binário Avro, consulte a secção Codificação binária na documentação do Apache Avro.
Avro codificado no registo de esquemas da Confluent
Para ler mensagens em Avro codificado no Confluent Schema Registry, defina os seguintes parâmetros do modelo:
messageFormat
:"AVRO_CONFLUENT_WIRE_FORMAT"
.schemaFormat
: um dos seguintes valores:"SINGLE_SCHEMA_FILE"
: o esquema de mensagens está definido num ficheiro de esquema Avro. Especifique a localização do ficheiro de esquema no Cloud Storage no parâmetroconfluentAvroSchemaPath
.-
"SCHEMA_REGISTRY"
: as mensagens são codificadas através do Confluent Schema Registry. Especifique o URL da instância do Confluent Schema Registry no parâmetroschemaRegistryConnectionUrl
e especifique o modo de autenticação no parâmetroschemaRegistryAuthenticationMode
.
Para mais informações sobre este formato, consulte Formato de transmissão na documentação da Confluent.
Formato do ficheiro de saída
O formato do ficheiro de saída é o mesmo formato da mensagem Kafka de entrada. Por exemplo, se selecionar JSON para o formato de mensagem do Kafka, os ficheiros JSON são escritos no contentor do Cloud Storage de saída.
Autenticação
O modelo Apache Kafka para Cloud Storage suporta a autenticação SASL/PLAIN para agentes Kafka.
Parâmetros de modelos
Parâmetros obrigatórios
- readBootstrapServerAndTopic: tópico do Kafka a partir do qual ler a entrada.
- outputDirectory: o caminho e o prefixo do nome de ficheiro para escrever ficheiros de saída. Tem de terminar com uma barra. Por exemplo,
gs://your-bucket/your-path/
. - kafkaReadAuthenticationMode: o modo de autenticação a usar com o cluster Kafka. Use
KafkaAuthenticationMethod.NONE
para sem autenticação,KafkaAuthenticationMethod.SASL_PLAIN
para nome de utilizador e palavra-passe SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512
para autenticação SASL_SCRAM_512 eKafkaAuthenticationMethod.TLS
para autenticação baseada em certificados.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
deve ser usado apenas para o cluster do Google Cloud Apache Kafka para BigQuery. Permite a autenticação através das credenciais predefinidas da aplicação. - messageFormat: o formato das mensagens Kafka a ler. Os valores suportados são
AVRO_CONFLUENT_WIRE_FORMAT
(Avro codificado no registo de esquemas da Confluent),AVRO_BINARY_ENCODING
(Avro binário simples) eJSON
. A predefinição é: AVRO_CONFLUENT_WIRE_FORMAT. - useBigQueryDLQ: se for verdadeiro, as mensagens com falhas são escritas no BigQuery com informações de erro adicionais. A predefinição é: false.
Parâmetros opcionais
- windowDuration: a duração/tamanho da janela em que os dados vão ser escritos no Cloud Storage. Os formatos permitidos são: Ns (para segundos, exemplo: 5s), Nm (para minutos, exemplo: 12m), Nh (para horas, exemplo: 2h). Por exemplo,
5m
. A predefinição é: 5 m. - outputFilenamePrefix: o prefixo a colocar em cada ficheiro dividido em janelas. Por exemplo,
output-
. A predefinição é: output. - numShards: o número máximo de fragmentos de saída produzidos durante a escrita. Um número mais elevado de fragmentos significa um débito mais elevado para a escrita no Cloud Storage, mas um custo de agregação de dados potencialmente mais elevado entre fragmentos ao processar ficheiros do Cloud Storage de saída. O valor predefinido é decidido pelo Dataflow.
- enableCommitOffsets: confirma os desvios das mensagens processadas no Kafka. Se estiver ativada, esta opção minimiza as lacunas ou o processamento duplicado de mensagens quando reinicia o pipeline. Requer a especificação do ID do grupo de consumidores. A predefinição é: false.
- consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esta conduta pertence. Obrigatório se a opção Commit Offsets to Kafka estiver ativada. A predefinição é vazio.
- kafkaReadOffset: o ponto de partida para ler mensagens quando não existem deslocamentos comprometidos. A mais antiga começa desde o início e a mais recente começa a partir da mensagem mais recente. A predefinição é: latest.
- kafkaReadUsernameSecretId: o ID secreto do Google Cloud Secret Manager que contém o nome de utilizador do Kafka a usar com a autenticação
SASL_PLAIN
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. A predefinição é vazio. - kafkaReadPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe do Kafka a usar com a autenticação
SASL_PLAIN
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. A predefinição é vazio. - kafkaReadKeystoreLocation: o caminho do Google Cloud Storage para o ficheiro Java KeyStore (JKS) que contém o certificado TLS e a chave privada a usar na autenticação com o cluster Kafka. Por exemplo,
gs://your-bucket/keystore.jks
. - kafkaReadTruststoreLocation: o caminho do Google Cloud Storage para o ficheiro Java TrustStore (JKS) que contém os certificados fidedignos a usar para validar a identidade do agente Kafka.
- kafkaReadTruststorePasswordSecretId: o ID do segredo do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java TrustStore (JKS) para autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeystorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java KeyStore (JKS) para a autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeyPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder à chave privada no ficheiro Java KeyStore (JKS) para autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramUsernameSecretId: o ID secreto do Google Cloud Secret Manager que contém o nome de utilizador do Kafka a usar com a autenticação
SASL_SCRAM
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe do Kafka a usar com a autenticação
SASL_SCRAM
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramTruststoreLocation: o caminho do Google Cloud Storage para o ficheiro Java TrustStore (JKS) que contém os certificados fidedignos a usar para validar a identidade do agente Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId: o ID do segredo do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java TrustStore (JKS) para autenticação SASL_SCRAM do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaFormat: o formato do esquema do Kafka. Pode ser fornecido como
SINGLE_SCHEMA_FILE
ouSCHEMA_REGISTRY
. SeSINGLE_SCHEMA_FILE
for especificado, use o esquema mencionado no ficheiro de esquema avro para todas as mensagens. SeSCHEMA_REGISTRY
for especificado, as mensagens podem ter um único esquema ou vários esquemas. A predefinição é: SINGLE_SCHEMA_FILE. - confluentAvroSchemaPath: o caminho do Google Cloud Storage para o ficheiro de esquema Avro único usado para descodificar todas as mensagens num tópico. A predefinição é vazio.
- schemaRegistryConnectionUrl: o URL da instância do Confluent Schema Registry usado para gerir esquemas Avro para descodificação de mensagens. A predefinição é vazio.
- binaryAvroSchemaPath: o caminho do Google Cloud Storage para o ficheiro de esquema Avro usado para descodificar mensagens Avro codificadas em binário. A predefinição é vazio.
- schemaRegistryAuthenticationMode: modo de autenticação do registo de esquemas. Pode ser NONE, TLS ou OAUTH. A predefinição é: NENHUM.
- schemaRegistryTruststoreLocation: localização do certificado SSL onde o repositório fidedigno para autenticação no Schema Registry está armazenado. Por exemplo,
/your-bucket/truststore.jks
. - schemaRegistryTruststorePasswordSecretId: SecretId no gestor de segredos onde a palavra-passe para aceder ao segredo no repositório de confiança está armazenada. Por exemplo,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeystoreLocation: localização do keystore que contém o certificado SSL e a chave privada. Por exemplo,
/your-bucket/keystore.jks
. - schemaRegistryKeystorePasswordSecretId: SecretId no Secret Manager onde se encontra a palavra-passe para aceder ao ficheiro keystore. Por exemplo,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeyPasswordSecretId: SecretId da palavra-passe necessária para aceder à chave privada do cliente armazenada no keystore. Por exemplo,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryOauthClientId: ID de cliente usado para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT.
- schemaRegistryOauthClientSecretId: o ID do segredo do Google Cloud Secret Manager que contém o segredo do cliente a usar para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaRegistryOauthScope: o âmbito do token de acesso usado para autenticar o cliente do Schema Registry no modo OAUTH. Este campo é opcional, uma vez que o pedido pode ser feito sem um parâmetro de âmbito transmitido. Por exemplo,
openid
. - schemaRegistryOauthTokenEndpointUrl: o URL baseado em HTTP(S) para o fornecedor de identidade OAuth/OIDC usado para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT.
- outputDeadletterTable: nome da tabela do BigQuery totalmente qualificado para mensagens com falhas. As mensagens que não conseguiram alcançar a tabela de saída por diferentes motivos (por exemplo, esquema em conflito, JSON com formato incorreto) são escritas nesta tabela. A tabela é criada pelo modelo. Por exemplo,
your-project-id:your-dataset.your-table-name
.
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Kafka to Cloud Storage template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Opcional: para mudar do processamento exatamente uma vez para o modo de streaming pelo menos uma vez, selecione Pelo menos uma vez.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputDirectory=gs://STORAGE_BUCKET_NAME,\ useBigQueryDLQ=false
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaREGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de arranque e o tópico do Apache KafkaO formato do endereço do servidor de arranque e do tópico depende do tipo de cluster:
- Cluster do Managed Service for Apache Kafka:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka externo:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster do Managed Service for Apache Kafka:
STORAGE_BUCKET_NAME
: o contentor do Cloud Storage onde a saída é escrita
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "outputDirectory": "gs://STORAGE_BUCKET_NAME", "useBigQueryDLQ": "false" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex", } }
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaLOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de arranque e o tópico do Apache KafkaO formato do endereço do servidor de arranque e do tópico depende do tipo de cluster:
- Cluster do Managed Service for Apache Kafka:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka externo:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster do Managed Service for Apache Kafka:
STORAGE_BUCKET_NAME
: o contentor do Cloud Storage onde a saída é escrita
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.