Develop a Python producer application

Learn how to create a Google Cloud Managed Service for Apache Kafka cluster and write a Python producer application that can use Application Default Credentials (ADC). ADC is a way for your applications running on Google Cloud to automatically find and use the right credentials for authenticating to Google Cloud services.

Before you begin

Follow these steps to set up the gcloud CLI and a Google Cloud project. These are required to complete this guide.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Managed Kafka, Compute Engine, and Cloud DNS APIs:

    gcloud services enable managedkafka.googleapis.com compute.googleapis.com dns.googleapis.com
  8. Install the Google Cloud CLI.

  9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Verify that billing is enabled for your Google Cloud project.

  13. Enable the Managed Kafka, Compute Engine, and Cloud DNS APIs:

    gcloud services enable managedkafka.googleapis.com compute.googleapis.com dns.googleapis.com

Create a cluster

A Managed Service for Apache Kafka cluster is defined by its project, location, size, and networking configuration. The size of the cluster is the number of vCPUs and RAM across all brokers in the cluster. Setup of individual brokers and storage is automatic. Networking configuration is a set of subnets in which broker and bootstrap IP addresses are provisioned. Here we use the default Virtual Private Cloud and subnet in REGION.

To create a cluster, run the gcloud managed-kafka clusters create command.

gcloud managed-kafka clusters create CLUSTER_ID \
  --location=REGION \
  --cpu=3 \
  --memory=3GiB \
  --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/default \
  --async

Replace the following:

  • CLUSTER_ID: a name for the cluster. For guidelines on how to name a cluster, see Cluster, topic, or consumer group ID.

  • PROJECT_ID: the name of the project that you created in the previous section. For guidelines on how to find the project ID, see Find the project name, number, and ID. After you run the command, you should get a response similar to the following:

  • REGION: the name of a Compute Engine region, such as us-central1.

The response is similar to the following:

Create request issued for: [CLUSTER_ID]

This operation returns immediately, but cluster creation might take around half an hour. You can monitor the state of the cluster using the gcloud managed-kafka clusters describe command.

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=REGION

The output of the command is similar to the following:

bootstrapAddress: bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
capacityConfig:
  memoryBytes: '3221225472'
  vcpuCount: '3'
createTime: '2024-05-28T04:32:08.671168869Z'
gcpConfig:
  accessConfig:
    networkConfigs:
    - subnet: projects/PROJECT_ID/regions/REGION/subnetworks/default
name: projects/PROJECT_ID/locations/REGION/clusters/CLUSTER_ID
rebalanceConfig:
  mode: AUTO_REBALANCE_ON_SCALE_UP
state: CREATING
updateTime: '2024-05-28T04:32:08.671168869Z'

The state field is useful for monitoring the creation operation. You can use the cluster after the state turns to ACTIVE. The bootstrapAddress is the URL you use to connect to the cluster.

Set up a client VM

A producer application must run on a machine with network access to the cluster. We use a Compute Engine virtual machine instance (VM). This VM must be in the same region as the Kafka cluster. It must also be in the VPC containing the subnet that you've used in the cluster configuration.

  1. To create the client VM, run the following command:

    gcloud compute instances create test-instance \
      --scopes=https://www.googleapis.com/auth/cloud-platform \
      --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/default \
      --zone=REGION-c
    
  2. Give the Compute Engine default service account the necessary permissions to connect to the cluster and authenticate. You need to grant the Managed Kafka Client role (roles/managedkafka.client), the Service Account Token Creator role (roles/iam.serviceAccountTokenCreator), and the Service Account OpenID Token Creator role (roles/iam.serviceAccountOpenIdTokenCreator).

    gcloud projects add-iam-policy-binding PROJECT_ID \
       --member="serviceAccount:PROJECT_NUMBER-compute@\
         developer.gserviceaccount.com" \
       --role=roles/managedkafka.client
    
    gcloud projects add-iam-policy-binding PROJECT_ID\
       --member="serviceAccount:PROJECT_NUMBER-compute@\
         developer.gserviceaccount.com" \
       --role=roles/iam.serviceAccountTokenCreator
    
    gcloud projects add-iam-policy-binding PROJECT_ID \
       --member="serviceAccount:PROJECT_NUMBER-compute@\
         developer.gserviceaccount.com" \
       --role=roles/iam.serviceAccountOpenIdTokenCreator
    

    Replace PROJECT_NUMBER with the number of the project containing the cluster. You can look up this number using gcloud projects describe PROJECT_ID.

Create a Python producer application

  1. Connect to the client VM using SSH. One way to do this is to run the following command:

    gcloud compute ssh --project=PROJECT_ID \
        --zone=REGION-f test-instance
    

    Replace PROJECT_ID with your Google Cloud project name.

    For more information about connecting using SSH, see About SSH connections.

  2. Install pip, a Python package manager and the virtual environment manager:

    sudo apt install python3-pip -y
    sudo apt install python3-venv -y
    
  3. Create a new virtual environment (venv) and activate it:

    python3 -m venv kafka
    source kafka/bin/activate
    
  4. Install the confluent-kafka client and other dependencies:

    pip install confluent-kafka google-auth urllib3 packaging
    
  5. Copy the following producer client code into a file called producer.py

    import confluent_kafka
    import argparse
    from tokenprovider import TokenProvider
    
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True)
    parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False)
    parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False)
    args = parser.parse_args()
    
    token_provider = TokenProvider()
    
    config = {
        'bootstrap.servers': args.bootstrap,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        'oauth_cb': token_provider.get_token,
    }
    
    producer = confluent_kafka.Producer(config)
    
    def callback(error, message):
        if error is not None:
            print(error)
            return
        print("Delivered a message to {}[{}]".format(message.topic(), message.partition()))
    
    for i in range(args.num_messages):
    
      message = f"{i} hello world!".encode('utf-8')
      producer.produce(args.topic_name, message, callback=callback)
    
    producer.flush()
    
  6. You now need an implementation of the OAuth token provider. Save the following code in a file called tokenprovider.py:

    import base64
    import datetime
    import http.server
    import json
    import google.auth
    from google.auth.transport.urllib3 import Request
    import urllib3
    import time
    
    def encode(source):
      """Safe base64 encoding."""
      return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=')
    
    class TokenProvider(object):
      """
      Provides OAuth tokens from Google Cloud Application Default credentials.
      """
      HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'})
    
      def __init__(self, **config):
        self.credentials, _project = google.auth.default()
        self.http_client = urllib3.PoolManager()
    
      def get_credentials(self):
        if not self.credentials.valid:
          self.credentials.refresh(Request(self.http_client))
        return self.credentials
    
      def get_jwt(self, creds):
        token_data = dict(
                exp=creds.expiry.timestamp(),
                iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),
                iss='Google',
                scope='kafka',
                sub=creds.service_account_email,
        )
        return json.dumps(token_data)
    
      def get_token(self, args):
        creds = self.get_credentials()
        token = '.'.join([
          encode(self.HEADER),
          encode(self.get_jwt(creds)),
          encode(creds.token)
        ])
    
        # compute expiry time explicitly
        utc_expiry = creds.expiry.replace(tzinfo=datetime.timezone.utc)
        expiry_seconds = (utc_expiry - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
    
        return token, time.time() + expiry_seconds
    
  7. You are now ready to run the application:

    python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.

To delete the cluster, run the gcloud managed-kafka clusters delete command:

gcloud managed-kafka clusters delete CLUSTER_ID --location=REGION

To delete the VM, run the gcloud compute instances delete command:

gcloud instances delete test-instance \
  --zone=REGION-c

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.