Apache Kafka
The Apache Kafka connector lets you perform insert, delete, update, and read operations on Apache Kafka database.
Supported versions
The Apache Kafka connector leverages the native client libraries to establish a connection to a given Kafka cluster, and the connector works with client version 3.3.1. However, the connector can establish connectivity to Kafka clusters from versions 3.0 to 3.3.1.
Before you begin
Before using the Apache Kafka connector, do the following tasks:
- In your Google Cloud project:
- Grant the roles/connectors.admin IAM role to the user configuring the connector.
- Grant the following IAM roles to the service account that you want to use for the connector:
roles/secretmanager.viewer
roles/secretmanager.secretAccessor
A service account is a special type of Google account intended to represent a non-human user that needs to authenticate and be authorized to access data in Google APIs. If you don't have a service account, you must create a service account. For more information, see Creating a service account.
- Enable the following services:
secretmanager.googleapis.com
(Secret Manager API)connectors.googleapis.com
(Connectors API)
To understand how to enable services, see Enabling services.
If these services or permissions have not been enabled for your project previously, you are prompted to enable them when configuring the connector.
Configure the connector
Configuring the connector requires you to create a connection to your data source (backend system). A connection is specific to a data source. It means that if you have many data sources, you must create a separate connection for each data source. To create a connection, do the following steps:
- In the Cloud console, go to the Integration Connectors > Connections page and then select or create a Google Cloud project.
- Click + CREATE NEW to open the Create Connection page.
- In the Location step, choose the location for the new Apache Kafka connection:
- Region: Select a region from the list.
- Click Next.
- In the Connection Details step, provide details about the new Apache Kafka connection:
- Connector version: Choose an available version of the Apache Kafka connector from the list.
- Connection Name: Enter a name for the Apache Kafka connection.
- (Optional) Description: Enter a description for the connection.
- (Optional) Enable Cloud Logging: Select this checkbox to store all log data of the connection.
- Service Account: Select a service account with the required IAM roles for the Apache Kafka connection.
- The Enable event subscription, entity and actions option is selected by default for the Apache Kafka connection.
- Type Detection Scheme: Select
MessageOnly
. - Registry Service: The Schema Registry service used for working with topic schemas.
- Registry Type: Type of the schema specified for the a specific topic.
-
Registry Version: Version of the schema read from
RegistryUrl
for the specified topic. -
Registry User: Username or Access Key value to authorize with the server specified in
RegistryUrl
. -
Registry Password: Secret Manager Secret containing the password/secret Key value to authorize with the server specified in
RegistryUrl
. - Optionally, configure the Connection node settings:
- Minimum number of nodes: Enter the minimum number of connection nodes.
- Maximum number of nodes: Enter the maximum number of connection nodes.
A node is a unit (or replica) of a connection that processes transactions. More nodes are required to process more transactions for a connection and conversely, fewer nodes are required to process fewer transactions. To understand how the nodes affect your connector pricing, see Pricing for connection nodes. If you don't enter any values, by default the minimum nodes are set to 2 (for better availability) and the maximum nodes are set to 50.
- Optionally, click + ADD LABEL to add a label to the Connection in the form of a key and value pair.
- Enable SSL: This field sets whether SSL is enabled.
- Click Next.
- In the Destinations section, enter details of the remote host (backend system) you want to connect to.
- Destination Type: Select a Destination Type.
- Select Host address from the list to specify the hostname or IP address of the destination.
- If you want to establish a private connection to your backend systems, select Endpoint attachment from the list, and then select the required endpoint attachment from the Endpoint Attachment list.
If you want to establish a public connection to your backend systems with additional security, you can consider configuring static outbound IP addresses for your connections, and then configure your firewall rules to allowlist only the specific static IP addresses.
To enter additional destinations, click +ADD DESTINATION.
- Click Next.
- Destination Type: Select a Destination Type.
-
In the Authentication section, enter the authentication details.
- Select an Authentication type and enter the relevant details.
The following authentication types are supported by the Apache Kafka connection:
-
Username and password
- Username: The Apache Kafka username to use for the connection.
- Password: Secret Manager Secret containing the password associated with the Apache Kafka username.
- Auth Scheme: The scheme used for authentication.
The following Auth Scheme are supported by the Apache Kafka connection:
- Plain
- SCRAM-SHA-1
- SCRAM-SHA-256
-
Not Available
If you want to use anonymous login, select Not Available.
-
Username and password
- Click Next.
- Select an Authentication type and enter the relevant details.
- Enter the dead-letter configuration. If you configure dead-letter, the connection writes
the unprocessed events to the specified Pub/Sub topic. Enter the following details:
- Dead-letter project ID: The Google Cloud project ID where you have configured the dead-letter Pub/Sub topic.
- Dead-letter topic: The Pub/Sub topic where you want to write the details of the unprocessed event.
- Click Next.
- Review: Review your connection and authentication details.
- Click Create.
Entities, operations, and actions
All the Integration Connectors provide a layer of abstraction for the objects of the connected application. You can access an application's objects only through this abstraction. The abstraction is exposed to you as entities, operations, and actions.
- Entity: An entity can be thought of as an object, or a collection of properties, in the
connected application or service. The definition of an entity differs from a connector to a
connector. For example, in a database connector, tables are the entities, in a
file server connector, folders are the entities, and in a messaging system connector,
queues are the entities.
However, it is possible that a connector doesn't support or have any entities, in which case the
Entities
list will be empty. - Operation: An operation is the activity that you can perform on an entity. You can perform
any of the following operations on an entity:
Selecting an entity from the available list, generates a list of operations available for the entity. For a detailed description of the operations, see the Connectors task's entity operations. However, if a connector doesn't support any of the entity operations, such unsupported operations aren't listed in the
Operations
list. - Action: An action is a first class function that is made available to the integration
through the connector interface. An action lets you make changes to an entity or entities, and
vary from connector to connector. Normally, an action will have some input parameters, and an output
parameter. However, it is possible
that a connector doesn't support any action, in which case the
Actions
list will be empty.
System limitations
The Apache Kafka connector can process a maximum of 50 transactions per second, per node, and throttles any transactions beyond this limit. By default, Integration Connectors allocates 2 nodes (for better availability) for a connection.
For information on the limits applicable to Integration Connectors, see Limits.
Actions
PublishMessage action
This action publishes a message to a Apache Kafka topic. The following tables describes the input and output parameters of the PublishMessage
action.
Input parameters
Parameter name | Required | Data type | Description |
---|---|---|---|
Topic | Yes | String | Name of the topic to which you want to publish the message. |
Partition | No | String | The partition to which the message is assigned. The value must be valid for the given topic. If you don't set this value, it is automatically set by the native client. |
Key | No | String | The message key. |
Message | Yes | String | The message you want to publish. The message should be a stringified JSON and the maximum message size supported is 10MB. |
HasBytes | No | Boolean | Specifies if the message is in binary format. |
MessageBytes | No | String | The message in the form of a Base64 encoded string. |
Validate | No | Boolean | Specifies if the message to be published must be validated against the message schema
defined in the schema registry for the topic. If you have specified the schema registry when
creating a connection, then the topic's schema definition from the registry is used for the
purpose of validation. The default value for this field is false . |
Output parameters
Parameter name | Data type | Description |
---|---|---|
PartitionWritten | Integer | The partition that the message was written to. |
OffsetWritten | Long | The position in the partition that the message was written to. |
TimestampWritten | Long | The time (Unix timestamp) at which the message was committed to the partition. |
KeyWritten | String | The value of the message key that was written. The value is NULL if no message key was provided when writing the message. |
Success | Boolean | Specifies if the message was published. |
A sample response of the PublishMessage
action is as follows:
{Success: true, PartitionWritten: 1, OffsetWritten: 22301, KeyWritten: "dGVzdA==", TimestampWritten: 1690806748}
Configuration for Confluent Cloud
Configuration for the Confluent Cloud differs slightly from the previously documented steps for Apache Kafka. Consider the following points when creating a connection for Confluent Cloud:
- The Confluent Cloud cluster API key is used as the username, and the key's Secret Manager Secret is used as the password for connecting to the bootstrap servers. You need to create an API key in Confluent Cloud if you don't have it already.
- Select Use SSL in the
Connection Details
section. - If you are using schema registry, configure the following values:
- In the
Connection Details
section:- Registry Version: Enter the registry version number. If you want to use the latest
version, enter
latest
. - Registry User: Enter the schema registry API key. You need to create a schema registry API key if you don't have it already.
- Registry Password: Enter the Secret Manager Secret of the registry password.
- Secret Version: Select the secret version number.
- Registry Type: Select
Confluent
. - Type Detection Scheme: Select
MessageOnly
- Registry Version: Enter the registry version number. If you want to use the latest
version, enter
- In the
Destinations
section, enter the registry URL in the hostname field.
Use terraform to create connections
You can use the Terraform resource to create a new connection.To learn how to apply or remove a Terraform configuration, see Basic Terraform commands.
To view a sample terraform template for connection creation, see sample template.
When creating this connection by using Terraform, you must set the following variables in your Terraform configuration file:
Parameter name Data type Required Description type_detection_scheme ENUM True The scheme used for authentication with the Apache Kafka broker. Supported values are: MessageOnly registry_service ENUM False The Schema Registry service used for working with topic schemas. Supported values are: Confluent registry_type ENUM False Type of the schema specified for the a specific topic. Supported values are: AVRO, JSON registry_version STRING False Version of the schema read from RegistryUrl for the specified topic. Valid values for registry version are between [1,2^31-1] or the string "latest", which returns the last registered schema. registry_user STRING False Username to authorize with the server specified in RegistryUrl . registry_password SECRET False Secret Manager Secret containing the password/secret Key value to authorize with the server specified in RegistryUrl. usessl BOOLEAN False This field sets whether SSL is enabled. Use the Apache Kafka connection in an integration
After you create the connection, it becomes available in both Apigee Integration and Application Integration. You can use the connection in an integration through the Connectors task.
- To understand how to create and use the Connectors task in Apigee Integration, see Connectors task.
- To understand how to create and use the Connectors task in Application Integration, see Connectors task.
Get help from the Google Cloud community
You can post your questions and discuss this connector in the Google Cloud community at Cloud Forums.What's next
- Understand how to suspend and resume a connection.
- Understand how to monitor connector usage.
- Understand how to view connector logs.
- In the