Dataflow 커넥터 사용

Dataflow는 데이터를 변환하고 강화하는 관리형 서비스입니다. Cloud Spanner용 Dataflow 커넥터를 사용하면 Dataflow 파이프라인에서 Cloud Spanner의 데이터를 읽거나 쓸 수 있으며 원하는 경우 데이터를 변환하거나 수정할 수 있습니다. Cloud Spanner와 다른 Google Cloud 제품 간에 데이터를 전송하는 파이프라인을 만들 수도 있습니다.

Dataflow 커넥터는 Cloud Spanner 안팎으로 대량의 데이터를 효율적으로 이동하는 데 권장되는 방법입니다. 개별 데이터베이스 작업 시에는 다음과 같은 다른 방법으로 데이터를 가져오고 내보낼 수 있습니다.

  • Cloud Console을 사용하여 Cloud Spanner의 개별 데이터베이스를 Avro 형식으로 Cloud Storage로 내보냅니다.
  • Cloud Console을 사용하여 Cloud Storage로 내보낸 파일에서 Cloud Spanner로 데이터베이스를 다시 가져옵니다.
  • REST API 또는 gcloud 명령줄 도구를 사용하여 Cloud Spanner와 Cloud Storage 간에 내보내기 또는 가져오기 작업을 Avro 형식으로 실행합니다.

Cloud Spanner용 Dataflow 커넥터는 Apache Beam 자바 SDK의 일부이며 위 작업을 수행할 수 있도록 API를 제공합니다. PCollection 객체 및 변환 등 아래에서 설명하는 일부 개념에 대한 자세한 내용은 Apache Beam 프로그래밍 가이드를 참조하세요.

Maven 프로젝트에 커넥터 추가

Google Cloud Dataflow 커넥터를 Maven 프로젝트에 추가하려면 beam-sdks-java-io-google-cloud-platform Maven 아티팩트를 pom.xml 파일에 종속 항목으로 추가합니다.

예를 들어 pom.xml 파일이 beam.version을 적절한 버전 번호로 설정했다고 가정하면 다음 종속 항목을 추가합니다.

<dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
        <version>${beam.version}</version>
    </dependency>
    

Cloud Spanner에서 데이터 읽기

Cloud Spanner에서 읽으려면 SpannerIO.read() 변환을 적용합니다. SpannerIO.Read 클래스의 메서드를 사용하여 읽기를 구성합니다. 변환을 적용하면 PCollection<Struct>가 반환됩니다. 여기서 이 컬렉션의 각 요소는 읽기 작업에서 반환된 개별 행을 나타냅니다. 원하는 출력에 따라 특정 SQL 쿼리를 사용하거나 사용하지 않고 Cloud Spanner에서 읽을 수 있습니다.

SpannerIO.read() 변환을 적용하면 강력한 읽기를 수행하여 일관된 데이터 뷰가 반환됩니다. 달리 지정하지 않는 한 읽기를 시작한 시점에 읽은 결과가 스냅샷으로 생성됩니다. Cloud Spanner가 수행할 수 있는 다양한 읽기 유형에 대한 자세한 내용은 읽기를 참조하세요.

쿼리를 사용하여 읽기

Cloud Spanner에서 데이터의 특정 세트를 읽으려면 SpannerIO.Read.withQuery() 메서드로 변환을 구성하여 SQL 쿼리를 지정합니다. 예:

// Query for all the columns and rows in the specified Spanner table
    PCollection<Struct> records = p.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withQuery("SELECT * FROM " + options.getTable()));

쿼리를 지정하지 않고 읽기

쿼리를 사용하지 않고 데이터베이스에서 읽으려면 테이블 이름과 열 목록을 지정하거나 색인을 사용하여 읽을 수 있습니다. 선택한 열에서 읽으려면 SpannerIO.read()를 사용하여 변환을 구성할 때 테이블 이름과 열 목록을 지정합니다. 예:

// Query for all the columns and rows in the specified Spanner table
    PCollection<Struct> records = p.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Singers")
            .withColumns("singerId", "firstName", "lastName"));

특정 키 조합을 색인 값으로 사용하여 테이블에서 읽을 수도 있습니다. 이렇게 하려면 SpannerIO.Read.withIndex() 메서드로 원하는 키 값이 포함된 색인을 사용하여 읽기를 빌드합니다.

