将主题类型更改为提取主题或非提取主题
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
C++
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
// Updates an existing topic so that it uses AWS Kinesis ingestion settings.
//
// Parameters:
//   client              - admin client used to issue the UpdateTopic call.
//   project_id/topic_id - combined into the full topic name.
//   stream_arn, consumer_arn, aws_role_arn, gcp_service_account
//                       - copied into the topic's AWS Kinesis ingestion
//                         settings.
//
// Throws the returned status if UpdateTopic does not succeed; otherwise prints
// the updated topic to std::cout.
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;
  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());

  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  // The string parameters are taken by value and are not used again, so move
  // them into the request instead of copying (same treatment as project_id
  // and topic_id above).
  aws_kinesis->set_stream_arn(std::move(stream_arn));
  aws_kinesis->set_consumer_arn(std::move(consumer_arn));
  aws_kinesis->set_aws_role_arn(std::move(aws_role_arn));
  aws_kinesis->set_gcp_service_account(std::move(gcp_service_account));

  // The field mask names the single topic field this request updates.
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}
Go
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
// updateTopicType updates the topic identified by topicID in projectID so that
// it carries AWS Kinesis ingestion settings, then writes the resulting topic
// configuration to w. The Kinesis stream/consumer/role ARNs and the GCP
// service account are placeholder values defined inside the function.
// It returns a wrapped error if the client cannot be created or the update
// call fails.
func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	const (
		streamARN         = "stream-arn"
		consumerARN       = "consumer-arn"
		awsRoleARN        = "aws-role-arn"
		gcpServiceAccount = "gcp-service-account"
	)

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	kinesisSource := &pubsub.IngestionDataSourceAWSKinesis{
		StreamARN:         streamARN,
		ConsumerARN:       consumerARN,
		AWSRoleARN:        awsRoleARN,
		GCPServiceAccount: gcpServiceAccount,
	}
	// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
	ingestionSettings := &pubsub.IngestionDataSourceSettings{Source: kinesisSource}

	topic := client.Topic(topicID)
	updated, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
		IngestionDataSourceSettings: ingestionSettings,
	})
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}

	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", updated)
	return nil
}
Java
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;
public class UpdateTopicTypeExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
// Kinesis ingestion settings.
String streamArn = "stream-arn";
String consumerArn = "consumer-arn";
String awsRoleArn = "aws-role-arn";
String gcpServiceAccount = "gcp-service-account";
UpdateTopicTypeExample.updateTopicTypeExample(
projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
}
public static void updateTopicTypeExample(
String projectId,
String topicId,
String streamArn,
String consumerArn,
String awsRoleArn,
String gcpServiceAccount)
throws IOException {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);
IngestionDataSourceSettings.AwsKinesis awsKinesis =
IngestionDataSourceSettings.AwsKinesis.newBuilder()
.setStreamArn(streamArn)
.setConsumerArn(consumerArn)
.setAwsRoleArn(awsRoleArn)
.setGcpServiceAccount(gcpServiceAccount)
.build();
IngestionDataSourceSettings ingestionDataSourceSettings =
IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();
// Construct the topic with Kinesis ingestion settings.
Topic topic =
Topic.newBuilder()
.setName(topicName.toString())
.setIngestionDataSourceSettings(ingestionDataSourceSettings)
.build();
// Construct a field mask to indicate which field to update in the topic.
FieldMask updateMask =
FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();
UpdateTopicRequest request =
UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();
Topic response = topicAdminClient.updateTopic(request);
System.out.println(
"Updated topic with Kinesis ingestion settings: " + response.getAllFields());
}
}
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Sets AWS Kinesis ingestion settings on a topic via setMetadata and logs a
 * confirmation once the call has completed.
 *
 * @param topicNameOrId Name or ID of the topic to update.
 * @param awsRoleArn Value sent as the Kinesis `awsRoleArn` setting.
 * @param gcpServiceAccount Value sent as the Kinesis `gcpServiceAccount` setting.
 * @param streamArn Value sent as the Kinesis `streamArn` setting.
 * @param consumerArn Value sent as the Kinesis `consumerArn` setting.
 */
async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  // Kinesis source description, nested under the ingestion settings below.
  const awsKinesis = {
    awsRoleArn: awsRoleArn,
    gcpServiceAccount: gcpServiceAccount,
    streamArn: streamArn,
    consumerArn: consumerArn,
  };
  const topic = pubSubClient.topic(topicNameOrId);

  await topic.setMetadata({
    ingestionDataSourceSettings: {awsKinesis: awsKinesis},
  });
  console.log('Topic updated with Kinesis source successfully.');
}
Node.js (TypeScript)
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';
// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Sets AWS Kinesis ingestion settings on a topic via setMetadata and logs a
 * confirmation once the call has completed.
 *
 * @param topicNameOrId Name or ID of the topic to update.
 * @param awsRoleArn Value sent as the Kinesis `awsRoleArn` setting.
 * @param gcpServiceAccount Value sent as the Kinesis `gcpServiceAccount` setting.
 * @param streamArn Value sent as the Kinesis `streamArn` setting.
 * @param consumerArn Value sent as the Kinesis `consumerArn` setting.
 */
async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  // Kinesis source description, nested under the ingestion settings below.
  const awsKinesis = {
    awsRoleArn: awsRoleArn,
    gcpServiceAccount: gcpServiceAccount,
    streamArn: streamArn,
    consumerArn: consumerArn,
  };
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {awsKinesis: awsKinesis},
  };

  const topic = pubSubClient.topic(topicNameOrId);
  await topic.setMetadata(metadata);
  console.log('Topic updated with Kinesis source successfully.');
}
Python
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"
# Client used to issue the update call and to build the topic path.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

# AWS Kinesis source description, wrapped in the ingestion settings message.
kinesis_source = IngestionDataSourceSettings.AwsKinesis(
    stream_arn=stream_arn,
    consumer_arn=consumer_arn,
    aws_role_arn=aws_role_arn,
    gcp_service_account=gcp_service_account,
)
ingestion_settings = IngestionDataSourceSettings(aws_kinesis=kinesis_source)

# Topic carrying the new settings, plus a mask naming the field to update.
topic_with_settings = Topic(
    name=topic_path,
    ingestion_data_source_settings=ingestion_settings,
)
settings_mask = field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"])

update_request = UpdateTopicRequest(
    topic=topic_with_settings,
    update_mask=settings_mask,
)

topic = publisher.update_topic(request=update_request)

print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。