Dataflow コネクタの使用

Dataflow は、データの変換と拡張を行うマネージド サービスです。Cloud Spanner の Dataflow コネクタを使用すると、Dataflow パイプラインで Cloud Spanner とデータの読み取り / 書き込みを行うことができます。また、必要に応じてデータを変換したり、変更したりできます。Cloud Spanner と他の Google Cloud プロダクトの間でデータを送受信するパイプラインを作成することもできます。

Cloud Spanner との間でデータを一括して効率的に送受信するには、Dataflow コネクタがおすすめの方法です。個々のデータベースを操作する場合は、別の方法でデータをインポートまたはエクスポートできます。

  • Cloud Console で、Cloud Spanner から Cloud Storage に個々のデータベースを Avro 形式でエクスポートします。
  • Cloud Console で、Cloud Storage にエクスポートしたファイルから Cloud Spanner にデータベースをインポートします。
  • REST API または gcloud コマンドライン ツールを使用して、Cloud Spanner と Cloud Storage 間でエクスポートまたはインポート ジョブを実行します(ここでも Avro 形式を使用します)。

Cloud Spanner 用の Dataflow コネクタは、Apache Beam Java SDK に含まれており、前述のアクションを実行する API を提供します。PCollection オブジェクトや変換など、ここで説明するコンセプトの詳細については、Apache Beam プログラミング ガイドをご覧ください。

Maven プロジェクトにコネクタを追加する

Google Cloud Dataflow コネクタを Maven プロジェクトに追加するには、beam-sdks-java-io-google-cloud-platform Maven アーティファクトを pom.xml ファイルに依存ファイルとして追加します。

たとえば、pom.xml ファイルで、beam.version に適切なバージョン番号を設定されていると仮定して、次の依存関係を追加します。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Cloud Spanner からデータを読み取る

Cloud Spanner から読み取るには、SpannerIO.read() 変換を適用します。SpannerIO.Read クラスのメソッドを使用して読み取りを構成します。変換を適用すると PCollection<Struct> が返されます。このコレクションの各要素は読み取りオペレーションによって返された個々の行を表します。出力によっては、特定の SQL クエリを指定または指定せずに Cloud Spanner から読み取ることができます。

SpannerIO.read() 変換を適用すると、強力な読み取りを実行して、一貫したデータビューを取得できます。特に指定のない限り、読み取りを開始したときの読み取りの結果がスナップショットされます。Cloud Spanner で実行可能な読み取りの種類については、読み取りをご覧ください。

クエリを使用して読み取る

Cloud Spanner から特定のデータセットを読み取るには、SpannerIO.Read.withQuery() メソッドで SQL クエリを指定し、変換を構成します。例:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

クエリを指定せずに読み取る

クエリを使用せずにデータベースから読み取るには、テーブル名と列のリストを指定するか、インデックスを使用して読み取りを行います。選択した列から読み取るには、SpannerIO.read() を使用して変換を構築するときに、テーブル名と列のリストを指定します。例:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

インデックス値として特定のキーセットを使用すると、テーブルから読み取ることができます。そのためには、SpannerIO.Read.withIndex() メソッドで目的のキー値を含むインデックスを使用して、読み取りを作成します。

トランザクション データのステイルネスの制御

変換は整合性のあるデータのスナップショットに実行されます。データのステイルネスを制御するには、SpannerIO.Read.withTimestampBound() メソッドを使用します。詳細については、トランザクションをご覧ください。

同じトランザクションの複数のテーブルから読み取る

データの整合性を確保するために同じ時点で複数のテーブルからデータを読み取る場合は、すべての読み取りを 1 回のトランザクションで実行します。そのためには、createTransaction() 変換を適用して PCollectionView<Transaction> オブジェクトを作成します。このオブジェクトによりトランザクションが作成されます。結果のビューは、SpannerIO.Read.withTransaction() を使用して読み取りオペレーションに渡すことができます。

SpannerConfig spannerConfig = SpannerConfig.create()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId);
PCollectionView<Transaction> tx = p.apply(
    SpannerIO.createTransaction()
        .withSpannerConfig(spannerConfig)
        .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
    .withTransaction(tx));
PCollection<Struct> albums = p.apply(SpannerIO.read().withSpannerConfig(spannerConfig)
    .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
    .withTransaction(tx));

利用可能なすべてのテーブルからデータを読み取る

Cloud Spanner データベースで利用可能なすべてのテーブルからデータを読み取ることができます。