트랜잭션 데이터 비활성 제어

변환은 데이터의 일관된 스냅샷에서 실행되도록 보장됩니다. 데이터 비활성을 제어하려면 SpannerIO.Read.withTimestampBound() 메서드를 사용합니다. 자세한 내용은 트랜잭션을 참조하세요.

동일한 트랜잭션의 여러 테이블에서 읽기

데이터 일관성을 보장하기 위해 동일한 시점에 여러 테이블의 데이터를 읽으려면 단일 트랜잭션으로 모든 읽기를 수행합니다. 이렇게 하려면 createTransaction() 변환을 적용하여 PCollectionView<Transaction> 객체를 만듭니다. 그러면 트랜잭션이 생성됩니다. 결과 뷰는 SpannerIO.Read.withTransaction()을 사용하여 읽기 작업에 전달될 수 있습니다.

SpannerConfig spannerConfig = SpannerConfig.create()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId);
    PCollectionView<Transaction> tx = p.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
    PCollection<Struct> singers = p.apply(SpannerIO.read()
        .withSpannerConfig(spannerConfig)
        .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
        .withTransaction(tx));
    PCollection<Struct> albums = p.apply(SpannerIO.read().withSpannerConfig(spannerConfig)
        .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
        .withTransaction(tx));

사용할 수 있는 모든 테이블에서 데이터 읽기

Cloud Spanner 데이터베이스의 사용할 수 있는 모든 테이블에서 데이터를 읽을 수 있습니다.

PCollection<Struct> allRecords = p.apply(SpannerIO.read()
        .withSpannerConfig(spannerConfig)
        .withBatching(false)
        .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t"
            + ".table_catalog = '' AND t.table_schema = ''")).apply(
        MapElements.into(TypeDescriptor.of(ReadOperation.class))
            .via((SerializableFunction<Struct, ReadOperation>) input -> {
              String tableName = input.getString(0);
              return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
            })).apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

지원되지 않는 쿼리 문제해결

Dataflow 커넥터는 쿼리 실행 계획의 첫 번째 연산자가 분산 통합인 Cloud Spanner SQL 쿼리만 지원합니다. 쿼리를 사용하여 Cloud Spanner에서 데이터를 읽으려고 할 때 쿼리 does not have a DistributedUnion at the root라는 예외가 발생하면 Cloud Spanner의 쿼리 실행 방법 이해의 단계를 따라 Cloud Console을 사용하여 쿼리의 실행 계획을 검색합니다.

SQL 쿼리가 지원되지 않으면 쿼리 실행 계획의 첫 번째 연산자로 분산 통합이 포함된 쿼리로 이 쿼리를 단순화합니다. 쿼리 작동을 막을 가능성이 가장 높은 DISTINCT, GROUP BY, ORDER 연산자와 집계 함수를 제거합니다.

쓰기 변형 생성

반드시 필요한 경우가 아니라면 newInsertBuilder() 메서드 대신 Mutation 클래스의 newInsertOrUpdateBuilder() 메서드를 사용합니다. Dataflow는 at-least-once를 보장하므로 변형을 여러 번 쓸 수 있습니다. 결과적으로 삽입 변형은 파이프라인에 결함을 유발하는 오류를 생성할 수 있습니다. 이러한 오류를 방지하려면 두 번 이상 적용할 수 있는 insert-or-update 변형을 만듭니다.

Cloud Spanner에 쓰기 및 데이터 변환

SpannerIO.write() 변환을 사용하여 입력 행 변형 컬렉션을 실행하면 Dataflow 커넥터를 사용하여 Cloud Spanner에 데이터를 쓸 수 있습니다. Dataflow 커넥터는 효율성을 높이기 위해 변형을 배치로 그룹화합니다.

다음 예시에서는 쓰기 변환을 변형의 PCollection에 적용하는 방법을 보여줍니다.

albums
        // Spanner expects a Mutation object, so create it using the Album's data
        .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Album album = c.element();
            c.output(Mutation.newInsertOrUpdateBuilder("albums")
                .set("singerId").to(album.singerId)
                .set("albumId").to(album.albumId)
                .set("albumTitle").to(album.albumTitle)
                .build());
          }
        }))
        // Write mutations to Spanner
        .apply("WriteAlbums", SpannerIO.write()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId));

