Cloud Storage 가져오기 주제 만들기

Cloud Storage 가져오기 주제를 사용하면 Cloud Storage의 데이터를 Pub/Sub로 지속적으로 수집할 수 있습니다. 그런 다음 Pub/Sub에서 지원하는 대상으로 데이터를 스트리밍할 수 있습니다. Pub/Sub은 Cloud Storage 버킷에 추가된 새 객체를 자동으로 감지하고 처리합니다.

Cloud Storage는 Google Cloud에 객체를 저장하는 서비스입니다. 객체는 모든 형식의 파일로 구성된 변경할 수 없는 데이터 조각입니다. 객체는 버킷이라는 컨테이너에 저장합니다. 버킷에는 관리형 폴더도 포함될 수 있으며, 이 폴더를 사용하여 공유 이름 프리픽스가 있는 객체 그룹에 확장된 액세스 권한을 제공할 수 있습니다.

Cloud Storage에 대한 자세한 내용은 Cloud Storage 문서를 참고하세요.

가져오기 주제에 대한 자세한 내용은 가져오기 주제 정보를 참고하세요.

시작하기 전에

Cloud Storage 가져오기 주제를 관리하기 위한 필수 역할 및 권한

Cloud Storage 가져오기 주제를 만들고 관리하는 데 필요한 권한을 얻으려면 관리자에게 주제 또는 프로젝트에 대한 Pub/Sub 편집자 (roles/pubsub.editor) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 Cloud Storage 가져오기 주제를 만들고 관리하는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

Cloud Storage 가져오기 주제를 만들고 관리하려면 다음 권한이 필요합니다.

  • 가져오기 주제 만들기: pubsub.topics.create
  • 가져오기 주제 삭제: pubsub.topics.delete
  • 가져오기 주제 가져오기: pubsub.topics.get
  • 가져오기 주제 나열: pubsub.topics.list
  • 가져오기 주제에 게시: pubsub.topics.publish
  • 가져오기 주제 업데이트: pubsub.topics.update
  • 가져오기 주제의 IAM 정책 가져오기: pubsub.topics.getIamPolicy
  • 가져오기 주제의 IAM 정책 구성: pubsub.topics.setIamPolicy

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

프로젝트 수준 및 개별 리소스 수준에서 액세스 제어를 구성할 수 있습니다.

메시지 저장용량 정책이 버킷 위치를 준수함

Pub/Sub 주제의 메시지 스토리지 정책이 Cloud Storage 버킷이 있는 리전과 겹쳐야 합니다. 이 정책은 Pub/Sub에서 메시지 데이터를 저장할 수 있는 위치를 지정합니다.

  • 위치 유형이 지역인 버킷의 경우: 정책에 해당 지역이 포함되어야 합니다. 예를 들어 버킷이 us-central1 리전에 있는 경우 메시지 저장소 정책에도 us-central1가 포함되어야 합니다.

  • 위치 유형이 이중 리전 또는 멀티 리전인 버킷의 경우: 정책에 이중 리전 또는 멀티 리전 위치 내에 하나 이상의 리전이 포함되어야 합니다. 예를 들어 버킷이 US multi-region에 있는 경우 메시지 저장소 정책에 us-central1, us-east1 또는 US multi-region 내의 다른 리전이 포함될 수 있습니다.

    정책에 버킷의 리전이 포함되지 않으면 주제 만들기에 실패합니다. 예를 들어 버킷이 europe-west1에 있고 메시지 저장소 정책에 asia-east1만 포함된 경우 오류가 발생합니다.

    메시지 스토리지 정책에 버킷 위치와 겹치는 리전이 하나만 포함되어 있으면 멀티 리전 중복이 손상될 수 있습니다. 단일 리전을 사용할 수 없게 되면 데이터에 액세스할 수 없을 수 있기 때문입니다. 완전한 중복을 보장하려면 메시지 스토리지 정책 내에 버킷의 멀티 리전 또는 이중 리전 위치에 포함된 리전을 2개 이상 포함하는 것이 좋습니다.

버킷 위치에 관한 자세한 내용은 문서를 참고하세요.

Pub/Sub 서비스 계정에 Pub/Sub 게시자 역할 추가