PCollection<Struct> allRecords = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withBatching(false)
    .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t"
        + ".table_catalog = '' AND t.table_schema = ''")).apply(
    MapElements.into(TypeDescriptor.of(ReadOperation.class))
        .via((SerializableFunction<Struct, ReadOperation>) input -> {
          String tableName = input.getString(0);
          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
        })).apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

サポートされていないクエリのトラブルシューティング

Dataflow コネクタは、クエリ実行プランの最初の演算子が分散ユニオンである Cloud Spanner SQL クエリのみをサポートします。クエリで Cloud Spanner からデータを読み取るときに、クエリ does not have a DistributedUnion at the root の例外が発生した場合は、Cloud Spanner がクエリを実行する仕組みについて の手順に沿って、Cloud Console でクエリの実行プランを取得します。

SQL クエリがサポートされていない場合は、クエリ実行プランの最初の演算子を分散ユニオンにします。クエリの動作を妨げる可能性が高いため、集計関数や DISTINCTGROUP BYORDER などの演算子は削除します。

書き込み用にミューテーションを作成する

newInsertBuilder() メソッドが必要な場合以外は、Mutation クラスの newInsertOrUpdateBuilder() メソッドを使用してください。Dataflow には「最低 1 回」の保証があるため、ミューテーションが複数回書き込まれることが多くなります。その結果、挿入ミューテーションが原因でパイプラインが失敗する可能性があります。このようなエラーを防ぐには、複数回適用可能な挿入または更新のミューテーションを作成します。

Cloud Spanner への書き込みとデータの変換

Dataflow コネクタで Cloud Spanner にデータを書き込むには、SpannerIO.write() 変換を使用して入力行のミューテーションのコレクションを実行します。処理効率を高めるため、Dataflow コネクタはこれらのミューテーションをグループ化します。

次の例では、ミューテーションの PCollection に書き込み変換を適用しています。

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

変換が完了する前に予期せず停止した場合、すでに適用されているミューテーションはロールバックされません。

ミューテーションのグループをアトミックに適用する

MutationGroup クラスを使用すると、ミューテーションのグループをまとめてアトミックに適用できます。MutationGroup のミューテーションは同じトランザクションで送信されますが、トランザクションが再試行される可能性があります。

ミューテーション グループは、キー空間内で近接しているデータに影響を及ぼすミューテーションをグループ化すると最も効率的に処理されます。Cloud Spanner は、親テーブルのデータと子テーブルのデータを親テーブルにインターリーブするため、これらのデータは常にキー空間内で近接しています。ミューテーション グループを構成する際には、親テーブルに適用される 1 つのミューテーションと子テーブルに適用されるその他のミューテーションで構成するか、すべてのミューテーションがキー空間内で近接しているデータを変更できるように構成することをおすすめします。Cloud Spanner が親テーブルと子テーブルのデータを格納する方法については、スキーマとデータモデルをご覧ください。推奨のテーブル階層にミューテーション グループを編成していない場合や、アクセス対象のデータがキー空間内で近接していない場合は、Cloud Spanner が 2 フェーズ commit を実行する必要があるために、パフォーマンスが低下します。詳細については、局所性のトレードオフをご覧ください。

MutationGroup を使用するには、SpannerIO.write() 変換を作成し、SpannerIO.Write.grouped() メソッドを呼び出します。このメソッドは、MutationGroup オブジェクトの PCollection に適用できる変換を返します。

MutationGroup を作成すると、リストの最初のミューテーションがプライマリ ミューテーションになります。ミューテーション グループが親テーブルと子テーブルの両方に影響を及ぼす場合、プライマリ ミューテーションは親テーブルのミューテーションでなければなりません。それ以外の場合、任意のミューテーションをプライマリとして使用できます。Dataflow コネクタは、プライマリ ミューテーションを使用してパーティションの境界を決定し、ミューテーションを効率的にバッチ処理します。

たとえば、アプリケーションがユーザーの動作をモニタリングし、問題のある動作にフラグを設定してレビューするとします。動作にフラグが設定されるたびに Users テーブルを更新し、アプリケーションに対するユーザーのアクセスをブロックするには、PendingReviews テーブルにインシデントを記録する必要があります。両方のテーブルがアトミックに更新されるようにするには、MutationGroup を使用します。

PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

ミューテーション グループを作成する場合、引数として渡される最初のミューテーションがプライマリ ミューテーションになります。この場合、2 つのテーブルは無関係であるため、明確なプライマリ ミューテーションは存在しません。ここでは、最初に配置することで userMutation をプライマリにしています。2 つのミューテーションを別々に適用するほうが速くなりますが、アトミック性は保証されないため、この状況ではミューテーション グループを使用するのが最適な選択となります。

次のステップ