변환이 완료되기 전에 예기치 않게 중지되더라도 이미 적용된 변형은 롤백되지 않습니다.

변형 그룹을 원자적으로 적용

MutationGroup 클래스를 사용하면 변형 그룹 하나가 원자적으로 함께 적용되도록 할 수 있습니다. MutationGroup의 변형은 동일한 트랜잭션에서 제출되도록 보장되지만 트랜잭션이 다시 시도될 수 있습니다.

변형 그룹은 키 공간에 함께 저장된 데이터에 영향을 미치는 변형을 그룹화하는 데 사용될 때 최고의 성능을 발휘합니다. Cloud Spanner는 상위 테이블에 상위 테이블 데이터와 하위 테이블 데이터를 함께 인터리브 처리하므로 키 공간에서 데이터가 항상 서로 가깝게 위치합니다. 상위 테이블에 적용되는 변형 하나와 하위 테이블에 적용되는 추가 변형을 포함하도록 또는 모든 변형이 키 공간에서 서로 가깝게 위치한 데이터를 수정하도록 변형 그룹을 구성하는 것이 좋습니다. Cloud Spanner가 상위 및 하위 테이블 데이터를 저장하는 방법에 대한 자세한 내용은 스키마 및 데이터 모델을 참조하세요. 권장되는 테이블 계층 주위에 변형 그룹을 구성하지 않거나 액세스 대상 데이터가 키 공간에서 서로 가깝게 위치하지 않는 경우 Cloud Spanner는 2단계 커밋을 수행해야 할 수 있고, 이로 인해 성능이 저하될 수 있습니다. 자세한 내용은 지역성의 장단점을 참조하세요.

MutationGroup을 사용하려면 SpannerIO.write() 변환을 빌드하고 SpannerIO.Write.grouped() 메서드를 호출합니다. 그러면 MutationGroup 객체의 PCollection에 적용할 수 있는 변환이 반환됩니다.

MutationGroup을 만들 때 나열된 첫 번째 변형이 기본 변형이 됩니다. 변형 그룹이 상위 및 하위 테이블 모두에 영향을 미치는 경우 기본 변형은 상위 테이블에 대한 변형이어야 합니다. 그렇지 않으면 모든 변형을 기본 변형으로 사용할 수 있습니다. Dataflow 커넥터는 변형의 효율적인 배치 작업을 위해 기본 변형을 사용하여 파티션 경계를 결정합니다.

예를 들어 동작을 모니터링하고 검토할 문제가 있는 사용자 동작을 신고하는 애플리케이션이 있다고 가정합니다. 신고된 각 동작에 대해 사용자 애플리케이션의 액세스를 차단하도록 Users 테이블을 업데이트하고 이 이슈를 PendingReviews 테이블에 기록해야 합니다. 두 테이블 모두 원자적으로 업데이트하려면 MutationGroup을 사용합니다.

PCollection<MutationGroup> mutations = suspiciousUserIds
        .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

          @Override
          public MutationGroup apply(String userId) {
            // Immediately block the user.
            Mutation userMutation = Mutation.newUpdateBuilder("Users")
                .set("id").to(userId)
                .set("state").to("BLOCKED")
                .build();
            long generatedId = Hashing.sha1().newHasher()
                .putString(userId, Charsets.UTF_8)
                .putLong(timestamp.getSeconds())
                .putLong(timestamp.getNanos())
                .hash()
                .asLong();

            // Add an entry to pending review requests.
            Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
                .set("id").to(generatedId)  // Must be deterministically generated.
                .set("userId").to(userId)
                .set("action").to("REVIEW ACCOUNT")
                .set("note").to("Suspicious activity detected.")
                .build();

            return MutationGroup.create(userMutation, pendingReview);
          }
        }));

    mutations.apply(SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .grouped());

변형 그룹을 만들 때 인수로 제공된 첫 번째 변형이 기본 변형이 됩니다. 이 경우에는 두 테이블이 서로 관련이 없으므로 명확한 기본 변형이 없습니다. 여기서는 userMutation을 먼저 배치하여 기본 변형으로 선택했습니다. 두 변형을 개별적으로 적용하는 것이 더 빠르지만 원자성을 보장하지 않으므로 이 상황에서는 변형 그룹이 최선의 선택입니다.

다음 단계