Usar o Dataflow com o serviço gerenciado para Apache Kafka

Esta página descreve como usar o Google Cloud Managed Service para Apache Kafka como origem ou coletor em um pipeline do Dataflow.

Use uma das seguintes abordagens:

Requisitos

  • Ative as APIs Cloud Storage, Dataflow e Managed Service for Apache Kafka no seu projeto. Consulte Como ativar APIs ou execute o seguinte comando da Google Cloud CLI:

    gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
    
  • A conta de serviço do worker do Dataflow precisa ter o papel do Identity and Access Management (IAM) do cliente gerenciado do Kafka (roles/managedkafka.client).

  • As VMs de worker do Dataflow precisam ter acesso à rede do servidor de bootstrap do Kafka. Para mais informações, consulte Configurar a rede do Serviço gerenciado para Apache Kafka.

Receber o endereço do servidor de inicialização

Para executar um pipeline que se conecta a um cluster do Managed Service para Apache Kafka, primeiro receba o endereço do servidor de inicialização do cluster. Você precisa desse endereço ao configurar o pipeline.

Use o console Google Cloud ou a Google Cloud CLI da seguinte maneira:

Console

  1. No console Google Cloud , acesse a página Clusters.

    Acessar Clusters

  2. Clique no nome do cluster.

  3. Clique na guia Configurações.

  4. Copie o endereço do servidor de inicialização em URL de inicialização.

gcloud

Use o comando managed-kafka clusters describe.

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=LOCATION \
  --format="value(bootstrapAddress)"

Substitua:

  • CLUSTER_ID: o ID ou nome do cluster.
  • LOCATION: o local do cluster

Para mais informações, consulte Ver um cluster do serviço gerenciado para Apache Kafka.

Usar o serviço gerenciado para Apache Kafka com um modelo do Dataflow

O Google oferece vários modelos do Dataflow que leem do Apache Kafka:

Esses modelos podem ser usados com o Managed Service para Apache Kafka. Se um deles corresponder ao seu caso de uso, considere usá-lo em vez de escrever um código de pipeline personalizado.

Console

  1. Acesse a página Dataflow > Jobs.

    Acessar "Jobs"

  2. Clique em Criar job usando um modelo.

  3. Em Nome do job, insira um nome.

  4. No menu suspenso Modelo do Dataflow, selecione o modelo que você quer executar.

  5. Na caixa Servidor de inicialização do Kafka, digite o endereço do servidor de inicialização.

  6. Na caixa Tópico do Kafka, insira o nome do tópico.

  7. Em Modo de autenticação do Kafka, selecione APPLICATION_DEFAULT_CREDENTIALS.

  8. Em Formato de mensagem do Kafka, selecione o formato das mensagens do Apache Kafka.

  9. Insira outros parâmetros conforme necessário. Os parâmetros aceitos são documentados para cada modelo.

  10. Executar job.

gcloud

Use o comando gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://TEMPLATE_FILE \
  --region REGION_NAME \
  --parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...

Substitua:

  • JOB_NAME: um nome para o job
  • TEMPLATE_FILE: o local do arquivo de modelo no Cloud Storage
  • REGION_NAME: a região em que você quer implantar o job
  • PROJECT_NAME: o nome do seu Google Cloud projeto
  • LOCATION: o local do cluster
  • CLUSTER_ID: o ID ou nome do cluster.
  • TOPIC: o nome do tópico do Kafka

Usar o serviço gerenciado para Apache Kafka com um pipeline do Beam

Esta seção descreve como usar o SDK do Apache Beam para criar e executar um pipeline do Dataflow que se conecta ao Managed Service para Apache Kafka.

Na maioria dos cenários, use a transformação de E/S gerenciada como sua origem ou destino do Kafka. Se você precisar de um ajuste de performance mais avançado, use o conector KafkaIO. Para mais informações sobre os benefícios de usar a E/S gerenciada, consulte E/S gerenciada do Dataflow.

Requisitos

  • Cliente Kafka versão 3.6.0 ou mais recente.

  • SDK do Apache Beam versão 2.61.0 ou mais recente.

  • A máquina em que você inicia o job do Dataflow precisa ter acesso à rede do servidor de inicialização do Apache Kafka. Por exemplo, inicie o job em uma instância do Compute Engine que possa acessar a VPC em que o cluster está acessível.

  • O principal que cria o job precisa ter os seguintes papéis do IAM:

    • Cliente Kafka gerenciado (roles/managedkafka.client) para acessar o cluster do Apache Kafka.
    • Usuário da conta de serviço (roles/iam.serviceAccountUser) para atuar como a conta de serviço do worker do Dataflow.
    • Administrador do Storage (roles/storage.admin) para fazer upload de arquivos de jobs no Cloud Storage.
    • Administrador do Dataflow (roles/dataflow.admin) para criar o job.

    Se você iniciar o job em uma instância do Compute Engine, poderá conceder esses papéis a uma conta de serviço anexada à VM. Para mais informações, consulte Criar uma VM que usa uma conta serviço gerenciado pelo usuário.

    Também é possível usar Application Default Credentials (ADC) com a representação de conta de serviço ao criar o job.

Configurar E/S gerenciada

Se o pipeline usar o Managed I/O para Apache Kafka, defina as seguintes opções de configuração para autenticar com o Serviço gerenciado para Apache Kafka:

  • "security.protocol": "SASL_SSL"
  • "sasl.mechanism": "OAUTHBEARER"
  • "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
  • "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"

Os exemplos a seguir mostram como configurar o Managed I/O para o Managed Service para Apache Kafka:

Java

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
      .put("bootstrap_servers", options.getBootstrapServer())
      .put("topic", options.getTopic())
      .put("data_format", "RAW")
      // Set the following fields to authenticate with Application Default
      // Credentials (ADC):
      .put("security.protocol", "SASL_SSL")
      .put("sasl.mechanism", "OAUTHBEARER")
      .put("sasl.login.callback.handler.class",
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
      .put("sasl.jaas.config",   "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
      .build();

Python

pipeline
| beam.managed.Read(
    beam.managed.KAFKA,
    config={
      "bootstrap_servers": options.bootstrap_server,
      "topic": options.topic,
      "data_format": "RAW",
      # Set the following fields to authenticate with Application Default
      # Credentials (ADC):
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "OAUTHBEARER",
      "sasl.login.callback.handler.class":
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
      "sasl.jaas.config":
          "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    }
)

Configurar o conector KafkaIO

Os exemplos a seguir mostram como configurar o conector KafkaIO para o Serviço gerenciado para Apache Kafka:

Java

String bootstap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(bootstap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerSerializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials())

// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());

Python

WriteToKafka(
  producer_config={
    "bootstrap.servers": options.bootstrap_servers,
    "security.protocol": 'SASL_SSL',
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
    "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
  },
  topic=options.topic,
  key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
  value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)

A seguir