O modelo Apache Kafka para Cloud Storage é um pipeline de streaming que ingere dados de texto do Google Cloud Managed Service para Apache Kafka e envia os registros para o Cloud Storage.
Também é possível usar o modelo Apache Kafka para BigQuery com o Kafka autogerenciado ou externo.
Requisitos de pipeline
- O bucket de saída do Cloud Storage precisa existir.
- O servidor do agente do Apache Kafka precisa estar em execução e acessível em máquinas de trabalho do Dataflow.
- Os tópicos do Apache Kafka precisam existir.
Formato de mensagem do Kafka
O modelo do Apache Kafka para Cloud Storage oferece suporte à leitura de mensagens do Kafka nos seguintes formatos: CONFLUENT_AVRO_WIRE_FORMAT
e JSON
.
Formato de arquivo de saída
O formato do arquivo de saída é igual à mensagem de entrada do Kafka. Por exemplo, se você selecionar JSON para o formato de mensagem do Kafka, os arquivos JSON serão gravados no bucket de saída do Cloud Storage.
Autenticação
O modelo Apache Kafka para Cloud Storage oferece suporte à autenticação SASL/PLAIN para agentes Kafka.
Parâmetros do modelo
Parâmetros obrigatórios
- readBootstrapServerAndTopic: o tópico do Kafka a ser usado para ler a entrada.
- outputDirectory: o caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Precisa terminar com uma barra. Exemplo: gs://your-bucket/your-path/.
- kafkaReadAuthenticationMode: o modo de autenticação a ser usado com o cluster do Kafka. Use NONE para nenhuma autenticação, SASL_PLAIN para o nome de usuário e senha SASL/PLAIN e TLS para autenticação com base no certificado. APPLICATION_DEFAULT_CREDENTIALS só pode ser usado para o cluster do Apache Kafka para BigQuery do Google Cloud, porque permite a autenticação com o Apache Kafka para BigQuery do Google Cloud usando as credenciais padrão do aplicativo.
- messageFormat: o formato das mensagens do Kafka a serem lidas. Os valores aceitos são AVRO_CONFLUENT_WIRE_FORMAT (Avro codificado do Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro binário simples) e JSON. O padrão é: AVRO_CONFLUENT_WIRE_FORMAT.
- useBigQueryDLQ: se verdadeiro, as mensagens com falha serão gravadas no BigQuery com informações extras sobre o erro. O padrão é: falso.
Parâmetros opcionais
- windowDuration: a duração/tamanho da janela em que os dados serão gravados no Cloud Storage. Os formatos permitidos são: Ns (para segundos, exemplo: "5s"), Nm (para minutos, exemplo: "12m") e Nh (para horas, exemplo: "2h"). (Exemplo: 5 min). O padrão é de 5 minutos.
- outputFilenamePrefix: o prefixo a ser colocado em cada arquivo em janela. (Exemplo: output-). O padrão é: saída.
- numShards: o número máximo de fragmentos de saída produzidos durante a gravação. Um número maior de fragmentos significa maior capacidade de gravação no Cloud Storage, mas um custo de agregação de dados potencialmente maior entre os fragmentos ao processar os arquivos de saída do Cloud Storage. O valor padrão é decidido pelo Dataflow.
- enableCommitOffsets: deslocamentos de confirmação de mensagens processadas para o Kafka. Se ativado, isso minimizará as lacunas ou o processamento duplicado de mensagens ao reiniciar o pipeline. Exige especificar o ID do grupo de consumidores. O padrão é: falso.
- consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esse pipeline pertence. Obrigatório se os deslocamentos de confirmação para Kafka estiverem ativados. O padrão é vazio.
- kafkaReadOffset: o ponto de partida para ler mensagens quando não há deslocamentos confirmados. O mais antigo começa no início, o mais recente a partir da mensagem mais recente. O padrão é: mais recente.
- kafkaReadUsernameSecretId: o ID do secret do Secret Manager do Google Cloud que contém o nome de usuário do Kafka a ser usado com a autenticação SASL_PLAIN. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
- kafkaReadPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha do Kafka a ser usada com a autenticação SASL_PLAIN. (Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). O padrão é vazio.
- kafkaReadKeystoreLocation: o caminho do Google Cloud Storage para o arquivo Java KeyStore (JKS) que contém o certificado TLS e a chave privada a serem usados na autenticação com o cluster do Kafka. Exemplo: gs://your-bucket/keystore.jks.
- kafkaReadTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka.
- kafkaReadTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha a ser usada para acessar o arquivo Java TrustStore (JKS) para autenticação TLS do Kafka (exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaReadKeystorePasswordSecretId: o ID secreto do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
- kafkaReadKeyPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha usada para acessar a chave privada no arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
- schemaFormat: o formato de esquema do Kafka. Pode ser fornecido como SINGLE_SCHEMA_FILE ou SCHEMA_REGISTRY. Se SINGLE_SCHEMA_FILE for especificado, todas as mensagens vão precisar ter o esquema mencionado no arquivo de esquema avro. Se SCHEMA_REGISTRY for especificado, as mensagens poderão ter um único esquema ou vários esquemas. O padrão é SINGLE_SCHEMA_FILE.
- confluentAvroSchemaPath: o caminho do Google Cloud Storage para o único arquivo de esquema Avro usado para decodificar todas as mensagens em um tópico. O padrão é vazio.
- schemaRegistryConnectionUrl: o URL da instância do Confluent Schema Registry usada para gerenciar esquemas Avro para decodificação de mensagens. O padrão é vazio.
- binaryAvroSchemaPath: o caminho do Google Cloud Storage para o arquivo de esquema do Avro usado para decodificar mensagens Avro codificadas em binário. O padrão é vazio.
- schemaRegistryAuthenticationMode : modo de autenticação do Schema Registry. Pode ser NONE, TLS ou OAUTH. O padrão é: NENHUM.
- schemaRegistryTruststoreLocation : local do certificado SSL em que o repositório de confiança para autenticação no Schema Registry é armazenado. Exemplo: /your-bucket/truststore.jks.
- schemaRegistryTruststorePasswordSecretId : o SecretId no Secret Manager, onde a senha para acessar o secret no truststore é armazenada. Exemplo: projects/your-project-id/secrets/your-secret-name/versions/your-secret-version.
- schemaRegistryKeystoreLocation : local do keystore que contém o certificado SSL e a chave privada. Exemplo: /your-bucket/keystore.jks.
- schemaRegistryKeystorePasswordSecretId : o SecretId no Secret Manager, em que a senha para acessar o arquivo de chaves (exemplo: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
- schemaRegistryKeyPasswordSecretId : o SecretId da senha necessária para acessar a chave privada do cliente armazenada no keystore. Por exemplo: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
- schemaRegistryOauthClientId : o ID do cliente usado para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT.
- schemaRegistryOauthClientSecretId : o ID do secret do Secret Manager do Google Cloud que contém a chave secreta do cliente a ser usada para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT. Exemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
- schemaRegistryOauthScope : o escopo do token de acesso usado para autenticar o cliente do Schema Registry no modo OAUTH. Esse campo é opcional, já que a solicitação pode ser feita sem um parâmetro de escopo transmitido. (Exemplo: openid).
- schemaRegistryOauthTokenEndpointUrl : o URL baseado em HTTP(S) do provedor de identidade OAuth/OIDC usado para autenticar o cliente do Schema Registry no modo OAUTH. Obrigatório para o formato de mensagem AVRO_CONFLUENT_WIRE_FORMAT.
- outputDeadletterTable : nome totalmente qualificado da tabela do BigQuery para mensagens com falha. As mensagens que não chegam à tabela de saída por diferentes motivos (por exemplo, esquema incompatível, json incorreto) são gravadas nesta tabela. A tabela será criada pelo modelo. (Exemplo: id-do-projeto:seu-conjunto-de-dados.nome-da-tabela).
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Kafka to Cloud Storage template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaREGION_NAME
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: nome da tabela do Cloud StorageKAFKA_TOPICS
: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, use vírgulas. Consultegcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: O URI do Cloud Storage do arquivo.js
que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usarPor exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.KAFKA_SERVER_ADDRESSES
: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número da porta que o servidor pode acessar. Por exemplo,35.70.252.199:9092
. Se vários endereços forem fornecidos, use vírgulas para separá-los. Consultegcloud topic escaping
.
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex", } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaLOCATION
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: nome da tabela do Cloud StorageKAFKA_TOPICS
: a lista de tópicos do Apache Kakfa. Se vários tópicos forem fornecidos, use vírgulas. Consultegcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: O URI do Cloud Storage do arquivo.js
que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usarPor exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.KAFKA_SERVER_ADDRESSES
: a lista de endereços IP do servidor de gerenciamento do Apache Kafka. Cada endereço IP precisa ter o número da porta que o servidor pode acessar. Por exemplo,35.70.252.199:9092
. Se vários endereços forem fornecidos, use vírgulas para separá-los. Consultegcloud topic escaping
.
Para mais informações, consulte Gravar dados do Kafka no Cloud Storage com Dataflow.
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.