Pub/Sub가 Cloud Storage 가져오기 주제에 게시할 수 있도록 Pub/Sub 서비스 계정에 Pub/Sub 게시자 역할을 할당해야 합니다.

  • 프로젝트의 모든 주제에 게시를 사용 설정하려면 모든 주제에 게시 사용 설정을 참고하세요. Cloud Storage 가져오기 주제를 만들지 않은 경우 이 메서드를 사용하세요.

  • 특정 주제에 게시를 사용 설정하려면 단일 주제에 게시 사용 설정을 참고하세요. Cloud Storage 가져오기 주제가 이미 있는 경우에만 이 메서드를 사용하세요.

모든 Cloud Storage 가져오기 주제에 게시 사용 설정

프로젝트에 사용할 수 있는 Cloud Storage 가져오기 주제가 없는 경우 이 옵션을 선택합니다.

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    IAM으로 이동

  2. Google 제공 역할 부여 포함 옵션을 사용 설정합니다.

  3. 다음과 같은 형식의 Pub/Sub 서비스 계정을 찾습니다.

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. 이 서비스 계정에서 주 구성원 수정 버튼을 클릭합니다.

  5. 필요한 경우 다른 역할 추가를 클릭합니다.

  6. Pub/Sub 게시자 역할(roles/pubsub.publisher)을 검색하여 선택합니다.

  7. 저장을 클릭합니다.

단일 Cloud Storage 가져오기 주제에 게시 사용 설정

이미 존재하는 특정 Cloud Storage 가져오기 주제에 게시할 수 있는 권한을 Pub/Sub에 부여하려면 다음 단계를 따르세요.

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics add-iam-policy-binding 명령어를 실행합니다.

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    다음을 바꿉니다.

    • TOPIC_ID은 Cloud Storage 가져오기 주제의 ID 또는 이름입니다.

    • PROJECT_NUMBER는 프로젝트 번호입니다. 프로젝트 번호를 보려면 프로젝트 식별을 참조하세요.

Pub/Sub 서비스 계정에 Cloud Storage 역할 할당

Cloud Storage 가져오기 주제를 만들려면 Pub/Sub 서비스 계정에 특정 Cloud Storage 버킷에서 읽을 수 있는 권한이 있어야 합니다. 다음 권한이 필요합니다.

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

Pub/Sub 서비스 계정에 이러한 권한을 할당하려면 다음 절차 중 하나를 선택합니다.

  • 버킷 수준에서 권한을 부여합니다. 특정 Cloud Storage 버킷에서 스토리지 기존 객체 리더(roles/storage.legacyObjectReader) 역할과 스토리지 기존 버킷 리더 (roles/storage.legacyBucketReader) 역할을 Pub/Sub 서비스 계정에 부여합니다.

  • 프로젝트 수준에서 역할을 부여해야 하는 경우 대신 Cloud Storage 버킷이 포함된 프로젝트에 대한 스토리지 관리자(roles/storage.admin) 역할을 부여할 수 있습니다. Pub/Sub 서비스 계정에 이 역할을 부여합니다.

버킷 권한

다음 단계에 따라 버킷 수준에서 Pub/Sub 서비스 계정에 스토리지 기존 객체 리더(roles/storage.legacyObjectReader) 역할과 스토리지 기존 버킷 리더 (roles/storage.legacyBucketReader) 역할을 부여합니다.

  1. Google Cloud 콘솔에서 Cloud Storage 페이지로 이동합니다.

    Cloud Storage로 이동

  2. 메시지를 읽고 Cloud Storage 가져오기 주제에 가져올 Cloud Storage 버킷을 클릭합니다.

    버킷 세부정보 페이지가 열립니다.

  3. 버킷 세부정보 페이지에서 권한 탭을 클릭합니다.

  4. 권한 > 주 구성원별로 보기 탭에서 액세스 권한 부여를 클릭합니다.

    액세스 권한 부여 페이지가 열립니다.

  5. 주 구성원 추가 섹션에 Pub/Sub 서비스 계정의 이름을 입력합니다.

    서비스 계정 형식은 service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com입니다. 예를 들어 PROJECT_NUMBER=112233445566이 있는 프로젝트의 경우 서비스 계정은 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com 형식입니다.

  6. 역할 할당 > 역할 선택 드롭다운에서 Object Reader을 입력하고 스토리지 기존 객체 리더 역할을 선택합니다.

  7. 다른 역할 추가를 클릭합니다.

  8. 역할 선택 드롭다운에서 Bucket Reader을 입력하고 스토리지 기존 버킷 리더 역할을 선택합니다.

  9. 저장을 클릭합니다.

프로젝트 권한

