가져오기 주제 만들기

가져오기 주제를 사용하면 외부 소스의 데이터를 Pub/Sub로 수집할 수 있습니다. 그런 다음 Pub/Sub에서 지원하는 대상으로 데이터를 스트리밍할 수 있습니다.

Pub/Sub에서는 가져오기 주제로 데이터를 수집하기 위해 외부 소스로 Amazon Kinesis Data Streams를 지원합니다.

가져오기 주제 개요

가져오기 주제에는 주제에 대한 수집이 속성으로 사용 설정되어 있습니다. 이를 통해 가져오기 주제에서 스트리밍 데이터를 수집할 수 있습니다. 콘솔, Google Cloud CLI, REST 호출, 클라이언트 라이브러리를 사용하여 주제에 대해 수집을 사용 설정할 수 있습니다. Google Cloud는 가져오기 주제를 관리하는 일환으로 수집 파이프라인의 모니터링 및 확장을 제공합니다.

가져오기 주제가 없으면 데이터 소스에서 Pub/Sub로 데이터를 스트리밍하려면 추가 서비스가 필요합니다. 이 추가 서비스는 원본 소스에서 데이터를 가져와 Pub/Sub에 게시합니다. 추가 서비스는 Apache Spark와 같은 스트리밍 엔진 또는 커스텀 작성 서비스일 수 있습니다. 또한 이 서비스를 구성, 배포, 실행, 확장, 모니터링해야 합니다.

다음은 가져오기 주제와 관련된 중요한 정보 목록입니다.

  • 표준 주제와 마찬가지로 가져오기 주제에 수동으로 게시할 수 있습니다.

  • 가져오기 주제에는 하나의 수집 소스만 연결할 수 있습니다.

스트리밍 데이터에 대한 가져오기 주제를 권장합니다. 스트리밍 데이터 수집 대신 BigQuery로 일괄 데이터 수집을 고려하려는 경우 BigQuery Data Transfer Service(BQ DTS)를 사용할 수 있습니다. Cloud Storage로 데이터를 수집하려면 Storage Transfer Service(STS)를 사용하는 것이 좋습니다.

시작하기 전에

가져오기 주제 관리를 위한 필수 역할 및 권한

