Dataflow は、データの変換と拡張を行うマネージド サービスです。Spanner の Dataflow コネクタを使用すると、Dataflow パイプラインで Spanner とデータの読み取り / 書き込みを行うことができます。また、必要に応じてデータを変換したり、変更したりできます。Spanner と他の Google Cloud プロダクトの間でデータを送受信するパイプラインを作成することもできます。
Spanner との間でデータを一括して効率的に送受信する場合と、パーティション化 DML でサポートされていないデータベースに大規模な変換(テーブルの移動や、JOIN を必要とする一括削除など)を行う場合には、Dataflow コネクタが推奨されます。個々のデータベースを操作する場合は、別の方法でデータをインポートまたはエクスポートできます。
- Google Cloud コンソールを使用して、Spanner から Cloud Storage に個々のデータベースを Avro 形式でAvroします。
- Google Cloud コンソールを使用して、Cloud Storage にエクスポートしたファイルから Spanner にデータベースをインポートします。
- REST API または Google Cloud CLI を使用して、Spanner と Cloud Storage 間でエクスポート ジョブまたはインポート ジョブを実行します(ここでも Avro 形式を使用します)。
Spanner 用の Dataflow コネクタは、Apache Beam Java SDK に含まれており、前述のアクションを実行する API を提供します。PCollection オブジェクトや変換など、ここで説明するコンセプトの詳細については、Apache Beam プログラミング ガイドをご覧ください。
Maven プロジェクトにコネクタを追加する
Google Cloud Dataflow コネクタを Maven プロジェクトに追加するには、beam-sdks-java-io-google-cloud-platform
Maven アーティファクトを pom.xml
ファイルに依存ファイルとして追加します。
たとえば、pom.xml
ファイルで、beam.version
に適切なバージョン番号を設定されていると仮定して、次の依存関係を追加します。
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Spanner からデータを読み取る
Spanner から読み取るには、SpannerIO.read() 変換を適用します。SpannerIO.Read
クラスのメソッドを使用して読み取りを構成します。変換を適用すると PCollection<Struct>
が返されます。このコレクションの各要素は読み取りオペレーションによって返された個々の行を表します。出力によっては、特定の SQL クエリを指定または指定せずに Spanner から読み取ることができます。
SpannerIO.read()
変換を適用すると、強力な読み取りを実行して、一貫したデータビューを取得できます。特に指定のない限り、読み取りを開始したときの読み取りの結果がスナップショットされます。Spanner で実行可能な読み取りの種類については、読み取りをご覧ください。
クエリを使用してデータを読み取る
Spanner から特定のデータセットを読み取るには、SpannerIO.Read.withQuery()
メソッドで SQL クエリを指定し、変換を構成します。次に例を示します。
クエリを指定せずにデータを読み取る
クエリを使用せずにデータベースから読み取るには、SpannerIO.Read.withTable() メソッドを使用してテーブル名を指定し、 SpannerIO.Read.withColumns() メソッドを使用して読み込む列のリストを指定します。例:
GoogleSQL
PostgreSQL
読み取る行を制限する場合は、SpannerIO.Read.withKeySet() メソッドを使用して、読み取る主キーのセットを指定します。
指定したセカンダリ インデックスを使用して、テーブルを読み取ることもできます。readUsingIndex() API 呼び出しと同様に、インデックスには、クエリ結果に表示されるすべてのデータが含まれている必要があります。
これを行うには、前の例に示されているテーブルを指定し、SpannerIO.Read.withIndex()
を使用して目的の列の値を含むインデックスを指定します。インデックスには、変換で読み取る必要があるすべての列を保存する必要があります。ベーステーブルの主キーは暗黙的に保存されます。たとえば、インデックス SongsBySongName
を使用して Songs
テーブルを読み取るには、次のコードを使用します。
GoogleSQL
PostgreSQL
トランザクション データのステイルネスを制御する
変換は整合性のあるデータのスナップショットに実行されます。データのステイルネスを制御するには、SpannerIO.Read.withTimestampBound()
メソッドを使用します。詳細については、トランザクションをご覧ください。
同じトランザクションの複数のテーブルから読み取る
データの整合性を確保するために同じ時点で複数のテーブルからデータを読み取る場合は、すべての読み取りを 1 回のトランザクションで実行します。そのためには、createTransaction()
変換を適用して PCollectionView<Transaction>
オブジェクトを作成します。このオブジェクトによりトランザクションが作成されます。結果のビューは、SpannerIO.Read.withTransaction()
を使用して読み取りオペレーションに渡すことができます。
GoogleSQL
PostgreSQL
利用可能なすべてのテーブルからデータを読み取る
Spanner データベースで利用可能なすべてのテーブルからデータを読み取ることができます。
GoogleSQL
PostgreSQL
サポートされていないクエリのトラブルシューティング
Dataflow コネクタは、クエリ実行プランの最初の演算子が分散ユニオンである Spanner SQL クエリのみをサポートします。クエリで Spanner からデータを読み取るときに、クエリ does not have a DistributedUnion at
the root
の例外が発生した場合は、Spanner がクエリを実行する仕組みについて の手順に沿って、Google Cloud コンソールでクエリの実行プランを取得します。
SQL クエリがサポートされていない場合は、クエリ実行プランの最初の演算子を分散ユニオンにします。クエリの動作を妨げる可能性が高いため、集計関数、テーブル結合、DISTINCT
、GROUP BY
、ORDER
などの演算子は削除します。
書き込み用にミューテーションを作成する
Java パイプラインに必要不可欠な場合を除き、newInsertBuilder()
メソッドの代わりに、Mutation
クラスの newInsertOrUpdateBuilder()
メソッドを使用してください。Python パイプラインの場合は、SpannerInsert()
ではなく SpannerInsertOrUpdate()
を使用します。Dataflow には「最低 1 回」の保証があるため、ミューテーションが複数回書き込まれることがあります。その結果、INSERT
ミューテーションでのみ com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
エラーが発生し、パイプラインが失敗する可能性があります。このエラーを回避するには、代わりに INSERT_OR_UPDATE
ミューテーションを使用します。これにより、新しい行が追加されるか、行がすでに存在している場合は列値が更新されます。INSERT_OR_UPDATE
ミューテーションを複数回適用できます。
Spanner にデータを書き込み、データを変換する
Dataflow コネクタで Spanner にデータを書き込むには、SpannerIO.write()
変換を使用して入力行のミューテーションのコレクションを実行します。処理効率を高めるため、Dataflow コネクタはこれらのミューテーションをグループ化します。
次の例では、ミューテーションの PCollection
に書き込み変換を適用しています。
GoogleSQL
PostgreSQL
変換が完了する前に予期せず停止した場合、すでに適用されているミューテーションはロールバックされません。
ミューテーションのグループをアトミックに適用する
MutationGroup
クラスを使用すると、ミューテーションのグループをまとめてアトミックに適用できます。MutationGroup
のミューテーションは同じトランザクションで送信されますが、トランザクションが再試行される可能性があります。
ミューテーション グループは、キー空間内で近接しているデータに影響を及ぼすミューテーションをグループ化すると最も効率的に処理されます。Spanner は、親テーブルのデータと子テーブルのデータを親テーブルにインターリーブするため、これらのデータは常にキー空間内で近接しています。ミューテーション グループを構成する際には、親テーブルに適用される 1 つのミューテーションと子テーブルに適用されるその他のミューテーションで構成するか、すべてのミューテーションがキー空間内で近接しているデータを変更できるように構成することをおすすめします。Spanner が親テーブルと子テーブルのデータを格納する方法については、スキーマとデータモデルをご覧ください。推奨のテーブル階層にミューテーション グループを編成していない場合や、アクセス対象のデータがキー空間内で近接していない場合は、Spanner が 2 フェーズ commit を実行する必要があるために、パフォーマンスが低下します。詳細については、局所性のトレードオフをご覧ください。
MutationGroup
を使用するには、SpannerIO.write()
変換を作成し、SpannerIO.Write.grouped()
メソッドを呼び出します。このメソッドは、MutationGroup
オブジェクトの PCollection
に適用できる変換を返します。
MutationGroup
を作成すると、リストの最初のミューテーションがプライマリ ミューテーションになります。ミューテーション グループが親テーブルと子テーブルの両方に影響を及ぼす場合、プライマリ ミューテーションは親テーブルのミューテーションでなければなりません。それ以外の場合、任意のミューテーションをプライマリとして使用できます。Dataflow コネクタは、プライマリ ミューテーションを使用してパーティションの境界を決定し、ミューテーションを効率的にバッチ処理します。
たとえば、アプリケーションがユーザーの動作をモニタリングし、問題のある動作にフラグを設定してレビューするとします。動作にフラグが設定されるたびに Users
テーブルを更新し、アプリケーションに対するユーザーのアクセスをブロックするには、PendingReviews
テーブルにインシデントを記録する必要があります。両方のテーブルがアトミックに更新されるようにするには、MutationGroup
を使用します。
GoogleSQL
PostgreSQL
ミューテーション グループを作成する場合、引数として渡される最初のミューテーションがプライマリ ミューテーションになります。この場合、2 つのテーブルは無関係であるため、明確なプライマリ ミューテーションは存在しません。ここでは、最初に配置することで userMutation
をプライマリにしています。2 つのミューテーションを別々に適用するほうが速くなりますが、アトミック性は保証されないため、この状況ではミューテーション グループを使用するのが最適な選択となります。
次のステップ
- Apache Beam のデータ パイプラインの設計について学習する。
- Google Cloud コンソールで Dataflow を使用して Spanner データベースをエクスポートおよびインポートします。