프로젝트 수준에서 스토리지 관리자(roles/storage.admin) 역할을 부여하려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    IAM으로 이동

  2. 권한 > 주 구성원별로 보기 탭에서 액세스 권한 부여를 클릭합니다.

    액세스 권한 부여 페이지가 열립니다.

  3. 주 구성원 추가 섹션에 Pub/Sub 서비스 계정의 이름을 입력합니다.

    서비스 계정 형식은 service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com입니다. 예를 들어 PROJECT_NUMBER=112233445566이 있는 프로젝트의 경우 서비스 계정은 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com 형식입니다.

  4. 역할 할당 > 역할 선택 드롭다운에서 Storage Admin을 입력하고 스토리지 관리자 역할을 선택합니다.

  5. 저장을 클릭합니다.

Cloud Storage IAM에 관한 자세한 내용은 Cloud Storage ID 및 액세스 관리를 참고하세요.

Cloud Storage 가져오기 주제의 속성

모든 주제의 공통 속성에 관한 자세한 내용은 주제 속성을 참고하세요.

버킷 이름

Pub/Sub이 Cloud Storage 가져오기 주제에 게시된 데이터를 읽는 Cloud Storage 버킷의 이름입니다.

입력 형식

Cloud Storage 가져오기 주제를 만들 때 처리할 객체의 형식을 텍스트, Avro 또는 Pub/Sub Avro로 지정할 수 있습니다.

  • 텍스트 객체는 일반 텍스트로 데이터를 보유한다고 가정됩니다. 이 입력 형식은 객체가 최소 객체 생성 시간을 충족하고 glob 패턴 기준과 일치하는 한 버킷의 모든 객체를 처리하려고 시도합니다.

    구분자. 객체를 메시지로 분할하는 구분 기호를 지정할 수도 있습니다. 설정하지 않으면 기본값은 줄바꿈 문자 (\n)입니다. 구분자는 단일 문자여야 합니다.

  • Avro. 객체는 Apache Avro 바이너리 형식입니다. 유효한 Apache Avro 형식이 아닌 객체는 처리되지 않습니다. Avro와 관련된 제한사항은 다음과 같습니다.

    • Avro 버전 1.1.0 및 1.2.0은 지원되지 않습니다.
    • Avro 블록의 최대 크기는 16MB입니다.
  • Pub/Sub Avro. 객체는 Apache Avro 바이너리 형식이며 Avro 파일 형식으로 Pub/Sub Cloud Storage 구독을 사용하여 Cloud Storage에 쓴 객체의 스키마와 일치하는 스키마가 있습니다. 다음은 Pub/Sub Avro에 관한 몇 가지 중요한 가이드라인입니다.

    • Avro 레코드의 데이터 필드는 생성된 Pub/Sub 메시지의 데이터 필드를 채우는 데 사용됩니다.

    • Cloud Storage 구독에 write_metadata 옵션이 지정된 경우 속성 필드의 모든 값이 생성된 Pub/Sub 메시지의 속성으로 채워집니다.

    • Cloud Storage에 작성된 원본 메시지에 순서 키가 지정된 경우 이 필드는 생성된 Pub/Sub 메시지에 original_message_ordering_key라는 이름의 속성으로 채워집니다.

최소 객체 생성 시간

Cloud Storage 가져오기 주제를 만들 때 원하는 경우 최소 객체 생성 시간을 지정할 수 있습니다. 이 타임스탬프 이후에 생성된 객체만 처리됩니다. 이 타임스탬프는 YYYY-MM-DDThh:mm:ssZ와 같은 형식으로 제공해야 합니다. 0001-01-01T00:00:00Z부터 9999-12-31T23:59:59Z까지의 모든 날짜(과거 또는 미래)가 유효합니다.

glob 패턴 일치

Cloud Storage 가져오기 주제를 만들 때 선택적으로 일치 글러브 패턴을 지정할 수 있습니다. 이 패턴과 일치하는 이름의 객체만 처리됩니다. 예를 들어 접미사가 .txt인 모든 객체를 처리하려면 glob 패턴을 **.txt로 지정하면 됩니다.

glob 패턴에 지원되는 문법에 관한 자세한 내용은 Cloud Storage 문서를 참고하세요.

Cloud Storage 가져오기 주제 만들기

다음 절차를 완료했는지 확인하세요.

주제와 구독을 개별적으로 만들면 빠르게 연속으로 만들더라도 데이터가 손실될 수 있습니다. 구독 없이도 잠시 동안은 주제가 표시됩니다. 이 기간에 주제에 데이터가 전송되면 데이터가 손실됩니다. 먼저 주제를 만들고 구독을 만든 다음 주제를 가져오기 주제로 변환하면 가져오기 프로세스 중에 메시지가 누락되지 않습니다.