가져오기 주제를 만들고 관리하는 데 필요한 권한을 얻으려면 관리자에게 주제 또는 프로젝트에 대한 Pub/Sub 편집자(roles/pubsub.editor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 가져오기 주제 생성 및 관리를 위해 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

가져오기 주제를 만들고 관리하려면 다음 권한이 필요합니다.

  • 가져오기 주제 만들기: pubsub.topics.create
  • 가져오기 주제 삭제: pubsub.topics.delete
  • 가져오기 주제 가져오기: pubsub.topics.get
  • 가져오기 주제 나열: pubsub.topics.list
  • 가져오기 주제에 게시: pubsub.topics.publish
  • 가져오기 주제 업데이트: pubsub.topics.update
  • 가져오기 주제의 IAM 정책 가져오기: pubsub.topics.getIamPolicy
  • 가져오기 주제의 IAM 정책 구성: pubsub.topics.setIamPolicy

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

프로젝트 수준 및 개별 리소스 수준에서 액세스 제어를 구성할 수 있습니다.

제휴 ID를 설정하여 Kinesis Data Streams에 액세스

워크로드 아이덴티티 제휴를 통해 Google Cloud 서비스는 Google Cloud 외부에서 실행되는 워크로드에 액세스할 수 있습니다. ID 제휴를 사용하면 다른 클라우드의 리소스에 액세스하기 위해 사용자 인증 정보를 유지하거나 Google Cloud에 전달할 필요가 없습니다. 대신 워크로드 자체의 ID를 사용하여 Google Cloud에 인증하고 리소스에 액세스할 수 있습니다.

Google Cloud에서 서비스 계정 만들기

이 단계는 선택사항입니다. 이미 서비스 계정이 있는 경우 새 서비스 계정을 만드는 대신 이 절차에서 해당 계정을 사용할 수 있습니다. 기존 서비스 계정을 사용하는 경우 다음 단계를 위해 서비스 계정 고유 ID 기록으로 이동합니다.

가져오기 주제의 경우 Pub/Sub는 서비스 계정을 ID로 사용하여 AWS의 리소스에 액세스합니다.

기본 요건, 필수 역할 및 권한, 이름 지정 가이드라인을 포함하여 서비스 계정 만들기에 대한 자세한 내용은 서비스 계정 만들기를 참조하세요. 서비스 계정을 만든 후 60초 이상 기다려야 서비스 계정을 사용할 수 있습니다. 이는 읽기 작업이 eventual consistency를 가지기 때문에 발생합니다. 새 서비스 계정이 표시되는 데 다소 시간이 걸릴 수 있습니다.

서비스 계정 고유 ID 기록

AWS 콘솔에서 역할을 설정하려면 서비스 계정 고유 ID가 필요합니다.

  1. Google Cloud 콘솔에서 서비스 계정 세부정보 페이지로 이동합니다.

    서비스 계정으로 이동

  2. 방금 만든 서비스 계정 또는 사용하려는 서비스 계정을 클릭합니다.

  3. 서비스 계정 세부정보 페이지에서 고유 ID 번호를 기록합니다.

    커스텀 트러스트 정책을 사용하여 AWS에서 역할 만들기 섹션의 일부로 ID가 필요합니다.

Pub/Sub 서비스 계정에 서비스 계정 토큰 생성자 역할 추가

서비스 계정 토큰 생성자 역할(roles/iam.serviceAccountTokenCreator)을 받은 주 구성원은 서비스 계정의 단기 사용자 인증 정보 만들기를 할 수 있습니다. 이러한 토큰 또는 사용자 인증 정보는 서비스 계정을 가장하는 데 사용됩니다.

서비스 계정 가장에 대한 자세한 내용은 서비스 계정 가장을 참조하세요.

또한 이 절차 중에 Pub/Sub 게시자 역할(roles/pubsub.publisher)을 추가할 수 있습니다. 역할 및 역할을 추가하는 이유에 대한 자세한 내용은 Pub/Sub 서비스 계정에 Pub/Sub 게시자 역할 추가를 참조하세요.

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    IAM으로 이동

  2. Google 제공 역할 부여 포함 옵션을 사용 설정합니다.

  3. service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 형식의 서비스 계정을 찾습니다.

  4. 이 서비스 계정에서 주 구성원 수정 버튼을 클릭합니다.

  5. 필요한 경우 다른 역할 추가를 클릭합니다.

  6. 서비스 계정 토큰 생성자 역할(roles/iam.serviceAccountTokenCreator)을 검색하여 선택합니다.

  7. 저장을 클릭합니다.

AWS에서 정책 만들기

Pub/Sub가 AWS Kinesis 데이터 스트림에서 데이터를 수집할 수 있도록 Pub/Sub가 AWS로 인증하도록 하려면 AWS에 정책이 필요합니다. AWS 정책을 만들기 전에 Kinesis 데이터 스트림과 이에 등록된 소비자를 만듭니다. 권한을 특정 스트림으로 제한할 수 있도록 이 방법을 사용하는 것이 좋습니다.

  • AWS Kinesis 데이터 스트림을 만드는 방법에 대한 자세한 내용은 Kinesis 데이터 스트림을 참조하세요.

  • 소비자 등록에 사용되는 AWS Kinesis 데이터 스트림 API에 대한 자세한 내용은 RegisterStreamConsumer를 참조하세요.

  • AWS에서 정책을 만드는 방법에 대한 자세한 방법과 정보는 IAM 정책 만들기를 참조하세요.

AWS에서 정책을 만들려면 다음 단계를 수행하세요.

  1. AWS 관리 콘솔에 로그인하고 IAM 콘솔을 엽니다.

  2. IAM 콘솔의 탐색창에서 액세스 관리 > 정책을 클릭합니다.

  3. 정책 만들기를 클릭합니다.

  4. 서비스 선택에서 Kinesis를 선택합니다.

  5. 허용된 작업에서 다음을 선택합니다.

    • 목록 > ListShards.

      이 작업은 스트림의 샤드를 나열할 수 있는 권한을 부여하고 각 샤드에 대한 정보를 제공합니다.

    • 읽기 > SubscribeToShard.

      이 작업은 향상된 팬아웃으로 특정 샤드를 리슨할 수 있는 권한을 부여합니다.

    • > DescribeStreamConsumer.

      이 작업은 등록된 스트림 소비자에 대한 설명을 가져올 수 있는 권한을 부여합니다.

    이러한 권한에는 스트림에서 읽기가 포함됩니다. Pub/Sub는 스트리밍 SubscribeToShard API를 사용하여 향상된 팬아웃으로 Kinesis 스트림에서 읽기만 지원합니다.

  6. 리소스의 경우 정책을 특정 스트림 또는 소비자(권장)로 제한하려면 소비자 ARN스트림 ARN을 지정합니다.

  7. 더 많은 권한 추가를 클릭합니다.

  8. 서비스 선택에서 STS를 입력하고 선택합니다.

  9. 허용된 작업에서 쓰기 > AssumeRoleWithWebIdentity를 선택합니다.

    이 작업은 ID 제휴를 사용하여 Kinesis 데이터 스트림에 인증하기 위해 Pub/Sub에 대한 임시 보안용 사용자 인증 정보 집합을 얻을 수 있는 권한을 부여합니다.

  10. 다음을 클릭합니다.

  11. 정책 이름과 설명을 입력합니다.

  12. 정책 만들기를 클릭합니다.

커스텀 트러스트 정책을 사용하여 AWS에서 역할 만들기

Pub/Sub가 AWS에 인증하여 Kinesis Data Streams에서 데이터를 수집할 수 있도록 AWS에서 역할을 만들어야 합니다.

커스텀 트러스트 정책을 사용하여 역할을 만들려면 다음 단계를 수행하세요.

  1. AWS 관리 콘솔에 로그인하고 IAM 콘솔을 엽니다.

  2. IAM 콘솔의 탐색창에서 역할을 클릭합니다.

  3. 역할 만들기를 클릭합니다.

  4. 신뢰할 수 있는 항목 선택에서 커스텀 트러스트 정책을 선택합니다.

  5. 커스텀 트러스트 정책 섹션에서 다음을 입력하거나 붙여넣습니다.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    <SERVICE_ACCOUNT_UNIQUE_ID>서비스 계정 고유 ID 기록에서 기록한 서비스 계정의 고유 ID로 바꿉니다.

  6. 다음을 클릭합니다.

  7. 권한 추가에서 방금 만든 커스텀 정책을 검색하여 선택합니다.

  8. 다음을 클릭합니다.

  9. 역할 이름과 설명을 입력합니다.

  10. 역할 만들기를 클릭합니다.

Pub/Sub 서비스 계정에 Pub/Sub 게시자 역할 추가

Pub/Sub가 AWS Kinesis Data Streams에서 가져오기 주제에 게시할 수 있도록 Pub/Sub 서비스 계정에 게시자 역할을 할당해야 합니다.

모든 주제에서 게시 사용 설정

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    IAM으로 이동

  2. Google 제공 역할 부여 포함 옵션을 사용 설정합니다.

  3. service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 형식의 서비스 계정을 찾습니다.

  4. 이 서비스 계정에서 주 구성원 수정 버튼을 클릭합니다.

  5. 필요한 경우 다른 역할 추가를 클릭합니다.

  6. Pub/Sub 게시자 역할(roles/pubsub.publisher)을 검색하여 선택합니다.

  7. 저장을 클릭합니다.

단일 주제에서 게시 사용 설정

특정 가져오기 주제에 대해서만 게시 권한을 부여하려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 Cloud Shell을 활성화합니다.

    Cloud Shell 활성화

    Google Cloud 콘솔 하단에서 Cloud Shell 세션이 시작되고 명령줄 프롬프트가 표시됩니다. Cloud Shell은 Google Cloud CLI가 사전 설치된 셸 환경으로, 현재 프로젝트의 값이 이미 설정되어 있습니다. 세션이 초기화되는 데 몇 초 정도 걸릴 수 있습니다.

  2. gcloud pubsub topics add-iam-policy-binding 명령어를 실행합니다.

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"
    

    다음을 바꿉니다.

    • TOPIC_ID는 가져오기 주제의 주제 ID입니다.

    • PROJECT_NUMBER는 프로젝트 번호입니다. 프로젝트 번호를 보려면 프로젝트 식별을 참조하세요.

서비스 계정에 서비스 계정 사용자 역할 추가

서비스 계정 사용자 역할(roles/iam.serviceAccountUser)에는 주 구성원이 서비스 계정을 가져오기 주제의 수집 설정에 연결하고 제휴 ID에 해당 서비스를 사용할 수 있게 해주는 iam.serviceAccounts.actAs 권한이 포함되어 있습니다.

다음 단계를 수행합니다.

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    IAM으로 이동

  2. 주제 만들기 또는 업데이트 호출을 실행하는 주 구성원에 대해 주 구성원 수정 버튼을 클릭합니다.

  3. 필요한 경우 다른 역할 추가를 클릭합니다.

  4. 서비스 계정 사용자 역할(roles/iam.serviceAccountUser)을 검색하여 선택합니다.

  5. 저장을 클릭합니다.

가져오기 주제 만들기

주제와 연결된 속성에 대해 자세히 알아보려면 주제 속성을 참조하세요.

다음 절차를 완료했는지 확인하세요.

가져오기 주제를 만들려면 다음 단계를 따르세요.

Console

  1. Google Cloud console에서 IAM 페이지로 이동합니다.

    주제로 이동

  2. 주제 만들기를 클릭합니다.

  3. 주제 ID 필드에 가져오기 주제의 ID를 입력합니다.

    주제 이름 지정에 대한 자세한 내용은 이름 지정 가이드라인을 참조하세요.

  4. 기본 구독 추가를 선택합니다.

  5. 수집 사용 설정을 선택합니다.

  6. 수집 소스의 경우 Amazon Kinesis Data Streams를 선택합니다.

  7. 다음 세부정보를 입력합니다.

    • Kinesis 스트림 ARN: Pub/Sub로 수집하려는 Kinesis Data Stream의 ARN입니다. ARN 형식은 arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}입니다.

    • Kinesis 소비자 ARN: AWS Kinesis Data Stream에 등록된 소비자 리소스의 ARN입니다. ARN 형식은 arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}입니다.

    • AWS 역할 ARN: AWS 역할의 ARN입니다. 역할의 ARN 형식은 arn:aws:iam:${Account}:role/${RoleName}입니다.

    • 서비스 계정: Google Cloud에서 서비스 계정 만들기에서 만든 서비스 계정입니다.

  8. 주제 만들기를 클릭합니다.

