Dataflow 是一种用于对数据进行转换并丰富数据内容的托管服务。借助适用于 Spanner 的 Dataflow 连接器,您可以在 Dataflow 流水线中从 Spanner 读取数据和向其中写入数据,还可以选择转换或修改数据。您还可以创建在 Spanner 与其他 Google Cloud 产品之间传输数据的流水线。
建议使用 Dataflow 连接器高效地将数据批量移入和移出 Spanner,以及对分区 DML 不支持的数据库执行大型转换(例如移动表、需要联接的批量删除等)。使用单个数据库时,可以使用其他方法导入和导出数据:
- 使用 Google Cloud 控制台以 Avro 格式将单个数据库从 Spanner 导出到 Cloud Storage。
- 使用 Google Cloud 控制台将数据库从导出到 Cloud Storage 的文件重新import Spanner。
- 使用 REST API 或 Google Cloud CLI 运行从 Spanner 到 Cloud Storage 的导出作业或import作业(也采用 Avro 格式)。
适用于 Spanner 的 Dataflow 连接器是 Apache Beam Java SDK 的一部分,它提供了一个用于执行上述操作的 API。如需详细了解下文讨论的一些概念(例如 PCollection 对象和转换),请参阅 Apache Beam 编程指南。
将连接器添加到您的 Maven 项目
要将 Google Cloud Dataflow 连接器添加到 Maven 项目,请将 beam-sdks-java-io-google-cloud-platform
Maven 软件工件作为依赖项添加到 pom.xml
文件中。
例如,假设您的 pom.xml
文件将 beam.version
设置为适当的版本号,您将添加以下依赖项:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
从 Spanner 读取数据
要从 Spanner 读取数据,请应用 SpannerIO.read() 转换。使用 SpannerIO.Read
类中的方法配置读取操作。应用转换后将返回 PCollection<Struct>
,其中集合中的每个元素表示读取操作返回的单个行。从 Spanner 中读取数据时,可以使用和不使用特定的 SQL 查询,具体取决于所需的输出。
应用 SpannerIO.read()
转换后将通过执行强读来返回一致的数据视图。除非另行指定,否则将在您开始读取时对读取结果截取快照。如需详细了解 Spanner 可执行的不同类型的读取操作,请参阅读取。
使用查询读取数据
如需从 Spanner 中读取特定的一组数据,请使用 SpannerIO.Read.withQuery()
方法配置转换,以指定 SQL 查询。例如:
在不指定查询的情况下读取数据
如需在不使用查询的情况下从数据库读取数据,您可以使用 SpannerIO.Read.withTable() 方法指定表名称,并使用 SpannerIO.Read.withColumns() 方法指定要读取的列的列表。例如:
GoogleSQL
PostgreSQL
如需限制读取的行数,您可以使用 SpannerIO.Read.withKeySet() 方法指定要读取的一组主键。
您还可以使用指定的二级索引来读取表。与 readUsingIndex() API 调用一样,索引必须包含查询结果中显示的所有数据。
为此,请按照上例所示指定表,并使用 SpannerIO.Read.withIndex()
方法指定包含所需列值的索引。索引必须存储转换需要读取的所有列。基表的主键是隐式存储的。例如,如需读取使用索引 SongsBySongName
的 Songs
表,请使用以下代码:
GoogleSQL
PostgreSQL
控制事务数据的过时
转换必定在一致的数据快照上执行。要控制数据的过时,请使用 SpannerIO.Read.withTimestampBound()
方法。如需了解详细信息,请参阅事务。
在同一事务中从多个表读取数据
要在同一时间从多个表读取数据以确保数据一致性,请在单个事务中执行所有读取操作。为此,请应用 createTransaction()
转换创建 PCollectionView<Transaction>
对象,然后由该对象创建事务。您可以使用 SpannerIO.Read.withTransaction()
将结果视图传递给读取操作。
GoogleSQL
PostgreSQL
从所有可用表中读取数据
您可以从 Spanner 数据库的所有可用表中读取数据。
GoogleSQL
PostgreSQL
对不受支持的查询进行问题排查
Dataflow 连接器仅支持满足以下条件的 Spanner SQL 查询:查询执行计划中的第一个运算符是分布式联合运算符。如果您尝试使用查询从 Spanner 读取数据,但收到一个指出查询 does not have a DistributedUnion at
the root
的异常,请按照了解 Spanner 如何执行查询中的步骤,使用 Google Cloud 控制台检索查询的执行计划。
如果 SQL 查询不受支持,请将其简化为满足条件的查询,条件是查询执行计划中的第一个运算符是分布式联合运算符。请删除聚合函数、表连接以及运算符 DISTINCT
、GROUP BY
和 ORDER
,因为这些运算符最有可能会阻止查询。
创建要写入的变更
除非对 Java 流水线来说绝对必要,否则请使用 Mutation
类的 newInsertOrUpdateBuilder()
方法,而不要使用 newInsertBuilder()
方法。对于 Python 流水线,请使用 SpannerInsertOrUpdate()
而非 SpannerInsert()
。Dataflow 提供“至少一次”保证,这意味着可以多次写入变更。因此,只有 INSERT
变更可能会产生 com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
错误,导致流水线失败。为防止出现此错误,请改用 INSERT_OR_UPDATE
变更,该变更会添加新行或更新列值(如果该行已经存在)。可以多次应用 INSERT_OR_UPDATE
变更。
写入 Spanner 并转换数据
您可以使用 SpannerIO.write()
转换来执行一系列输入行变更,通过 Dataflow 连接器将数据写入 Spanner。Dataflow 连接器将多个变更组合为批次以提高效率。
以下示例说明了如何将写入转换应用于变更的 PCollection
:
GoogleSQL
PostgreSQL
如果转换在完成之前意外停止,则已经应用的变更将不会回滚。
以原子方式应用变更组
您可以使用 MutationGroup
类确保以原子方式同时应用一组变更。MutationGroup
中的变更必定会在同一个事务中提交,但可能会重试该事务。
当变更组包含的变更会影响在键空间中紧密存储的数据时,变更组的效果最佳。由于 Spanner 在父表中交错父表和子表数据,因此这些数据在键空间中始终相邻。我们建议您在构建变更组时,让它包含一个应用于父表的变更以及其他应用于子表的变更,或者使其所有变更都将修改在键空间中紧密存储的数据。如需详细了解 Spanner 如何存储父表和子表数据,请参阅架构和数据模型。如果您未围绕建议的表层次结构整理变更组,或者要访问的数据在键空间中并非彼此靠近,则 Spanner 可能需要执行两阶段提交,这会导致性能下降。如需了解详细信息,请参阅位置权衡。
要使用 MutationGroup
,请构建一个 SpannerIO.write()
转换并调用 SpannerIO.Write.grouped()
方法,该方法返回一个转换,然后您可以将该转换应用于 MutationGroup
对象的 PCollection
。
在创建 MutationGroup
时,列出的第一个变更变为主要变更。如果您的变更组同时影响父表和子表,那么主要变更应该是对父表的变更。否则,您可以使用任何变更作为主要变更。Dataflow 连接器使用主要变更来确定分区边界,以便高效地一起批处理变更。
例如,假设您的应用监视行为并标记有问题的用户行为以供审核。对于每个标记的行为,您需要更新 Users
表以阻止该用户访问您的应用,并且您还需要在 PendingReviews
表中记录该事件。为了确保两个表都以原子方式更新,请使用 MutationGroup
:
GoogleSQL
PostgreSQL
创建变更组时,作为参数提供的第一个变更为主要变更。在这种情况下,这两个表不相关,所以没有明确的主要变更。我们已将 userMutation
放在前面,选择它作为主要变更。虽然分别应用这两个变更会更快,但不能保证原子性,所以在这种情况下建立变更组是最好的选择。
后续步骤
- 详细了解设计 Apache Beam 数据管道。
- 使用 Dataflow 在 Google Cloud 控制台中导出和import Spanner 数据库。