O modelo Apache Kafka para Apache Kafka cria um pipeline de streaming que carrega dados como bytes a partir de uma origem do Apache Kafka e, em seguida, escreve os bytes num destino do Apache Kafka.
Requisitos do pipeline
- O tópico de origem do Apache Kafka tem de existir.
- Os servidores de agente de origem e destino do Apache Kafka têm de estar em execução e acessíveis a partir das máquinas de trabalho do Dataflow.
- Se estiver a usar o Google Cloud Managed Service para Apache Kafka como origem ou destino, o tópico tem de existir antes de iniciar o modelo.
Formato de mensagem do Kafka
As mensagens de origem do Apache Kafka são lidas como bytes e os bytes são escritos no destino do Apache Kafka.
Autenticação
O modelo Apache Kafka para Apache Kafka suporta a autenticação SASL/PLAIN e TLS para agentes Kafka.
Parâmetros de modelos
Parâmetros obrigatórios
- readBootstrapServerAndTopic: servidor de arranque e tópico do Kafka para ler a entrada. Por exemplo,
localhost:9092;topic1,topic2
. - kafkaReadAuthenticationMode: o modo de autenticação a usar com o cluster Kafka. Use
KafkaAuthenticationMethod.NONE
para sem autenticação,KafkaAuthenticationMethod.SASL_PLAIN
para nome de utilizador e palavra-passe SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512
para autenticação SASL_SCRAM_512 eKafkaAuthenticationMethod.TLS
para autenticação baseada em certificados.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
deve ser usado apenas para o cluster do Google Cloud Apache Kafka para BigQuery. Permite a autenticação através das credenciais predefinidas da aplicação. - writeBootstrapServerAndTopic: tópico do Kafka para o qual escrever o resultado.
- kafkaWriteAuthenticationMethod: o modo de autenticação a usar com o cluster Kafka. Use NONE para nenhuma autenticação, SASL_PLAIN para nome de utilizador e palavra-passe SASL/PLAIN, SASL_SCRAM_512 para autenticação baseada em SASL_SCRAM_512 e TLS para autenticação baseada em certificado. A predefinição é: APPLICATION_DEFAULT_CREDENTIALS.
Parâmetros opcionais
- enableCommitOffsets: confirma os desvios das mensagens processadas no Kafka. Se estiver ativada, esta opção minimiza as lacunas ou o processamento duplicado de mensagens quando reinicia o pipeline. Requer a especificação do ID do grupo de consumidores. A predefinição é: false.
- consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esta conduta pertence. Obrigatório se a opção Commit Offsets to Kafka estiver ativada. A predefinição é vazio.
- kafkaReadOffset: o ponto de partida para ler mensagens quando não existem deslocamentos comprometidos. A mais antiga começa desde o início e a mais recente começa a partir da mensagem mais recente. A predefinição é: latest.
- kafkaReadUsernameSecretId: o ID secreto do Google Cloud Secret Manager que contém o nome de utilizador do Kafka a usar com a autenticação
SASL_PLAIN
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. A predefinição é vazio. - kafkaReadPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe do Kafka a usar com a autenticação
SASL_PLAIN
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. A predefinição é vazio. - kafkaReadKeystoreLocation: o caminho do Google Cloud Storage para o ficheiro Java KeyStore (JKS) que contém o certificado TLS e a chave privada a usar na autenticação com o cluster Kafka. Por exemplo,
gs://your-bucket/keystore.jks
. - kafkaReadTruststoreLocation: o caminho do Google Cloud Storage para o ficheiro Java TrustStore (JKS) que contém os certificados fidedignos a usar para validar a identidade do agente Kafka.
- kafkaReadTruststorePasswordSecretId: o ID do segredo do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java TrustStore (JKS) para autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeystorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java KeyStore (JKS) para a autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeyPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder à chave privada no ficheiro Java KeyStore (JKS) para autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramUsernameSecretId: o ID secreto do Google Cloud Secret Manager que contém o nome de utilizador do Kafka a usar com a autenticação
SASL_SCRAM
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe do Kafka a usar com a autenticação
SASL_SCRAM
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramTruststoreLocation: o caminho do Google Cloud Storage para o ficheiro Java TrustStore (JKS) que contém os certificados fidedignos a usar para validar a identidade do agente Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId: o ID do segredo do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java TrustStore (JKS) para autenticação SASL_SCRAM do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteUsernameSecretId: o ID secreto do Google Cloud Secret Manager que contém o nome de utilizador do Kafka para autenticação SASL_PLAIN com o cluster Kafka de destino. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. A predefinição é vazio. - kafkaWritePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe do Kafka a usar para a autenticação SASL_PLAIN com o cluster Kafka de destino. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. A predefinição é vazio. - kafkaWriteKeystoreLocation: o caminho do Google Cloud Storage para o ficheiro Java KeyStore (JKS) que contém o certificado TLS e a chave privada para autenticação com o cluster Kafka de destino. Por exemplo,
gs://<BUCKET>/<KEYSTORE>.jks
. - kafkaWriteTruststoreLocation: o caminho do Google Cloud Storage para o ficheiro Java TrustStore (JKS) que contém os certificados fidedignos a usar para validar a identidade do agente Kafka de destino.
- kafkaWriteTruststorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder ao ficheiro Java TrustStore (JKS) para autenticação TLS com o cluster Kafka de destino. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteKeystorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe para aceder ao ficheiro Java KeyStore (JKS) a usar para a autenticação TLS com o cluster Kafka de destino. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteKeyPasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a palavra-passe a usar para aceder à chave privada no ficheiro Java KeyStore (JKS) para autenticação TLS com o cluster Kafka de destino. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
.
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Kafka to Cloud Storage template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Opcional: para mudar do processamento exatamente uma vez para o modo de streaming pelo menos uma vez, selecione Pelo menos uma vez.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaREGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de arranque do Apache Kafka e o tópico a partir do qual lerWRITE_BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de arranque do Apache Kafka e o tópico para o qual escreverO formato do endereço do servidor de arranque e do tópico depende do tipo de cluster:
- Cluster do Managed Service for Apache Kafka:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka externo:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster do Managed Service for Apache Kafka:
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaLOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de arranque do Apache Kafka e o tópico a partir do qual lerWRITE_BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de arranque do Apache Kafka e o tópico para o qual escreverO formato do endereço do servidor de arranque e do tópico depende do tipo de cluster:
- Cluster do Managed Service for Apache Kafka:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka externo:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster do Managed Service for Apache Kafka:
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.