Dataflow を使用したデータのインポート、エクスポート、変更

Dataflow は、データの変換と拡張を行うマネージド サービスです。Spanner の Dataflow コネクタを使用すると、Dataflow パイプラインで Spanner とデータの読み取り / 書き込みを行うことができます。また、必要に応じてデータを変換したり、変更したりできます。Spanner と他の Google Cloud プロダクトの間でデータを送受信するパイプラインを作成することもできます。

Spanner との間でデータを一括して効率的に送受信する場合と、パーティション化 DML でサポートされていないデータベースに大規模な変換(テーブルの移動や、JOIN を必要とする一括削除など)を行う場合には、Dataflow コネクタが推奨されます。個々のデータベースを操作する場合は、別の方法でデータをインポートまたはエクスポートできます。

  • Google Cloud コンソールを使用して、Spanner から Cloud Storage に個々のデータベースを Avro 形式でエクスポートします。
  • Google Cloud コンソールを使用して、Cloud Storage にエクスポートしたファイルから Spanner にデータベースをインポートします。
  • REST API または Google Cloud CLI を使用して、Spanner と Cloud Storage 間でエクスポート ジョブまたはインポート ジョブを実行します(ここでも Avro 形式を使用します)。

Spanner 用の Dataflow コネクタは、Apache Beam Java SDK に含まれており、前述のアクションを実行する API を提供します。PCollection オブジェクトや変換など、ここで説明するコンセプトの詳細については、Apache Beam プログラミング ガイドをご覧ください。

Maven プロジェクトにコネクタを追加する

Google Cloud Dataflow コネクタを Maven プロジェクトに追加するには、beam-sdks-java-io-google-cloud-platform Maven アーティファクトを pom.xml ファイルに依存ファイルとして追加します。

たとえば、pom.xml ファイルで、beam.version に適切なバージョン番号を設定されていると仮定して、次の依存関係を追加します。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Spanner からデータを読み取る

Spanner から読み取るには、SpannerIO.read() 変換を適用します。SpannerIO.Read クラスのメソッドを使用して読み取りを構成します。変換を適用すると PCollection<Struct> が返されます。このコレクションの各要素は読み取りオペレーションによって返された個々の行を表します。出力によっては、特定の SQL クエリを指定または指定せずに Spanner から読み取ることができます。

SpannerIO.read() 変換を適用すると、強力な読み取りを実行して、一貫したデータビューを取得できます。特に指定のない限り、読み取りを開始したときの読み取りの結果がスナップショットされます。Spanner で実行可能な読み取りの種類については、読み取りをご覧ください。

クエリを使用してデータを読み取る

Spanner から特定のデータセットを読み取るには、SpannerIO.Read.withQuery() メソッドで SQL クエリを指定し、変換を構成します。次に例を示します。

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

クエリを指定せずにデータを読み取る

クエリを使用せずにデータベースから読み取るには、SpannerIO.Read.withTable() メソッドを使用してテーブル名を指定し、 SpannerIO.Read.withColumns() メソッドを使用して読み込む列のリストを指定します。例:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

読み取る行を制限する場合は、SpannerIO.Read.withKeySet() メソッドを使用して、読み取る主キーのセットを指定します。

指定したセカンダリ インデックスを使用して、テーブルを読み取ることもできます。readUsingIndex() API 呼び出しと同様に、インデックスには、クエリ結果に表示されるすべてのデータが含まれている必要があります。

これを行うには、前の例に示されているテーブルを指定し、SpannerIO.Read.withIndex() を使用して目的の列の値を含むインデックスを指定します。インデックスには、変換で読み取る必要があるすべての列を保存する必要があります。ベーステーブルの主キーは暗黙的に保存されます。たとえば、インデックス SongsBySongName を使用して Songs テーブルを読み取るには、次のコードを使用します。

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

トランザクション データのステイルネスを制御する

変換は整合性のあるデータのスナップショットに実行されます。データのステイルネスを制御するには、SpannerIO.Read.withTimestampBound() メソッドを使用します。詳細については、トランザクションをご覧ください。

同じトランザクションの複数のテーブルから読み取る

データの整合性を確保するために同じ時点で複数のテーブルからデータを読み取る場合は、すべての読み取りを 1 回のトランザクションで実行します。そのためには、createTransaction() 変換を適用して PCollectionView<Transaction> オブジェクトを作成します。このオブジェクトによりトランザクションが作成されます。結果のビューは、SpannerIO.Read.withTransaction() を使用して読み取りオペレーションに渡すことができます。

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

利用可能なすべてのテーブルからデータを読み取る

Spanner データベースで利用可能なすべてのテーブルからデータを読み取ることができます。

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

サポートされていないクエリのトラブルシューティング

