Esta página descreve como usar o Google Cloud Managed Service para Apache Kafka como origem ou coletor em um pipeline do Dataflow.
Use uma das seguintes abordagens:
Requisitos
Ative as APIs Cloud Storage, Dataflow e Managed Service for Apache Kafka no seu projeto. Consulte Como ativar APIs ou execute o seguinte comando da Google Cloud CLI:
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
A conta de serviço do worker do Dataflow precisa ter o papel do Identity and Access Management (IAM) do cliente gerenciado do Kafka (
roles/managedkafka.client
).As VMs de worker do Dataflow precisam ter acesso à rede do servidor de bootstrap do Kafka. Para mais informações, consulte Configurar a rede do Serviço gerenciado para Apache Kafka.
Receber o endereço do servidor de inicialização
Para executar um pipeline que se conecta a um cluster do Managed Service para Apache Kafka, primeiro receba o endereço do servidor de inicialização do cluster. Você precisa desse endereço ao configurar o pipeline.
Use o console Google Cloud ou a Google Cloud CLI da seguinte maneira:
Console
No console Google Cloud , acesse a página Clusters.
Clique no nome do cluster.
Clique na guia Configurações.
Copie o endereço do servidor de inicialização em URL de inicialização.
gcloud
Use o comando managed-kafka clusters describe
.
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
Substitua:
- CLUSTER_ID: o ID ou nome do cluster.
- LOCATION: o local do cluster
Para mais informações, consulte Ver um cluster do serviço gerenciado para Apache Kafka.
Usar o serviço gerenciado para Apache Kafka com um modelo do Dataflow
O Google oferece vários modelos do Dataflow que leem do Apache Kafka:
Esses modelos podem ser usados com o Managed Service para Apache Kafka. Se um deles corresponder ao seu caso de uso, considere usá-lo em vez de escrever um código de pipeline personalizado.
Console
Acesse a página Dataflow > Jobs.
Clique em Criar job usando um modelo.
Em Nome do job, insira um nome.
No menu suspenso Modelo do Dataflow, selecione o modelo que você quer executar.
Na caixa Servidor de inicialização do Kafka, digite o endereço do servidor de inicialização.
Na caixa Tópico do Kafka, insira o nome do tópico.
Em Modo de autenticação do Kafka, selecione APPLICATION_DEFAULT_CREDENTIALS.
Em Formato de mensagem do Kafka, selecione o formato das mensagens do Apache Kafka.
Insira outros parâmetros conforme necessário. Os parâmetros aceitos são documentados para cada modelo.
Executar job.
gcloud
Use o comando
gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
Substitua:
- JOB_NAME: um nome para o job
- TEMPLATE_FILE: o local do arquivo de modelo no Cloud Storage
- REGION_NAME: a região em que você quer implantar o job
- PROJECT_NAME: o nome do seu Google Cloud projeto
- LOCATION: o local do cluster
- CLUSTER_ID: o ID ou nome do cluster.
- TOPIC: o nome do tópico do Kafka
Usar o serviço gerenciado para Apache Kafka com um pipeline do Beam
Esta seção descreve como usar o SDK do Apache Beam para criar e executar um pipeline do Dataflow que se conecta ao Managed Service para Apache Kafka.
Na maioria dos cenários, use a
transformação de E/S gerenciada como sua
origem ou destino do Kafka. Se você precisar de um ajuste de performance mais avançado, use o conector KafkaIO
.
Para mais informações sobre os benefícios de usar a E/S gerenciada, consulte
E/S gerenciada do Dataflow.
Requisitos
Cliente Kafka versão 3.6.0 ou mais recente.
SDK do Apache Beam versão 2.61.0 ou mais recente.
A máquina em que você inicia o job do Dataflow precisa ter acesso à rede do servidor de inicialização do Apache Kafka. Por exemplo, inicie o job em uma instância do Compute Engine que possa acessar a VPC em que o cluster está acessível.
O principal que cria o job precisa ter os seguintes papéis do IAM:
- Cliente Kafka gerenciado (
roles/managedkafka.client
) para acessar o cluster do Apache Kafka. - Usuário da conta de serviço (
roles/iam.serviceAccountUser
) para atuar como a conta de serviço do worker do Dataflow. - Administrador do Storage (
roles/storage.admin
) para fazer upload de arquivos de jobs no Cloud Storage. - Administrador do Dataflow (
roles/dataflow.admin
) para criar o job.
Se você iniciar o job em uma instância do Compute Engine, poderá conceder esses papéis a uma conta de serviço anexada à VM. Para mais informações, consulte Criar uma VM que usa uma conta serviço gerenciado pelo usuário.
Também é possível usar Application Default Credentials (ADC) com a representação de conta de serviço ao criar o job.
- Cliente Kafka gerenciado (
Configurar E/S gerenciada
Se o pipeline usar o Managed I/O para Apache Kafka, defina as seguintes opções de configuração para autenticar com o Serviço gerenciado para Apache Kafka:
"security.protocol"
:"SASL_SSL"
"sasl.mechanism"
:"OAUTHBEARER"
"sasl.login.callback.handler.class"
:"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
"sasl.jaas.config"
:"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
Os exemplos a seguir mostram como configurar o Managed I/O para o Managed Service para Apache Kafka:
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
Configurar o conector KafkaIO
Os exemplos a seguir mostram como configurar o conector KafkaIO
para o
Serviço gerenciado para Apache Kafka:
Java
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
A seguir
- Saiba mais sobre o Serviço gerenciado para Apache Kafka.
- Gravar dados do serviço gerenciado do Apache Kafka no BigQuery.
- Ler do Apache Kafka para o Dataflow.
- Gravar do Dataflow para o Apache Kafka.