Use Kafka Connect with schema registry

This tutorial shows how to integrate Kafka Connect with a schema registry, so that connectors can use registered schemas to serialize and deserialize messages.

Before you start this tutorial, complete the steps in Produce Avro messages with the schema registry. Those steps give you the following components, which this tutorial requires:

  • A schema registry with a registered Avro schema.
  • A Kafka cluster with a topic that contains Avro messages.

Overview

A schema defines the structure of a message or data payload. A schema registry provides a centralized location to store schemas. By using a schema registry, you can ensure consistent encoding and decoding of your data as it moves from producers to consumers.

This tutorial demonstrates the following scenario:

  1. A Java producer client writes Apache Avro messages to a Managed Service for Apache Kafka cluster. The Avro message schema is stored in a schema registry (an example schema is sketched after this list).

  2. A BigQuery Sink connector reads the incoming messages from Kafka and writes them to BigQuery. The connector automatically creates a BigQuery table that is compatible with the Avro schema.
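
For orientation, the following is a minimal sketch of the kind of Avro record schema that the prerequisite tutorial registers for the newUsers topic. The field names are illustrative only; your registered schema might differ, and the connector maps whatever fields the schema defines to columns in the BigQuery table that it creates.

# Illustrative only: write a simple Avro record schema to a local file.
# Replace the fields with the schema that you actually registered.
cat > user.avsc << 'EOF'
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}
EOF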

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Managed Kafka API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  5. Make sure that you have the following role or roles on the project: Managed Kafka Cluster Editor, Managed Kafka Connect Cluster Editor, Managed Kafka Connector Editor, and BigQuery Data Owner.

    Check for the roles

    1. In the Google Cloud console, go to the IAM page.

      Go to IAM
    2. Select the project.
    3. In the Principal column, find all rows that identify you or a group that you're included in. To learn which groups you're included in, contact your administrator.

    4. For all rows that specify or include you, check the Role column to see whether the list of roles includes the required roles.

    Grant the roles

    1. In the Google Cloud console, go to the IAM page.

      Go to IAM
    2. Select the project.
    3. Click Grant access.
    4. In the New principals field, enter your user identifier. This is typically the email address for a Google Account.

    5. In the Select a role list, select a role.
    6. To grant additional roles, click Add another role and add each additional role.
    7. Click Save.
  6. Complete the steps in Produce Avro messages with the schema registry.

Create a Connect cluster

Perform the following steps to create a Connect cluster. Creating a Connect cluster can take up to 30 minutes.

Console

  1. Go to the Managed Service for Apache Kafka > Connect Clusters page.

    Go to Connect Clusters

  2. Click Create.

  3. For the Connect cluster name, enter a string. Example: my-connect-cluster.

  4. For Primary Kafka cluster, select the Kafka cluster.

  5. Click Create.

While the Connect cluster is being created, its state is Creating. When creation finishes, the state changes to Active.

gcloud

To create a Connect cluster, run the gcloud alpha managed-kafka connect-clusters create command.

gcloud alpha managed-kafka connect-clusters create CONNECT_CLUSTER \
  --location=REGION \
  --cpu=12 \
  --memory=12GiB \
  --primary-subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
  --kafka-cluster=KAFKA_CLUSTER \
  --async

Replace the following:

  • CONNECT_CLUSTER: a name for the Connect cluster
  • REGION: the region where you created the Kafka cluster
  • PROJECT_ID: your project ID
  • SUBNET_NAME: the subnet where you created the Kafka cluster
  • KAFKA_CLUSTER: the name of your Kafka cluster

The command runs asynchronously and returns an operation ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

To track the progress of the create operation, use the gcloud managed-kafka operations describe command:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

For more information, see Monitor the cluster creation operation.
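
After the create operation completes, you can optionally confirm that the Connect cluster reached the Active state. The following is a sketch that assumes the describe command is available in your gcloud release:

# Print the current state of the Connect cluster (expect ACTIVE once creation finishes).
gcloud alpha managed-kafka connect-clusters describe CONNECT_CLUSTER \
  --location=REGION \
  --format="value(state)"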

Grant IAM roles

Grant the BigQuery Data Editor Identity and Access Management (IAM) role to the Managed Kafka service account. This role allows the connector to write to BigQuery.

Console

  1. In the Google Cloud console, go to the IAM page.

    Go to IAM

  2. Select Include Google-provided role grants.

  3. Find the row for Managed Kafka Service Account and click Edit principal.

  4. Click Add another role and select the role BigQuery Data Editor.

  5. Click Save.

For more information about granting roles, see Grant an IAM role by using the console.

gcloud

To grant IAM roles to the service account, run the gcloud projects add-iam-policy-binding command.

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --role=roles/bigquery.dataEditor

Replace the following:

  • PROJECT_ID: your project ID
  • PROJECT_NUMBER: your project number

To find your project number, use the gcloud projects describe command.
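
For example, the following command prints only the project number:

# Print only the numeric project number, for use in the service account address.
gcloud projects describe PROJECT_ID --format="value(projectNumber)"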

Create a BigQuery dataset

In this step, you create a dataset to hold the BigQuery table. The BigQuery Sink connector automatically creates the table.

To create a dataset, perform the following steps.

Console

  1. Open the BigQuery page.

    Go to the BigQuery page

  2. In the Explorer panel, select the project where you want to create the dataset.

  3. Expand the View actions option and click Create dataset.

  4. On the Create dataset page:

    • For Dataset ID, enter a name for the dataset.

    • For Location type, choose a geographic location for the dataset.

    • Click Create dataset.

gcloud

To create a new dataset, use the bq mk command with the --dataset flag.

bq mk --location REGION \
  --dataset PROJECT_ID:DATASET_NAME

Replace the following:

  • PROJECT_ID: your project ID
  • DATASET_NAME: the name of the dataset
  • REGION: the dataset's location
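
To confirm that the dataset exists in the location that you expect, you can optionally inspect it with bq show:

# Show the dataset metadata, including its location.
bq show --format=prettyjson PROJECT_ID:DATASET_NAME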

Create a BigQuery Sink connector

In this step, you create a BigQuery Sink connector, which reads from Kafka and writes to BigQuery. You configure the connector to deserialize the Kafka messages by using the Avro schema stored in your schema registry.

Console

  1. Go to the Managed Service for Apache Kafka > Connect Clusters page.

    Go to Connect Clusters

  2. Click the name of the Connect cluster.

  3. Click Create connector.

  4. For the Connector name, enter a string. Example: bigquery-connector.

  5. In the Connector plugin list, select BigQuery Sink.

  6. For Topics, select the Kafka topic named newUsers. This topic is created by the Java producer client.

  7. For Dataset, enter the name of the BigQuery dataset, in the following format: PROJECT_ID.DATASET_NAME. Example: my-project.dataset1.

  8. In the Configurations edit box, replace the existing configuration with the following:

    tasks.max=3
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.bearer.auth.credentials.source=GCP
    value.converter.schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGISTRY_REGION/schemaRegistries/REGISTRY_NAME
    

    Replace the following:

    • PROJECT_ID: your project ID
    • REGISTRY_REGION: the region where you created your schema registry
    • REGISTRY_NAME: the name of your schema registry
  9. Click Create.

gcloud

To create a BigQuery Sink connector, run the gcloud alpha managed-kafka connectors create command.


export REGISTRY_PATH=projects/PROJECT_ID/locations/REGISTRY_REGION/schemaRegistries/REGISTRY_NAME

gcloud alpha managed-kafka connectors create CONNECTOR_NAME \
  --location=REGION \
  --connect_cluster=CONNECT_CLUSTER \
  --configs=connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector,\
key.converter=org.apache.kafka.connect.storage.StringConverter,\
value.converter=io.confluent.connect.avro.AvroConverter,\
value.converter.bearer.auth.credentials.source=GCP,\
value.converter.schema.registry.url=https://managedkafka.googleapis.com/v1/$REGISTRY_PATH,\
tasks.max=3,\
project=PROJECT_ID,\
defaultDataset=DATASET_NAME,\
topics=newUsers

Replace the following:

  • REGISTRY_REGION: the region where you created your schema registry
  • REGISTRY_NAME: the name of your schema registry
  • CONNECTOR_NAME: a name for the connector, such as bigquery-connector
  • CONNECT_CLUSTER: the name of your Connect cluster
  • REGION: the region where you created the Connect cluster
  • PROJECT_ID: your project ID
  • DATASET_NAME: the name of your BigQuery dataset
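
After the connector is created, you can optionally verify its configuration and state. The following sketch assumes that the describe command is available in your gcloud release:

# Describe the connector to check that it was created with the expected configuration.
gcloud alpha managed-kafka connectors describe CONNECTOR_NAME \
  --location=REGION \
  --connect_cluster=CONNECT_CLUSTER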

View results

To view the results in BigQuery, run a query on the table as follows. It might take a few minutes for the first messages to be written to the table.

Console

  1. Open the BigQuery page.

    Go to the BigQuery page

  2. In the query editor, run the following query:

    SELECT * FROM `PROJECT_ID.DATASET_NAME.newUsers`
    

    Replace the following variables:

    • PROJECT_ID: your project ID
    • DATASET_NAME: the name of your BigQuery dataset

gcloud

Use the bq query command to run a query on the table:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.newUsers`'

Replace the following variables:

  • PROJECT_ID: your project ID
  • DATASET_NAME: the name of your BigQuery dataset
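
You can also inspect the schema of the table that the connector created, to confirm that it matches the fields of the registered Avro schema. For example:

# Show the schema of the automatically created newUsers table.
bq show --schema --format=prettyjson PROJECT_ID:DATASET_NAME.newUsers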

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Console

  1. Delete the Connect cluster.

    1. Go to the Managed Service for Apache Kafka > Connect Clusters page.

      Go to Connect Clusters

    2. Select the Connect cluster and click Delete.

  2. Delete the Kafka cluster.

    1. Go to the Managed Service for Apache Kafka > Clusters page.

      Go to Clusters

    2. Select the Kafka cluster and click Delete.

  3. Delete the BigQuery table and dataset.

    1. Go to the BigQuery page.

      Go to the BigQuery page

    2. In the Explorer pane, expand your project and select a dataset.

    3. Expand the Actions option and click Delete.

    4. In the Delete dataset dialog, type delete into the field, and then click Delete.

gcloud

  1. To delete the Connect cluster, use the gcloud alpha managed-kafka connect-clusters delete command.

    gcloud alpha managed-kafka connect-clusters delete CONNECT_CLUSTER \
      --location=REGION --async
    
  2. To delete the Kafka cluster, use the gcloud managed-kafka clusters delete command.

    gcloud managed-kafka clusters delete KAFKA_CLUSTER \
      --location=REGION --async
    
  3. To delete both the BigQuery dataset and the BigQuery table, use the bq rm command.

    bq rm --recursive --dataset PROJECT_ID:DATASET_NAME
    

What's next