Dataflow コネクタは、クエリ実行プランの最初の演算子が分散ユニオンである Spanner SQL クエリのみをサポートします。クエリで Spanner からデータを読み取るときに、クエリ does not have a DistributedUnion at the root の例外が発生した場合は、Spanner がクエリを実行する仕組みについて の手順に沿って、Google Cloud コンソールでクエリの実行プランを取得します。

SQL クエリがサポートされていない場合は、クエリ実行プランの最初の演算子を分散ユニオンにします。クエリの動作を妨げる可能性が高いため、集計関数、テーブル結合、DISTINCTGROUP BYORDER などの演算子は削除します。

書き込み用にミューテーションを作成する

Java パイプラインに必要不可欠な場合を除き、newInsertBuilder() メソッドの代わりに、Mutation クラスの newInsertOrUpdateBuilder() メソッドを使用してください。Python パイプラインの場合は、SpannerInsert() ではなく SpannerInsertOrUpdate() を使用します。Dataflow には「最低 1 回」の保証があるため、ミューテーションが複数回書き込まれることがあります。その結果、INSERT ミューテーションでのみ com.google.cloud.spanner.SpannerException: ALREADY_EXISTS エラーが発生し、パイプラインが失敗する可能性があります。このエラーを回避するには、代わりに INSERT_OR_UPDATE ミューテーションを使用します。これにより、新しい行が追加されるか、行がすでに存在している場合は列値が更新されます。INSERT_OR_UPDATE ミューテーションを複数回適用できます。

Spanner にデータを書き込み、データを変換する

Dataflow コネクタで Spanner にデータを書き込むには、SpannerIO.write() 変換を使用して入力行のミューテーションのコレクションを実行します。処理効率を高めるため、Dataflow コネクタはこれらのミューテーションをグループ化します。

次の例では、ミューテーションの PCollection に書き込み変換を適用しています。

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

変換が完了する前に予期せず停止した場合、すでに適用されているミューテーションはロールバックされません。

ミューテーションのグループをアトミックに適用する

MutationGroup クラスを使用すると、ミューテーションのグループをまとめてアトミックに適用できます。MutationGroup のミューテーションは同じトランザクションで送信されますが、トランザクションが再試行される可能性があります。

ミューテーション グループは、キー空間内で近接しているデータに影響を及ぼすミューテーションをグループ化すると最も効率的に処理されます。Spanner は、親テーブルのデータと子テーブルのデータを親テーブルにインターリーブするため、これらのデータは常にキー空間内で近接しています。ミューテーション グループを構成する際には、親テーブルに適用される 1 つのミューテーションと子テーブルに適用されるその他のミューテーションで構成するか、すべてのミューテーションがキー空間内で近接しているデータを変更できるように構成することをおすすめします。Spanner が親テーブルと子テーブルのデータを格納する方法については、スキーマとデータモデルをご覧ください。推奨のテーブル階層にミューテーション グループを編成していない場合や、アクセス対象のデータがキー空間内で近接していない場合は、Spanner が 2 フェーズ commit を実行する必要があるために、パフォーマンスが低下します。詳細については、局所性のトレードオフをご覧ください。

MutationGroup を使用するには、SpannerIO.write() 変換を作成し、SpannerIO.Write.grouped() メソッドを呼び出します。このメソッドは、MutationGroup オブジェクトの PCollection に適用できる変換を返します。

MutationGroup を作成すると、リストの最初のミューテーションがプライマリ ミューテーションになります。ミューテーション グループが親テーブルと子テーブルの両方に影響を及ぼす場合、プライマリ ミューテーションは親テーブルのミューテーションでなければなりません。それ以外の場合、任意のミューテーションをプライマリとして使用できます。Dataflow コネクタは、プライマリ ミューテーションを使用してパーティションの境界を決定し、ミューテーションを効率的にバッチ処理します。

たとえば、アプリケーションがユーザーの動作をモニタリングし、問題のある動作にフラグを設定してレビューするとします。動作にフラグが設定されるたびに Users テーブルを更新し、アプリケーションに対するユーザーのアクセスをブロックするには、PendingReviews テーブルにインシデントを記録する必要があります。両方のテーブルがアトミックに更新されるようにするには、MutationGroup を使用します。

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

ミューテーション グループを作成する場合、引数として渡される最初のミューテーションがプライマリ ミューテーションになります。この場合、2 つのテーブルは無関係であるため、明確なプライマリ ミューテーションは存在しません。ここでは、最初に配置することで userMutation をプライマリにしています。2 つのミューテーションを別々に適用するほうが速くなりますが、アトミック性は保証されないため、この状況ではミューテーション グループを使用するのが最適な選択となります。

次のステップ