Connect Pub/Sub to Apache Kafka

This document describes how to integrate Apache Kafka and Pub/Sub by using the Pub/Sub Group Kafka Connector.

About the Pub/Sub Group Kafka Connector

Apache Kafka is an open source platform for streaming events. It is commonly used in distributed architectures to enable communication between loosely coupled components. Pub/Sub is a managed service for sending and receiving messages asynchronously. As with Kafka, you can use Pub/Sub to communicate between components in your cloud architecture.

The Pub/Sub Group Kafka Connector allows you to integrate these two systems. The following connectors are packaged in the Connector JAR:

  • The sink connector reads records from one or more Kafka topics and publishes them to Pub/Sub.
  • The source connector reads messages from a Pub/Sub topic and publishes them to Kafka.

Here are some scenarios in which you might use the Pub/Sub Group Kafka Connector:

  • You are migrating a Kafka-based architecture to Google Cloud.
  • You have a frontend system that stores events in Kafka outside of Google Cloud, but you also use Google Cloud to run some of your backend services, which need to receive the Kafka events.
  • You collect logs from an on-premises Kafka solution and send them to Google Cloud for data analytics.
  • You have a frontend system that uses Google Cloud, but you also store data on-premises using Kafka.

The connector requires Kafka Connect, which is a framework for streaming data between Kafka and other systems. To use the connector, you must run Kafka Connect alongside your Kafka cluster.

This document assumes that you are familiar with both Kafka and Pub/Sub. Before you read this document, it's a good idea to complete one of the Pub/Sub quickstarts.

The Pub/Sub connector does not support any integration between Google Cloud IAM and Kafka Connect ACLs.

Get started with the connector

This section walks you through the following tasks:

  1. Configure the Pub/Sub Group Kafka Connector.
  2. Send events from Kafka to Pub/Sub.
  3. Send messages from Pub/Sub to Kafka.

Prerequisites

Install Kafka

Follow the Apache Kafka quickstart to install a single-node Kafka instance on your local machine. Complete these steps in the quickstart:

  1. Download the latest Kafka release and extract it.
  2. Start the Kafka environment.
  3. Create a Kafka topic.
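
The quickstart has the authoritative commands; the following is a rough sketch of those steps, assuming a Kafka 3.x release archive and the ZooKeeper-based quickstart flow (the release version and topic name are placeholders):

    # Download and extract a Kafka release (version is an example)
    tar -xzf kafka_2.13-3.6.0.tgz
    cd kafka_2.13-3.6.0

    # Start the Kafka environment (run each command in its own terminal)
    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties

    # Create a Kafka topic
    bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092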

Authenticate

The Pub/Sub Group Kafka Connector must authenticate with Pub/Sub in order to send and receive Pub/Sub messages. To set up authentication, perform the following steps:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your Google Account:

    gcloud auth application-default login
  6. Grant roles to your Google Account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace EMAIL_ADDRESS with your email address.
    • Replace ROLE with each individual role.
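
For example, here is what the role binding might look like with the placeholders filled in (the project ID and email address are hypothetical):

    gcloud projects add-iam-policy-binding my-project \
        --member="user:you@example.com" \
        --role="roles/pubsub.admin"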

Download the connector JAR

Download the connector JAR file to your local machine. For more information, see Acquire the connector in the GitHub readme.

Copy the connector configuration files

  1. Clone or download the GitHub repository for the connector.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copy the contents of the config directory into the config subdirectory of your Kafka installation.

    cp config/* [path to Kafka installation]/config/
    

These files contain configuration settings for the connector.

Update your Kafka Connect configuration

  1. Navigate to the directory that contains the Kafka Connect binary that you downloaded.
  2. In the Kafka Connect binary directory, open the file named config/connect-standalone.properties in a text editor.
  3. If the plugin.path property is commented out, uncomment it.
  4. Update the plugin.path property to include the path to the connector JAR.

    Example:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Set the offset.storage.file.filename property to a local file name. In standalone mode, Kafka uses this file to store offset data.

    Example:

    offset.storage.file.filename=/tmp/connect.offsets
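
After these edits, the relevant lines of config/connect-standalone.properties might look like the following (the JAR path is an example; bootstrap.servers is the default value for the local quickstart installation):

    # Kafka broker from the local quickstart installation
    bootstrap.servers=localhost:9092

    # Path to the Pub/Sub Group Kafka Connector JAR (example path)
    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar

    # Where standalone mode stores connector offsets
    offset.storage.file.filename=/tmp/connect.offsets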
    

Forward events from Kafka to Pub/Sub

This section describes how to start the sink connector, publish events to Kafka, and then read the forwarded messages from Pub/Sub.

  1. Use the Google Cloud CLI to create a Pub/Sub topic with a subscription.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Replace the following:

    • PUBSUB_TOPIC: The name of a Pub/Sub topic to receive the messages from Kafka.
    • PUBSUB_SUBSCRIPTION: The name of a Pub/Sub subscription for the topic.
  2. Open the file config/cps-sink-connector.properties in a text editor. Add values for the following properties, which are marked "TODO" in the comments:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    Replace the following:

    • KAFKA_TOPICS: A comma-separated list of Kafka topics to read from.
    • PROJECT_ID: The Google Cloud project that contains your Pub/Sub topic.
    • PUBSUB_TOPIC: The Pub/Sub topic to receive the messages from Kafka.
  3. From the Kafka directory, run the following command:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Follow the steps in the Apache Kafka quickstart to write some events to your Kafka topic. (A console-producer sketch follows this list.)

  5. Use the gcloud CLI to read the events from Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
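
For step 4 in the preceding list, a minimal sketch of writing events with the Kafka console producer (the broker address is the quickstart default, and the topic must be one of the topics you listed in KAFKA_TOPICS):

    bin/kafka-console-producer.sh --topic KAFKA_TOPIC --bootstrap-server localhost:9092
    > event 1
    > event 2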
    

Forward messages from Pub/Sub to Kafka

This section describes how to start the source connector, publish messages to Pub/Sub, and read the forwarded messages from Kafka.

  1. Use the gcloud CLI to create a Pub/Sub topic with a subscription.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Replace the following:

    • PUBSUB_TOPIC: The name of a Pub/Sub topic.
    • PUBSUB_SUBSCRIPTION: The name of a Pub/Sub subscription.
  2. Open the file named config/cps-source-connector.properties in a text editor. Add values for the following properties, which are marked "TODO" in the comments:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    Replace the following:

    • KAFKA_TOPIC: The Kafka topic that receives the Pub/Sub messages.
    • PROJECT_ID: The Google Cloud project that contains your Pub/Sub topic.
    • PUBSUB_SUBSCRIPTION: The Pub/Sub subscription to pull messages from.
  3. From the Kafka directory, run the following command:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Use the gcloud CLI to publish a message to Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Read the message from Kafka. Follow the steps in the Apache Kafka quickstart to read the messages from the Kafka topic.
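
For step 5, a minimal sketch using the Kafka console consumer (the broker address is the quickstart default):

    bin/kafka-console-consumer.sh --topic KAFKA_TOPIC --from-beginning --bootstrap-server localhost:9092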

Message conversion

A Kafka record contains a key and a value, which are variable-length byte arrays. Optionally, a Kafka record can also have headers, which are key-value pairs. A Pub/Sub message has two main parts: the message body and zero or more key-value attributes.

Kafka Connect uses converters to serialize keys and values to and from Kafka. To control the serialization, set the following properties in the connector configuration files:

  • key.converter: The converter used to serialize record keys.
  • value.converter: The converter used to serialize record values.

The body of a Pub/Sub message is a ByteString object, so the most efficient conversion is to copy the payload directly. For that reason, we recommend using a converter that produces primitive data types (integer, float, string, or bytes schema) where possible, to prevent deserializing and reserializing the same message body.
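
For example, a minimal sketch of converter settings that keep record values as raw bytes; these can be set in connect-standalone.properties or overridden per connector in the connector's properties file:

    # Treat record keys as strings
    key.converter=org.apache.kafka.connect.storage.StringConverter
    # Pass record values through unchanged as raw bytes
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter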

Conversion from Kafka to Pub/Sub

The sink connector converts Kafka records to Pub/Sub messages as follows:

  • The Kafka record key is stored as an attribute named "key" in the Pub/Sub message.
  • By default, the connector drops any headers in the Kafka record. However, if you set the headers.publish configuration option to true, the connector writes the headers as Pub/Sub attributes. The connector skips any headers that exceed the Pub/Sub limits on message attributes.
  • For integer, float, string, and bytes schemas, the connector passes the bytes of the Kafka record value directly into the Pub/Sub message body.
  • For struct schemas, the connector writes each field as an attribute of the Pub/Sub message. For example, if the field is { "id"=123 }, the resulting Pub/Sub message has an attribute "id"="123". The field value is always converted to a string. Map and struct types are not supported as field types within a struct.
  • For map schemas, the connector writes each key-value pair as an attribute of the Pub/Sub message. For example, if the map is {"alice"=1,"bob"=2}, the resulting Pub/Sub message has two attributes, "alice"="1" and "bob"="2". The keys and values are converted to strings.

Struct and map schemas have some additional behaviors:

  • Optionally, you can specify a particular struct field or map key to be the message body, by setting the messageBodyName configuration property. The value of the field or key is stored as a ByteString in the message body. If you don't set messageBodyName, then the message body is empty for struct and map schemas.

  • For array values, the connector supports only primitive array types. The sequence of values in the array is concatenated into a single ByteString object.
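
As an illustration, a hedged sketch of a sink connector setting that sends one struct field as the message body (the field name payload is hypothetical):

    # Use the value of the struct field "payload" as the Pub/Sub message body
    # instead of writing it as an attribute
    messageBodyName=payload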

Conversion from Pub/Sub to Kafka

The source connector converts Pub/Sub messages to Kafka records as follows:

  • Kafka record key: By default, the key is set to null. Optionally, you can specify a Pub/Sub message attribute to use as the key, by setting the kafka.key.attribute configuration option. In that case, the connector looks for an attribute with that name and sets the record key to the attribute value. If the specified attribute is not present, the record key is set to null.

  • Kafka record value. The connector writes the record value as follows:

    • If the Pub/Sub message has no custom attributes, the connector writes the Pub/Sub message body directly to the Kafka record value as a byte[] type, using the converter specified by value.converter.

    • If the Pub/Sub message has custom attributes and kafka.record.headers is false, the connector writes a struct to the record value. The struct contains one field for each attribute, and a field named "message" whose value is the Pub/Sub message body (stored as bytes):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      In this case, you must use a value.converter that is compatible with struct schemas, such as org.apache.kafka.connect.json.JsonConverter.

    • If the Pub/Sub message has custom attributes and kafka.record.headers is true, the connector writes the attributes as Kafka record headers. It writes the Pub/Sub message body directly to the Kafka record value as a byte[] type, using the converter specified by value.converter.

  • Kafka record headers. By default, the headers are empty, unless you set kafka.record.headers to true.
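
For example, a sketch of source connector settings that keep attributes as Kafka record headers and key each record on a message attribute (the attribute name device_id is hypothetical):

    # Write Pub/Sub message attributes as Kafka record headers
    kafka.record.headers=true
    # Use the "device_id" attribute as the Kafka record key
    kafka.key.attribute=device_id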

Configuration options

In addition to the configurations provided by the Kafka Connect API, the Pub/Sub Group Kafka Connector supports the following configurations.

Sink connector configuration options

The sink connector supports the following configuration options.

  • connector.class (String): Required. The Java class for the connector. For the Pub/Sub sink connector, the value must be com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.
  • cps.endpoint (String): The Pub/Sub endpoint to use. Default: "pubsub.googleapis.com:443".
  • cps.project (String): Required. The Google Cloud project that contains the Pub/Sub topic.
  • cps.topic (String): Required. The Pub/Sub topic to publish Kafka records to.
  • gcp.credentials.file.path (String): Optional. The path to a file that stores Google Cloud credentials for authenticating with Pub/Sub.
  • gcp.credentials.json (String): Optional. A JSON blob that contains Google Cloud credentials for authenticating with Pub/Sub.
  • headers.publish (Boolean): When true, include any Kafka record headers as Pub/Sub message attributes. Default: false.
  • maxBufferBytes (Long): The maximum number of bytes to receive on a Kafka topic partition before publishing them to Pub/Sub. Default: 10000000.
  • maxBufferSize (Integer): The maximum number of records to receive on a Kafka topic partition before publishing them to Pub/Sub. Default: 100.
  • maxDelayThresholdMs (Integer): The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding records to Pub/Sub, in milliseconds. Default: 100.
  • maxOutstandingMessages (Long): The maximum number of records that can be outstanding, including incomplete and pending batches, before the publisher blocks further publishing. Default: Long.MAX_VALUE.
  • maxOutstandingRequestBytes (Long): The maximum number of total bytes that can be outstanding, including incomplete and pending batches, before the publisher blocks further publishing. Default: Long.MAX_VALUE.
  • maxRequestTimeoutMs (Integer): The timeout for individual publish requests to Pub/Sub, in milliseconds. Default: 10000.
  • maxTotalTimeoutMs (Integer): The total timeout, in milliseconds, for a call to publish to Pub/Sub, including retries. Default: 60000.
  • metadata.publish (Boolean): When true, include the Kafka topic, partition, offset, and timestamp as Pub/Sub message attributes. Default: false.
  • messageBodyName (String): When using a struct or map value schema, specifies the name of a field or key to use as the Pub/Sub message body. See Conversion from Kafka to Pub/Sub. Default: "cps_message_body".
  • orderingKeySource (String): Specifies how to set the ordering key in the Pub/Sub message. Can be one of the following values:
    • none: Do not set the ordering key.
    • key: Use the Kafka record key as the ordering key.
    • partition: Use the partition number, converted to a string, as the ordering key. Only use this setting for low-throughput topics or topics with thousands of partitions.
    Default: none.
  • topics (String): Required. A comma-separated list of Kafka topics to read from.
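
Putting these options together, a hedged example of a cps-sink-connector.properties file (all names and values are placeholders):

    name=CPSSinkConnector
    connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
    tasks.max=10
    # Kafka topics to read from and the Pub/Sub destination
    topics=my-kafka-topic
    cps.project=my-project
    cps.topic=my-pubsub-topic
    # Forward Kafka record headers and metadata as Pub/Sub attributes
    headers.publish=true
    metadata.publish=true
    # Derive the Pub/Sub ordering key from the Kafka record key
    orderingKeySource=key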

Source connector configuration options

The source connector supports the following configuration options.

  • connector.class (String): Required. The Java class for the connector. For the Pub/Sub source connector, the value must be com.google.pubsub.kafka.source.CloudPubSubSourceConnector.
  • cps.endpoint (String): The Pub/Sub endpoint to use. Default: "pubsub.googleapis.com:443".
  • cps.makeOrderingKeyAttribute (Boolean): When true, write the ordering key to the Kafka record, using the same format as the Pub/Sub message attributes. See Conversion from Pub/Sub to Kafka. Default: false.
  • cps.maxBatchSize (Integer): The maximum number of messages to batch per pull request to Pub/Sub. Default: 100.
  • cps.project (String): Required. The Google Cloud project that contains the Pub/Sub topic.
  • cps.subscription (String): Required. The name of the Pub/Sub subscription to pull messages from.
  • gcp.credentials.file.path (String): Optional. The path to a file that stores Google Cloud credentials for authenticating with Pub/Sub.
  • gcp.credentials.json (String): Optional. A JSON blob that contains Google Cloud credentials for authenticating with Pub/Sub.
  • kafka.key.attribute (String): The Pub/Sub message attribute to use as a key for messages published to Kafka. If set to "orderingKey", use the message's ordering key. If null, the Kafka records don't have a key. Default: null.
  • kafka.partition.count (Integer): The number of Kafka partitions for the Kafka topic where messages are published. This parameter is ignored if the partition scheme is "kafka_partitioner". Default: 1.
  • kafka.partition.scheme (String): The scheme for assigning a message to a partition in Kafka. Can be one of the following values:
    • round_robin: Assign partitions in a round robin fashion.
    • hash_key: Find the partition by hashing the record key.
    • hash_value: Find the partition by hashing the record value.
    • kafka_partitioner: Delegate partitioning logic to the Kafka producer. By default, the Kafka producer automatically detects the number of partitions and performs either murmur hash-based partition mapping or round robin, depending on whether a record key is provided.
    • ordering_key: Use the hash code of a message's ordering key. If no ordering key is present, use round_robin.
    Default: round_robin.
  • kafka.record.headers (Boolean): If true, write Pub/Sub message attributes as Kafka headers. Default: false.
  • kafka.topic (String): Required. The Kafka topic that receives messages from Pub/Sub.
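
Similarly, a hedged example of a cps-source-connector.properties file (all names and values are placeholders):

    name=CPSSourceConnector
    connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
    tasks.max=10
    # Pub/Sub subscription to pull from and the Kafka destination topic
    cps.project=my-project
    cps.subscription=my-pubsub-subscription
    kafka.topic=my-kafka-topic
    # Write Pub/Sub attributes as Kafka record headers
    kafka.record.headers=true
    # Spread records across three partitions in round robin order
    kafka.partition.count=3
    kafka.partition.scheme=round_robin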

Getting support

If you need help, create a support ticket. For general questions and discussions, create an issue in the GitHub repository.

What's next