Cloud Storage 가져오기 주제를 만들려면 다음 단계를 따르세요.

콘솔

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    주제로 이동

  2. 주제 만들기를 클릭합니다.

    주제 세부정보 페이지가 열립니다.

  3. 주제 ID 필드에 Cloud Storage 가져오기 주제의 ID를 입력합니다.

    주제 이름 지정에 대한 자세한 내용은 이름 지정 가이드라인을 참조하세요.

  4. 기본 구독 추가를 선택합니다.

  5. 수집 사용 설정을 선택합니다.

  6. 처리 소스로 Google Cloud Storage를 선택합니다.

  7. Cloud Storage 버킷에서 찾아보기를 클릭합니다.

    버킷 선택 페이지가 열립니다. 다음 옵션 중 하나를 선택합니다.

    • 적절한 프로젝트에서 기존 버킷을 선택합니다.

    • 만들기 아이콘을 클릭하고 화면에 표시된 안내에 따라 새 버킷을 만듭니다. 버킷을 만든 후 Cloud Storage 가져오기 주제의 버킷을 선택합니다.

  8. 버킷을 지정하면 Pub/Sub에서 Pub/Sub 서비스 계정의 버킷에 적절한 권한이 있는지 확인합니다. 권한 문제가 있으면 다음과 유사한 메시지가 표시됩니다.

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    권한 문제가 발생하면 권한 설정을 클릭합니다. 자세한 내용은 Pub/Sub 서비스 계정에 Cloud Storage 권한 부여를 참고하세요.

  9. 객체 형식에서 텍스트, Avro 또는 Pub/Sub 아브로를 선택합니다.

    텍스트를 선택하는 경우 원하는 경우 객체를 메시지로 분할하는 구분자를 지정할 수 있습니다.

    이러한 옵션에 대한 자세한 내용은 입력 형식을 참고하세요.

  10. 선택사항입니다. 주제의 최소 객체 생성 시간을 지정할 수 있습니다. 이 속성을 설정하면 최소 객체 생성 시간 이후에 생성된 객체만 처리됩니다.

    자세한 내용은 최소 객체 생성 시간을 참고하세요.

  11. glob 패턴을 지정해야 합니다. 버킷의 모든 객체를 수집하려면 **를 glob 패턴으로 사용하세요. 설정하면 지정된 패턴과 일치하는 객체만 처리됩니다.

    자세한 내용은 글로브 패턴 일치를 참고하세요.

  12. 다른 기본 설정은 유지합니다.
  13. 주제 만들기를 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics create 명령어를 실행합니다.

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    명령어에서 TOPIC_ID, --cloud-storage-ingestion-bucket 플래그, --cloud-storage-ingestion-input-format 플래그만 필요합니다. 나머지 플래그는 선택사항이며 생략할 수 있습니다.

    다음을 바꿉니다.

    • TOPIC_ID: 주제의 이름 또는 ID

    • BUCKET_NAME: 기존 버킷의 이름을 지정합니다. 예를 들면 prod_bucket입니다. 버킷 이름에 프로젝트 ID를 포함해서는 안 됩니다. 버킷을 만들려면 버킷 만들기를 참조하세요.

    • INPUT_FORMAT: 처리되는 객체의 형식을 지정합니다. 이 값은 text, avro 또는 pubsub_avro입니다. 이러한 옵션에 대한 자세한 내용은 입력 형식을 참고하세요.

    • TEXT_DELIMITER: 텍스트 객체를 Pub/Sub 메시지로 분할하는 구분자를 지정합니다. 단일 문자여야 하며 INPUT_FORMATtext인 경우에만 설정해야 합니다. 기본값은 줄바꿈 문자 (\n)입니다.

      gcloud CLI를 사용하여 구분자를 지정할 때는 새 줄 \n과 같은 특수 문자의 처리에 각별히 주의하세요. 구분자가 올바르게 해석되도록 '\n' 형식을 사용하세요. 따옴표나 이스케이프 없이 \n를 사용하면 구분자가 "n"이 됩니다.

    • MINIMUM_OBJECT_CREATE_TIME: 객체를 처리하기 위해 객체가 생성된 최소 시간을 지정합니다. UTC 기준 YYYY-MM-DDThh:mm:ssZ 형식이어야 합니다. 예를 들면 2024-10-14T08:30:30Z입니다.

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z 사이의 모든 날짜(과거 또는 미래)가 유효합니다.

    • MATCH_GLOB: 객체를 처리하기 위해 일치시킬 glob 패턴을 지정합니다. gcloud CLI를 사용하는 경우 * 문자가 포함된 일치 글러브에는 * 문자가 \*\*.txt 형식으로 이스케이프 처리된 형식이거나 전체 일치 글러브가 따옴표 "**.txt" 또는 '**.txt'로 묶여 있어야 합니다. glob 패턴에 지원되는 문법에 관한 자세한 내용은 Cloud Storage 문서를 참고하세요.