gcloud

  1. Google Cloud 콘솔에서 Cloud Shell을 활성화합니다.

    Cloud Shell 활성화

    Google Cloud 콘솔 하단에서 Cloud Shell 세션이 시작되고 명령줄 프롬프트가 표시됩니다. Cloud Shell은 Google Cloud CLI가 사전 설치된 셸 환경으로, 현재 프로젝트의 값이 이미 설정되어 있습니다. 세션이 초기화되는 데 몇 초 정도 걸릴 수 있습니다.

  2. gcloud pubsub topics create 명령어를 실행합니다.

    gcloud pubsub topics create TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    다음을 바꿉니다.

    • TOPIC_ID는 주제 ID입니다.

    • KINESIS_STREAM_ARN은 Pub/Sub로 수집하려는 Kinesis Data Streams의 ARN입니다. ARN 형식은 arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}입니다.

    • KINESIS_CONSUMER_ARN은 AWS Kinesis Data Streams에 등록된 소비자 리소스의 ARN입니다. ARN 형식은 arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}입니다.

    • KINESIS_ROLE_ARN은 AWS 역할의 ARN입니다. 역할의 ARN 형식은 arn:aws:iam:${Account}:role/${RoleName}입니다.

    • PUBSUB_SERVICE_ACCOUNTGoogle Cloud에서 서비스 계정 만들기에서 만든 서비스 계정입니다.

