Spanner change streams to Pub/Sub 템플릿

Spanner change stream to Pub/Sub 템플릿은 Spanner 데이터 변경 레코드를 스트리밍하고 Dataflow Runner V2를 사용하여 Pub/Sub 주제에 쓰는 스트리밍 파이프라인입니다.

새 Pub/Sub 주제에 데이터를 출력하려면 먼저 주제를 만들어야 합니다. 만들어진 후에는 Pub/Sub에서 자동으로 구독을 생성하고 새 주제에 연결합니다. 데이터를 존재하지 않는 Pub/Sub 주제에 출력하려고 하면 Dataflow 파이프라인에서 예외가 발생하고 파이프라인이 연결을 계속 시도하므로 파이프라인이 중단됩니다.

필요한 Pub/Sub 주제가 이미 존재하면 데이터를 해당 주제로 출력할 수 있습니다.

자세한 내용은 변경 내역 정보, Dataflow를 사용하여 변경 내역 연결 빌드, 변경 내역 권장사항을 참조하세요.

파이프라인 요구사항

  • 파이프라인을 실행하기 전에 Spanner 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 메타데이터 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 메타데이터 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 변경 내역이 있어야 합니다.
  • 파이프라인을 실행하기 전에 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • spannerInstanceId: 변경 스트림을 읽어올 Spanner 인스턴스입니다.
  • spannerDatabase: 변경 내역을 읽어올 Spanner 데이터베이스입니다.
  • spannerMetadataInstanceId: 변경 내역 커넥터 메타데이터 테이블에 사용할 Spanner 인스턴스입니다.
  • spannerMetadataDatabase: 변경 내역 커넥터 메타데이터 테이블에 사용할 Spanner 데이터베이스입니다.
  • spannerChangeStreamName: 읽어 올 Spanner 변경 내역의 이름입니다.
  • pubsubTopic: 변경 내역 출력을 위한 Pub/Sub 주제입니다.