Go

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceCloudStorage{
				Bucket: bucket,
				// Alternatively, can be Avro or PubSubAvro formats. See
				InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
					Delimiter: ",",
				},
				MatchGlob:               matchGlob,
				MinimumObjectCreateTime: minCreateTime,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

문제가 발생하면 Cloud Storage 가져오기 주제 문제 해결을 참고하세요.

Cloud Storage 가져오기 주제 수정

Cloud Storage 가져오기 주제를 수정하여 속성을 업데이트할 수 있습니다.

예를 들어 처리를 다시 시작하려면 버킷을 변경하거나 최소 객체 생성 시간을 업데이트하면 됩니다.

Cloud Storage 가져오기 주제를 수정하려면 다음 단계를 따르세요.

콘솔

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    주제로 이동

  2. Cloud Storage 가져오기 주제를 클릭합니다.

  3. 주제 세부정보 페이지에서 수정을 클릭합니다.

  4. 변경하려는 필드를 업데이트합니다.

  5. 업데이트를 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 가져오기 주제의 설정이 손실되지 않도록 주제를 업데이트할 때마다 모든 설정을 포함해야 합니다. 누락된 항목이 있으면 Pub/Sub에서 설정을 원래 기본값으로 재설정합니다.

    다음 샘플에 언급된 모든 플래그를 사용하여 gcloud pubsub topics update 명령어를 실행합니다.

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    다음을 바꿉니다.

    • TOPIC_ID는 주제 ID 또는 이름입니다. 이 필드는 업데이트할 수 없습니다.

    • BUCKET_NAME: 기존 버킷의 이름을 지정합니다. 예를 들면 prod_bucket입니다. 버킷 이름에 프로젝트 ID를 포함해서는 안 됩니다. 버킷을 만들려면 버킷 만들기를 참조하세요.

    • INPUT_FORMAT: 처리되는 객체의 형식을 지정합니다. 이 값은 text, avro 또는 pubsub_avro입니다. 이러한 옵션에 관한 자세한 내용은 입력 형식을 참고하세요.

    • TEXT_DELIMITER: 텍스트 객체를 Pub/Sub 메시지로 분할하는 구분자를 지정합니다. 단일 문자여야 하며 INPUT_FORMATtext인 경우에만 설정해야 합니다. 기본값은 줄바꿈 문자 (\n)입니다.

      gcloud CLI를 사용하여 구분자를 지정할 때는 새 줄 \n과 같은 특수 문자의 처리에 각별히 주의하세요. 구분자가 올바르게 해석되도록 '\n' 형식을 사용합니다. 따옴표나 이스케이프 없이 \n를 사용하면 구분자가 "n"이 됩니다.

    • MINIMUM_OBJECT_CREATE_TIME: 객체를 처리하기 위해 객체가 생성된 최소 시간을 지정합니다. UTC 기준 YYYY-MM-DDThh:mm:ssZ 형식이어야 합니다. 예를 들면 2024-10-14T08:30:30Z입니다.

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z 사이의 모든 날짜(과거 또는 미래)가 유효합니다.

    • MATCH_GLOB: 객체를 처리하기 위해 일치시킬 glob 패턴을 지정합니다. gcloud CLI를 사용하는 경우 * 문자가 포함된 일치 글로브는 * 문자가 \*\*.txt 형식으로 이스케이프 처리된 형식이거나 전체 일치 글로브가 따옴표 "**.txt" 또는 '**.txt'로 묶여야 합니다. glob 패턴에 지원되는 문법에 관한 자세한 내용은 Cloud Storage 문서를 참고하세요.

Cloud Storage 주제 가져오기의 할당량 및 한도

가져오기 주제의 게시자 처리량은 주제의 게시 할당량에 따라 제한됩니다. 자세한 내용은 Pub/Sub 할당량 및 한도를 참조하세요.

다음 단계