A Confluent Cloud import topic lets you continuously ingest data from Confluent Cloud, as an external source, into Pub/Sub. You can then stream the data into any of the destinations that Pub/Sub supports.
This document shows you how to create and manage Confluent Cloud import topics. To create a standard topic, see Create a standard topic.
For more information about import topics, see About import topics.
Before you begin
Know more about the Pub/Sub publish process.
Configure the required roles and permissions to manage Confluent Cloud import topics, including the following:
Set up workload identity federation so that Google Cloud can access the external streaming service.
Required roles and permissions
To get the permissions that you need to create and manage Confluent Cloud import topics, ask your administrator to grant you the Pub/Sub Editor (roles/pubsub.editor) IAM role on your topic or project.
For more information about granting roles, see Manage access to projects, folders, and organizations.
This predefined role contains the permissions required to create and manage Confluent Cloud import topics. To see the exact permissions that are required, expand the Required permissions section:
Required permissions
The following permissions are required to create and manage Confluent Cloud import topics:
- Create an import topic: pubsub.topics.create
- Delete an import topic: pubsub.topics.delete
- Get an import topic: pubsub.topics.get
- List import topics: pubsub.topics.list
- Publish to an import topic: pubsub.topics.publish
- Update an import topic: pubsub.topics.update
- Get the IAM policy for an import topic: pubsub.topics.getIamPolicy
- Configure the IAM policy for an import topic: pubsub.topics.setIamPolicy
You might also be able to get these permissions with custom roles or other predefined roles.
You can configure access control at the project level and the individual resource level.
Set up federated identity to access Confluent Cloud
Workload Identity Federation lets Google Cloud services access workloads running outside of Google Cloud. With identity federation, you don't need to maintain or pass credentials to Google Cloud to access your resources in other clouds. Instead, you can use the identities of the workloads themselves to authenticate to Google Cloud and access resources.
Create a service account in Google Cloud
This step is optional. If you already have a service account, you can use it in this procedure instead of creating a new one. If you are using an existing service account, skip to Record the service account unique ID.
For Confluent Cloud import topics, Pub/Sub uses the service account as the identity to access resources from Confluent Cloud.
For more information about creating a service account, including prerequisites, required roles and permissions, and naming guidelines, see Create service accounts. After you create a service account, you might need to wait for 60 seconds or more before you use the service account. This behavior occurs because read operations are eventually consistent; it can take time for the new service account to become visible.
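As an illustrative sketch, you can also create the service account with the gcloud CLI; the account ID kafka-ingestion and PROJECT_ID are placeholder values, not names defined by this guide:

```shell
# Create a service account to serve as the federated identity for ingestion.
# "kafka-ingestion" and PROJECT_ID are example placeholders.
gcloud iam service-accounts create kafka-ingestion \
  --project=PROJECT_ID \
  --display-name="Confluent Cloud ingestion identity"
```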
Record the service account unique ID
You need a service account unique ID to set up the identity provider and pool in the Confluent Cloud console.
In the Google Cloud console, go to the Service account details page.
Click the service account that you just created or the one that you are planning to use.
From the Service account details page, record the Unique ID number.
You need the ID as part of the workflow to set up the identity provider and pool in the Confluent Cloud console.
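Instead of copying the ID from the console, you can retrieve it with the gcloud CLI; the service account email below is a placeholder:

```shell
# Print only the uniqueId field of the service account.
gcloud iam service-accounts describe \
  kafka-ingestion@PROJECT_ID.iam.gserviceaccount.com \
  --format="value(uniqueId)"
```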
Add the service account token creator role to the Pub/Sub service account
The Service Account Token Creator role (roles/iam.serviceAccountTokenCreator) lets principals create short-lived credentials for a service account. These tokens or credentials are used to impersonate the service account.
For more information about service account impersonation, see Service account impersonation.
You can also add the Pub/Sub Publisher role (roles/pubsub.publisher) during this procedure. For more information about the role and why you are adding it, see Add the Pub/Sub publisher role to the Pub/Sub service account.
In the Google Cloud console, go to the IAM page.
Click the Include Google-provided role grants checkbox.
Look for the service account that has the format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.
For this service account, click the Edit Principal button.
If required, click Add another role.
Search for and click the Service Account Token Creator role (roles/iam.serviceAccountTokenCreator).
Click Save.
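As an alternative to the console steps, the same project-level grant can be made with the gcloud CLI; in this sketch, PROJECT_ID and PROJECT_NUMBER are placeholders for your project's values:

```shell
# Grant the Service Account Token Creator role to the Pub/Sub service agent.
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role="roles/iam.serviceAccountTokenCreator"
```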
Create an identity provider in Confluent Cloud
To authenticate to Confluent Cloud, the Google Cloud service account needs an identity pool. You must first create an identity provider in Confluent Cloud.
For more information on creating an identity provider in Confluent Cloud, visit the Add an OAuth/OIDC Identity Provider page.
Sign in to the Confluent Cloud console.
In the menu, click Accounts & Access.
Click Workload identities.
Click Add provider.
Click OAuth/OIDC, then click Next.
Click Other OIDC Provider, then click Next.
Provide a name and a description of the identity provider's purpose.
Click Show advanced configuration.
In the Issuer URI field, enter https://accounts.google.com.
In the JWKS URI field, enter https://www.googleapis.com/oauth2/v3/certs.
Click Validate & Save.
Create an identity pool and grant the appropriate roles in Confluent Cloud
You must create an identity pool under your identity provider and grant the necessary roles to allow the Pub/Sub service account to authenticate and read from Confluent Cloud Kafka topics.
Ensure that your cluster is created in Confluent Cloud before proceeding with creating an identity pool.
For more information on how to create an identity pool, visit the Use Identity Pools with Your OAuth/OIDC Identity Provider page.
Sign in to the Confluent Cloud console.
In the menu, click Accounts & Access.
Click Workload identities.
Click the identity provider you created in Create an identity provider in Confluent Cloud.
Click Add pool.
Provide a name and description for your identity pool.
Set Identity claim to claims.
Under Set filters, click the Advanced tab. Enter the following code:
claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
Replace <SERVICE_ACCOUNT_UNIQUE_ID> with your service account's unique ID found in Record the service account unique ID.
Click Next.
Click Add new permission. Then, click Next.
In the relevant cluster, click Add role assignment.
Click the Operator role and click Add.
This role grants the Pub/Sub service account access to the cluster containing the Confluent Kafka topic that you want to ingest into Pub/Sub.
Under the cluster, click Topics. Then, click Add role assignment.
Select the DeveloperRead role.
Click the appropriate option (Specific topic, Prefix rule, or All topics) and specify the topic or prefix.
Click Add.
Click Next.
Click Validate & Save.
Add the Pub/Sub publisher role to the Pub/Sub service account
To enable publishing, you must assign a publisher role to the Pub/Sub service account so that Pub/Sub is able to publish to the Confluent Cloud import topic.
Enable publishing from all topics
Use this method if you have not created any Confluent Cloud import topics.
In the Google Cloud console, go to the IAM page.
Click the Include Google-provided role grants checkbox.
Look for the service account that has the format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.
For this service account, click the Edit Principal button.
If required, click Add another role.
Search for and click the Pub/Sub Publisher role (roles/pubsub.publisher).
Click Save.
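The console steps above correspond to a project-level grant, which you can also make with the gcloud CLI; PROJECT_ID and PROJECT_NUMBER are placeholders for your project's values:

```shell
# Allow the Pub/Sub service agent to publish to any topic in the project.
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role="roles/pubsub.publisher"
```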
Enable publishing from a single topic
Use this method only if the Confluent Cloud import topic already exists.
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Run the gcloud pubsub topics add-iam-policy-binding command:

gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
  --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role="roles/pubsub.publisher"
Replace the following:
TOPIC_ID: the topic ID of the Confluent Cloud import topic.
PROJECT_NUMBER: the project number. To view the project number, see Identifying projects.
Add the service account user role to the service account
The Service Account User role (roles/iam.serviceAccountUser) includes the permission iam.serviceAccounts.actAs, which lets a principal attach a service account to the Confluent Cloud import topic's ingestion settings and use that service account for federated identity.
In the Google Cloud console, go to the IAM page.
For the principal that's issuing the create or update topic calls, click the Edit Principal button.
If required, click Add another role.
Search for and click the Service Account User role (roles/iam.serviceAccountUser).
Click Save.
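If you prefer to scope the grant to the service account itself rather than the whole project, one alternative sketch binds the role directly on the service account resource; the service account email and USER_EMAIL are placeholders:

```shell
# Let a specific principal act as (attach) this service account.
gcloud iam service-accounts add-iam-policy-binding \
  kafka-ingestion@PROJECT_ID.iam.gserviceaccount.com \
  --member="user:USER_EMAIL" \
  --role="roles/iam.serviceAccountUser"
```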
Use Confluent Cloud import topics
You can create a new import topic or edit an existing topic.
Considerations
Creating the topic and subscription separately, even if done in rapid succession, can lead to data loss. There's a short window where the topic exists without a subscription. If any data is sent to the topic during this time, it is lost. By creating the topic first, creating the subscription, and then converting the topic to an import topic, you guarantee that no messages are missed during the import process.
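The safe ordering described above can be sketched with the gcloud CLI, using the Confluent Cloud ingestion flags documented later in this page; all uppercase values are placeholders:

```shell
# 1. Create the topic and a subscription first so no messages can be dropped.
gcloud pubsub topics create TOPIC_ID
gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_ID

# 2. Only then convert the topic into an import topic by adding the
#    ingestion settings.
gcloud pubsub topics update TOPIC_ID \
  --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
  --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
  --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
  --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
  --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
```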
If you need to re-create the Kafka topic of an existing import topic with the same name, you can't just delete the Kafka topic and re-create it. This action can invalidate Pub/Sub's offset management, which can lead to data loss. To mitigate this, follow these steps:
- Delete the Pub/Sub import topic.
- Delete the Kafka topic.
- Create the Kafka topic.
- Create the Pub/Sub import topic.
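As a sketch of that sequence, combining the gcloud CLI with the Confluent CLI (the confluent commands are illustrative; adapt them to however you manage your Kafka topics):

```shell
# Delete the Pub/Sub import topic before touching the Kafka topic.
gcloud pubsub topics delete TOPIC_ID

# Re-create the Kafka topic (illustrative Confluent CLI commands).
confluent kafka topic delete KAFKA_TOPIC --cluster CONFLUENT_CLUSTER_ID
confluent kafka topic create KAFKA_TOPIC --cluster CONFLUENT_CLUSTER_ID

# Re-create the Pub/Sub import topic with the same ingestion flags as before.
gcloud pubsub topics create TOPIC_ID \
  --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
  --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
  --confluent-cloud-ingestion-topic KAFKA_TOPIC \
  --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
  --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
```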
Data from a Confluent Cloud Kafka topic is always read from the earliest offset.
Create a Confluent Cloud import topic
To know more about properties associated with a topic, see Properties of a topic.
Ensure that you have completed the procedures described in the preceding sections.
To create a Confluent Cloud import topic, follow these steps:
Console
In the Google Cloud console, go to the Topics page.
Click Create topic.
In the Topic ID field, enter an ID for your import topic.
For more information about naming topics, see the naming guidelines.
Select Add a default subscription.
Select Enable ingestion.
For ingestion source, select Confluent Cloud.
Enter the following details:
Bootstrap Server: The bootstrap server of your cluster containing the Kafka topic that you are ingesting into Pub/Sub. The format is hostname:port.
Cluster ID: The ID of your cluster containing the Kafka topic that you are ingesting into Pub/Sub.
Topic: The name of the Kafka topic that you are ingesting into Pub/Sub.
Identity pool ID: The pool ID of the identity pool used to authenticate with Confluent Cloud.
Service account: The service account that you created in Create a service account in Google Cloud.
Click Create topic.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Run the gcloud pubsub topics create command:

gcloud pubsub topics create TOPIC_ID \
  --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
  --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
  --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
  --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
  --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
Replace the following:
TOPIC_ID: the name or ID of your Pub/Sub topic.
CONFLUENT_BOOTSTRAP_SERVER: the bootstrap server of your cluster containing the Kafka topic that you are ingesting into Pub/Sub. The format is hostname:port.
CONFLUENT_CLUSTER_ID: the ID of your cluster containing the Kafka topic that you are ingesting into Pub/Sub.
CONFLUENT_TOPIC: the name of the Kafka topic that you are ingesting into Pub/Sub.
CONFLUENT_IDENTITY_POOL_ID: the pool ID of the identity pool used to authenticate with Confluent Cloud.
PUBSUB_SERVICE_ACCOUNT: the service account that you created in Create a service account in Google Cloud.
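After the command completes, you can confirm that the topic was created with its ingestion settings by describing it:

```shell
# Show the topic, including its ingestion data source settings.
gcloud pubsub topics describe TOPIC_ID
```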
Go
Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
Java
Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
Node.js
Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
Python
Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
C++
Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
Node.js (TypeScript)
Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
If you run into issues, see Troubleshooting a Confluent Cloud import topic.
Edit a Confluent Cloud import topic
To edit the ingestion data source settings of a Confluent Cloud import topic, follow these steps:
Console
In the Google Cloud console, go to the Topics page.
Click the Confluent Cloud import topic.
In the topic details page, click Edit.
Update the fields that you want to change.
Click Update.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
To avoid losing your settings for the import topic, make sure to include all of them every time you update the topic. If you leave something out, Pub/Sub resets the setting to its original default value.
Run the gcloud pubsub topics update command with all the flags mentioned in the following sample:

gcloud pubsub topics update TOPIC_ID \
  --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
  --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
  --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
  --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
  --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
Replace the following:
TOPIC_ID: the name or ID of your Pub/Sub topic.
CONFLUENT_BOOTSTRAP_SERVER: the bootstrap server of your cluster containing the Kafka topic that you are ingesting into Pub/Sub. The format is hostname:port.
CONFLUENT_CLUSTER_ID: the ID of your cluster containing the Kafka topic that you are ingesting into Pub/Sub.
CONFLUENT_TOPIC: the name of the Kafka topic that you are ingesting into Pub/Sub.
CONFLUENT_IDENTITY_POOL_ID: the pool ID of the identity pool used to authenticate with Confluent Cloud.
PUBSUB_SERVICE_ACCOUNT: the service account that you created in Create a service account in Google Cloud.
Quotas and limits
The publisher throughput for import topics is bound by the publish quota of the topic. For more information, see Pub/Sub quotas and limits.
What's next
Choose the type of subscription for your topic.
Learn how to publish a message to a topic.
Create or modify a topic with the gcloud CLI, REST APIs, or client libraries.