선택적 매개변수

  • spannerProjectId: 변경 내역을 읽어 올 프로젝트입니다. 변경 내역 커넥터 메타데이터 테이블이 생성되는 프로젝트이기도 합니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
  • spannerDatabaseRole: 템플릿을 실행할 때 사용할 Spanner 데이터베이스 역할입니다. 이 매개변수는 템플릿을 실행하는 IAM 주 구성원이 세분화된 액세스 제어 사용자인 경우에만 필요합니다. 데이터베이스 역할에는 변경 내역에 대한 SELECT 권한과 변경 내역의 읽기 함수에 대한 EXECUTE 권한이 있어야 합니다. 자세한 내용은 변경 내역에 대한 세분화된 액세스 제어(https://cloud.google.com/spanner/docs/fgac-change-streams)를 참조하세요.
  • spannerMetadataTableName: 사용할 Spanner 변경 내역 커넥터 메타데이터 테이블 이름입니다. 제공하지 않으면 Spanner는 파이프라인 흐름 변경 중에 스트림 커넥터 메타데이터 테이블을 자동으로 만듭니다. 기존 파이프라인을 업데이트할 때 이 매개변수를 제공해야 합니다. 다른 경우에는 이 매개변수를 사용하지 마세요.
  • startTimestamp: 변경 내역을 읽는 데 사용할 시작 DateTime (https://tools.ietf.org/html/rfc3339)입니다. 예를 들면 ex- 2021-10-12T07:20:50.52Z입니다. 기본값은 파이프라인이 시작되는 시점의 타임스탬프, 즉 현재 시간입니다.
  • endTimestamp: 변경 내역을 읽는 데 사용할 종료 DateTime (https://tools.ietf.org/html/rfc3339)입니다. 예를 들면 ex- 2021-10-12T07:20:50.52Z입니다. 기본값은 미래의 무한대 시간입니다.
  • spannerHost: 템플릿에서 호출할 Cloud Spanner 엔드포인트입니다. 테스트에만 사용됩니다. 예를 들면 https://spanner.googleapis.com입니다. 기본값은 https://spanner.googleapis.com입니다.
  • outputDataFormat: 출력 형식입니다. 출력은 여러 PubsubMessages로 래핑되고 Pub/Sub 주제로 전송됩니다. 허용되는 형식은 JSON 및 AVRO입니다. 기본값은 JSON입니다.
  • pubsubAPI: 파이프라인을 구현하는 데 사용되는 Pub/Sub API입니다. 허용되는 API는 pubsubionative_client입니다. 소수의 초당 쿼리 수 (QPS)에서는 native_client 지연 시간이 더 짧습니다. 많은 수의 QPS에서 pubsubio가 더 우수하고 안정적인 성능을 제공합니다. 기본값은 pubsubio입니다.
  • pubsubProjectId: Pub/Sub 주제의 프로젝트입니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
  • rpcPriority: Spanner 호출의 요청 우선순위입니다. 허용되는 값은 HIGH, MEDIUM, LOW입니다. 기본값은 HIGH입니다.
  • includeSpannerSource: 변경 스트림을 읽을 Spanner 데이터베이스 ID 및 인스턴스 ID를 출력 메시지 데이터에 포함할지 여부입니다. 기본값은 false입니다.
  • outputMessageMetadata: 출력 Pub/Sub 메시지의 맞춤 필드 outputMessageMetadata의 문자열 값입니다. 기본값은 비어 있으며 이 값이 비어 있지 않은 경우에만 outputMessageMetadata 필드가 채워집니다. 여기에 값을 입력할 때 특수문자(예: 큰따옴표)를 이스케이프 처리하세요.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Spanner change streams to Pub/Sub template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • SPANNER_INSTANCE_ID: Spanner 인스턴스 ID입니다.
  • SPANNER_DATABASE: Spanner 데이터베이스입니다.
  • SPANNER_METADATA_INSTANCE_ID: Spanner 메타데이터 인스턴스 ID입니다.
  • SPANNER_METADATA_DATABASE: Spanner 메타데이터 데이터베이스입니다.
  • SPANNER_CHANGE_STREAM: Spanner 변경 내역입니다.
  • PUBSUB_TOPIC: 변경 내역 출력을 위한 Pub/Sub 주제

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • SPANNER_INSTANCE_ID: Spanner 인스턴스 ID입니다.
  • SPANNER_DATABASE: Spanner 데이터베이스입니다.
  • SPANNER_METADATA_INSTANCE_ID: Spanner 메타데이터 인스턴스 ID입니다.
  • SPANNER_METADATA_DATABASE: Spanner 메타데이터 데이터베이스입니다.
  • SPANNER_CHANGE_STREAM: Spanner 변경 내역입니다.
  • PUBSUB_TOPIC: 변경 내역 출력을 위한 Pub/Sub 주제
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToPubSubOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreamsToPubSub;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link SpannerChangeStreamsToPubSub} pipeline streams change stream record(s) and stores to
 * pubsub topic in user specified format. The sink data can be stored in a JSON Text or Avro data
 * format.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Spanner_Change_Streams_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_Change_Streams_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to Pub/Sub",
    description = {
      "The Cloud Spanner change streams to the Pub/Sub template is a streaming pipeline that streams Cloud Spanner data change records and writes them into Pub/Sub topics using Dataflow Runner V2.\n",
      "To output your data to a new Pub/Sub topic, you need to first create the topic. After creation, Pub/Sub automatically generates and attaches a subscription to the new topic. "
          + "If you try to output data to a Pub/Sub topic that doesn't exist, the dataflow pipeline throws an exception, and the pipeline gets stuck as it continuously tries to make a connection.\n",
      "If the necessary Pub/Sub topic already exists, you can output data to that topic.",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change streams</a>, <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to build change streams Dataflow pipelines</a>, and <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best practices</a>."
    },
    optionsClass = SpannerChangeStreamsToPubSubOptions.class,
    flexContainerName = "spanner-changestreams-to-pubsub",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist before running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The Pub/Sub topic must exist prior to running the pipeline."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class SpannerChangeStreamsToPubSub {
  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToPubSub.class);
  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Input Messages to Pub/Sub");

    SpannerChangeStreamsToPubSubOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SpannerChangeStreamsToPubSubOptions.class);

    run(options);
  }

  private static String getSpannerProjectId(SpannerChangeStreamsToPubSubOptions options) {
    return options.getSpannerProjectId().isEmpty()
        ? options.getProject()
        : options.getSpannerProjectId();
  }

  private static String getPubsubProjectId(SpannerChangeStreamsToPubSubOptions options) {
    return options.getPubsubProjectId().isEmpty()
        ? options.getProject()
        : options.getPubsubProjectId();
  }

  public static boolean isValidAsciiString(String outputMessageMetadata) {
    if (outputMessageMetadata != null
        && !StandardCharsets.US_ASCII.newEncoder().canEncode(outputMessageMetadata)) {
      return false;
    }
    return true;
  }

  public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) {
    LOG.info("Requested Message Format is " + options.getOutputDataFormat());
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    final Pipeline pipeline = Pipeline.create(options);
    // Get the Spanner project, instance, database, metadata instance, metadata database
    // change stream, pubsub topic, and pubsub api parameters.
    String spannerProjectId = getSpannerProjectId(options);
    String instanceId = options.getSpannerInstanceId();
    String databaseId = options.getSpannerDatabase();
    String metadataInstanceId = options.getSpannerMetadataInstanceId();
    String metadataDatabaseId = options.getSpannerMetadataDatabase();
    String changeStreamName = options.getSpannerChangeStreamName();
    String pubsubProjectId = getPubsubProjectId(options);
    String pubsubTopicName = options.getPubsubTopic();
    String pubsubAPI = options.getPubsubAPI();
    Boolean includeSpannerSource = options.getIncludeSpannerSource();
    String outputMessageMetadata = options.getOutputMessageMetadata();

    // Ensure outputMessageMetadata only contains valid ascii characters
    if (!isValidAsciiString(outputMessageMetadata)) {
      throw new RuntimeException("outputMessageMetadata contains non ascii characters.");
    }

    // Retrieve and parse the start / end timestamps.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    // Add use_runner_v2 to the experiments option, since Change Streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    String metadataTableName =
        options.getSpannerMetadataTableName() == null
            ? null
            : options.getSpannerMetadataTableName();

    final RpcPriority rpcPriority = options.getRpcPriority();
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(spannerProjectId)
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId);
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }
    pipeline
        .apply(
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(metadataInstanceId)
                .withMetadataDatabase(metadataDatabaseId)
                .withChangeStreamName(changeStreamName)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(rpcPriority)
                .withMetadataTable(metadataTableName))
        .apply(
            "Convert each record to a PubsubMessage",
            FileFormatFactorySpannerChangeStreamsToPubSub.newBuilder()
                .setOutputDataFormat(options.getOutputDataFormat())
                .setProjectId(pubsubProjectId)
                .setPubsubAPI(pubsubAPI)
                .setPubsubTopicName(pubsubTopicName)
                .setIncludeSpannerSource(includeSpannerSource)
                .setSpannerDatabaseId(databaseId)
                .setSpannerInstanceId(instanceId)
                .setOutputMessageMetadata(outputMessageMetadata)
                .build());
    return pipeline.run();
  }
}

다음 단계