Crea un tema que transfiera mensajes de Kinesis

Crea un tema que ingiera mensajes de Kinesis

Explora más

Para obtener documentación en la que se incluye esta muestra de código, consulta lo siguiente:

Muestra de código

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis =
      request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


using Google.Cloud.PubSub.V1;
using System;

public class CreateTopicWithKinesisIngestionSample
{
    public Topic CreateTopicWithKinesisIngestion(string projectId, string topicId, string streamArn, string consumerArn, string awsRoleArn, string gcpServiceAccount)
    {
        // Define settings for Kinesis ingestion
        IngestionDataSourceSettings ingestionDataSourceSettings = new IngestionDataSourceSettings
        {
            AwsKinesis = new IngestionDataSourceSettings.Types.AwsKinesis
            {
                AwsRoleArn = awsRoleArn,
                ConsumerArn = consumerArn,
                GcpServiceAccount = gcpServiceAccount,
                StreamArn = streamArn
            }
        };

        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        Topic topic = new Topic()
        {
            Name = TopicName.FormatProjectTopic(projectId, topicId),
            IngestionDataSourceSettings = ingestionDataSourceSettings
        };
        Topic createdTopic = publisher.CreateTopic(topic);
        Console.WriteLine($"Topic {topic.Name} created.");

        return createdTopic;
    }
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"

	pubsub "cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createTopicWithKinesisIngestion(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topicID := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	topicpb := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	topicpb, err = client.TopicAdminClient.CreateTopic(ctx, topicpb)
	if err != nil {
		return fmt.Errorf("failed to create topic with kinesis: %w", err)
	}
	fmt.Fprintf(w, "Kinesis topic created: %v\n", topicpb)
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithKinesisIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithKinesisIngestionExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithKinesisIngestionExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  // Creates a new topic with Kinesis ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  // Creates a new topic with Kinesis ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

PHP

Antes de probar esta muestra, sigue las instrucciones de configuración de PHP en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para PHP.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub topic with AWS Kinesis ingestion.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $streamArn  The Kinesis stream ARN to ingest data from.
 * @param string $consumerArn  The Kinesis consumer ARN to used for ingestion in Enhanced Fan-Out mode. The consumer must be already created and ready to be used.
 * @param string $awsRoleArn  AWS role ARN to be used for Federated Identity authentication with Kinesis. Check the Pub/Sub docs for how to set up this role and the required permissions that need to be attached to it.
 * @param string $gcpServiceAccount  The GCP service account to be used for Federated Identity authentication with Kinesis (via a AssumeRoleWithWebIdentity call for the provided role). The $awsRoleArn must be set up with accounts.google.com:sub equals to this service account number.
 */
function create_topic_with_kinesis_ingestion(
    string $projectId,
    string $topicName,
    string $streamArn,
    string $consumerArn,
    string $awsRoleArn,
    string $gcpServiceAccount
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->createTopic($topicName, [
        'ingestionDataSourceSettings' => [
            'aws_kinesis' => [
                'stream_arn' => $streamArn,
                'consumer_arn' => $consumerArn,
                'aws_role_arn' => $awsRoleArn,
                'gcp_service_account' => $gcpServiceAccount
            ]
        ]
    ]);

    printf('Topic created: %s' . PHP_EOL, $topic->name());
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
            stream_arn=stream_arn,
            consumer_arn=consumer_arn,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

# topic_id = "your-topic-id"
# stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/stream-name"
# consumer_arn =
# "arn:aws:kinesis:us-west-2:111111111111:stream/x/consumer/y:1111111111"
# aws_role_arn = "arn:aws:iam::111111111111:role/role-name"
# gcp_service_account = "service-account@project.iam.gserviceaccount.com"

pubsub = Google::Cloud::Pubsub.new
topic_admin = pubsub.topic_admin

topic = topic_admin.create_topic name: pubsub.topic_path(topic_id),
                                 ingestion_data_source_settings: {
                                   aws_kinesis: {
                                     stream_arn: stream_arn,
                                     consumer_arn: consumer_arn,
                                     aws_role_arn: aws_role_arn,
                                     gcp_service_account: gcp_service_account
                                   }
                                 }

puts "Topic with Kinesis Ingestion #{topic.name} created."

¿Qué sigue?

Para buscar y filtrar muestras de código para otros productos Google Cloud , consulta el Google Cloud navegador de muestras.