Go

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithKinesisIngestion(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Kinesis topic created: %v\n", t)
	return nil
}

Java

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithKinesisIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithKinesisIngestionExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithKinesisIngestionExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Python

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
            stream_arn=stream_arn,
            consumer_arn=consumer_arn,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis =
      request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

ARN에 관한 자세한 내용은 Amazon 리소스 이름(ARN)IAM 식별자를 참조하세요.

문제가 발생하면 가져오기 주제 문제 해결을 참조하세요.

가져오기 주제 수정

가져오기 주제의 수집 데이터 소스 설정을 수정할 수 있습니다. 다음 단계를 수행합니다.

Console

  1. Google Cloud console에서 IAM 페이지로 이동합니다.

    주제로 이동

  2. 가져오기 주제를 클릭합니다.

  3. 주제 세부정보 페이지에서 수정을 클릭합니다.

  4. 변경하려는 필드를 업데이트합니다.

  5. 업데이트를 클릭합니다.

gcloud

  1. Google Cloud 콘솔에서 Cloud Shell을 활성화합니다.

    Cloud Shell 활성화

    Google Cloud 콘솔 하단에서 Cloud Shell 세션이 시작되고 명령줄 프롬프트가 표시됩니다. Cloud Shell은 Google Cloud CLI가 사전 설치된 셸 환경으로, 현재 프로젝트의 값이 이미 설정되어 있습니다. 세션이 초기화되는 데 몇 초 정도 걸릴 수 있습니다.

  2. 다음 샘플에 언급된 모든 플래그를 사용하여 gcloud pubsub topics update 명령어를 실행합니다.

      gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    다음을 바꿉니다.

    • TOPIC_ID는 주제 ID입니다. 이 필드는 업데이트할 수 없습니다.

    • KINESIS_STREAM_ARN은 Pub/Sub로 수집하려는 Kinesis Data Streams의 ARN입니다. ARN 형식은 arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}입니다.

    • KINESIS_CONSUMER_ARN은 AWS Kinesis Data Streams에 등록된 소비자 리소스의 ARN입니다. ARN 형식은 arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}입니다.

    • KINESIS_ROLE_ARN은 AWS 역할의 ARN입니다. 역할의 ARN 형식은 arn:aws:iam:${Account}:role/${RoleName}입니다.

    • PUBSUB_SERVICE_ACCOUNTGoogle Cloud에서 서비스 계정 만들기에서 만든 서비스 계정입니다.

주제 가져오기의 할당량 및 한도

가져오기 주제의 게시자 처리량은 주제의 게시 할당량에 따라 제한됩니다. 자세한 내용은 Pub/Sub 할당량 및 한도를 참조하세요.

다음 단계