O modelo Apache Kafka para Apache Kafka cria um pipeline de streaming que ingere dados como bytes de uma origem do Apache Kafka e, em seguida, grava os bytes em um coletor do Apache Kafka.
Requisitos de pipeline
- O tópico de origem do Apache Kafka precisa existir.
- Os servidores do agente de origem e coletor do Apache Kafka precisam estar em execução e poder ser acessados nas máquinas de worker do Dataflow.
- Se você estiver usando o Google Cloud Managed Service para Apache Kafka como origem ou coletor, o tópico precisará existir antes de iniciar o modelo.
Formato de mensagem do Kafka
As mensagens de origem do Apache Kafka são lidas como bytes, e os bytes são gravados no coletor do Apache Kafka.
Autenticação
O modelo Apache Kafka para Apache Kafka aceita a autenticação SASL/PLAIN e TLS nos agentes do Kafka.
Parâmetros do modelo
Parâmetros obrigatórios
- readBootstrapServerAndTopic : o servidor e o tópico de inicialização do Kafka em que a entrada será lida. (Exemplo: localhost:9092;topic1,topic2).
- kafkaReadAuthenticationMode: o modo de autenticação a ser usado com o cluster do Kafka. Use NONE para nenhuma autenticação, SASL_PLAIN para o nome de usuário e senha SASL/PLAIN e TLS para autenticação com base no certificado. APPLICATION_DEFAULT_CREDENTIALS só pode ser usado para o cluster do Apache Kafka para BigQuery do Google Cloud, porque permite a autenticação com o Apache Kafka para BigQuery do Google Cloud usando as credenciais padrão do aplicativo.
- writeBootstrapServerAndTopic: tópico do Kafka em que a saída será gravada.
- kafkaWriteAuthenticationMethod: o modo de autenticação a ser usado com o cluster do Kafka. Use NONE para nenhuma autenticação ou SASL_PLAIN para o nome de usuário e senha SASL/PLAIN e TLS para autenticação com base no certificado. O padrão é: APPLICATION_DEFAULT_CREDENTIALS.
Parâmetros opcionais
- enableCommitOffsets: deslocamentos de confirmação de mensagens processadas para o Kafka. Se ativado, isso minimizará as lacunas ou o processamento duplicado de mensagens ao reiniciar o pipeline. Exige especificar o ID do grupo de consumidores. O padrão é: falso.
- consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esse pipeline pertence. Obrigatório se os deslocamentos de confirmação para Kafka estiverem ativados. O padrão é vazio.
- kafkaReadOffset: o ponto de partida para ler mensagens quando não há deslocamentos confirmados. O mais antigo começa no início, o mais recente a partir da mensagem mais recente. O padrão é: mais recente.
- kafkaReadUsernameSecretId: o ID do secret do Secret Manager do Google Cloud que contém o nome de usuário do Kafka a ser usado com a autenticação SASL_PLAIN. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
- kafkaReadPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha do Kafka a ser usada com a autenticação SASL_PLAIN. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
- kafkaReadKeystoreLocation: o caminho do Google Cloud Storage para o arquivo Java KeyStore (JKS) que contém o certificado TLS e a chave privada a serem usados na autenticação com o cluster do Kafka. Exemplo: gs://your-bucket/keystore.jks.
- kafkaReadTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka.
- kafkaReadTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha a ser usada para acessar o arquivo Java TrustStore (JKS) para autenticação TLS do Kafka (exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaReadKeystorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
- kafkaReadKeyPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha usada para acessar a chave privada no arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
- kafkaWriteUsernameSecretId: o ID do secret do Secret Manager do Google Cloud que contém o nome de usuário do Kafka para autenticação SASL_PLAIN com o cluster do Kafka de destino. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
- kafkaWritePasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha do Kafka a ser usada para a autenticação SASL_PLAIN com o cluster do Kafka de destino. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
- kafkaWriteKeystoreLocation: o caminho do Google Cloud Storage para o arquivo Java KeyStore (JKS) que contém o certificado TLS e a chave privada para autenticação com o cluster Kafka de destino. (Exemplo: gs://
- kafkaWriteTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka de destino.
- kafkaWriteTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java TrustStore (JKS) para autenticação TLS com o cluster do Kafka de destino. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
- kafkaWriteKeystorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha para acessar o arquivo Java KeyStore (JKS) a ser usado para a autenticação TLS com o cluster Kafka de destino. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
- kafkaWriteKeyPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha usada para acessar a chave privada no arquivo Java KeyStore (JKS) para autenticação TLS com o cluster de destino do Kafka. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Kafka to Cloud Storage template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaREGION_NAME
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: nome da tabela do Cloud StorageKAFKA_TOPICS
: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, use vírgulas. Consultegcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: O URI do Cloud Storage do arquivo.js
que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usarPor exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.KAFKA_SERVER_ADDRESSES
: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número da porta que o servidor pode acessar. Por exemplo,35.70.252.199:9092
. Se vários endereços forem fornecidos, use vírgulas para separá-los. Consultegcloud topic escaping
.
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaLOCATION
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: nome da tabela do Cloud StorageKAFKA_TOPICS
: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, use vírgulas. Consultegcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: O URI do Cloud Storage do arquivo.js
que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usarPor exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.KAFKA_SERVER_ADDRESSES
: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número da porta que o servidor pode acessar. Por exemplo,35.70.252.199:9092
. Se vários endereços forem fornecidos, use vírgulas para separá-los. Consultegcloud topic escaping
.
Para mais informações, consulte Gravar dados do Kafka no Cloud Storage com Dataflow.
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.