Dataflow 是一种用于对数据进行转换并丰富数据内容的托管服务。借助适用于 Spanner 的 Dataflow 连接器,您可以读取数据 在 Dataflow 流水线中将数据写入 Spanner,并向其写入数据; 选择性地转换或修改数据。你还可以创建流水线 在 Spanner 与其他 Google Cloud 产品之间传输数据的工作流。
推荐使用 Dataflow 连接器高效地将数据批量移入和移出 Spanner,以及对分区 DML 不支持的数据库执行大型转换,例如表移动、需要 JOIN 的批量删除等。工作时 对于单个数据库,您还可以使用其他方法导入和 导出数据:
- 使用 Google Cloud 控制台从以下位置导出单个数据库: Spanner 到 Cloud Storage(采用 Avro 格式)。
- 使用 Google Cloud 控制台将数据库import回 Spanner。
- 使用 REST API 或 Google Cloud CLI 运行导出或 import将作业从 Spanner 导入 Cloud Storage,然后再回到 Cloud Storage 使用 Avro 格式)。
Spanner 的 Dataflow 连接器是 Apache Beam Java SDK 的组件,它提供了用于执行上述操作的 API。如需详细了解以下讨论的某些概念(例如 PCollection 对象和转换),请参阅 Apache Beam 编程指南。
将连接器添加到 Maven 项目
要将 Google Cloud Dataflow 连接器添加到 Maven 项目,请将 beam-sdks-java-io-google-cloud-platform
Maven 软件工件作为依赖项添加到 pom.xml
文件中。
例如,假设您的 pom.xml
文件将 beam.version
设置为适当的版本号,您将添加以下依赖项:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
从 Spanner 读取数据
如需从 Spanner 读取数据,请应用 SpannerIO.read() 转换。使用 SpannerIO.Read
类中的方法配置读取操作。应用转换后将返回 PCollection<Struct>
,其中集合中的每个元素表示读取操作返回的单个行。无论是否使用特定 SQL,您都可以从 Spanner 读取数据
查询,具体取决于您想要的输出。
应用 SpannerIO.read()
转换后将通过执行强读来返回一致的数据视图。除非另行指定,否则将在您开始读取时对读取结果截取快照。查看阅读以了解详情
有关 Spanner 可执行的不同类型读取的信息。
使用查询读取数据
如需从 Spanner 读取特定的一组数据,请使用 SpannerIO.Read.withQuery()
方法指定 SQL 查询,以此来配置转换。例如:
在不指定查询的情况下读取数据
如需在不使用查询的情况下从数据库读取数据,可以指定一个表 SpannerIO.Read.withTable() 方法指定名称,并指定一个 使用 SpannerIO.Read.withColumns() 读取的列的列表 方法。例如:
GoogleSQL
PostgreSQL
要限制读取的行数,您可以使用 SpannerIO.Read.withKeySet() 方法。
您还可以使用指定的二级索引来读取表。与 readUsingIndex() API 调用,则索引必须包含 是否出现在查询结果中。
为此,请按上例所示指定表,并使用 SpannerIO.Read.withIndex()
方法指定包含所需列值的索引。该索引必须存储转换需要读取的所有列。基表的主键是
隐式存储。例如,使用索引读取 Songs
表
SongsBySongName
,您需要使用
以下代码:
GoogleSQL
PostgreSQL
控制事务数据的过时
转换必定在一致的数据快照上执行。要控制数据的过时,请使用 SpannerIO.Read.withTimestampBound()
方法。如需了解详细信息,请参阅事务。
在同一事务中从多个表读取数据
要在同一时间从多个表读取数据以确保数据一致性,请在单个事务中执行所有读取操作。为此,请应用 createTransaction()
转换创建 PCollectionView<Transaction>
对象,然后由该对象创建事务。您可以使用 SpannerIO.Read.withTransaction()
将结果视图传递给读取操作。
GoogleSQL
PostgreSQL
从所有可用表中读取数据
您可以从 Spanner 数据库的所有可用表中读取数据。
GoogleSQL
PostgreSQL
对不受支持的查询进行问题排查
Dataflow 连接器仅支持满足条件的 Spanner SQL 查询,条件是查询执行计划中的第一个运算符是分布式联合运算符。如果您尝试使用查询从 Spanner 读取数据,并且
您收到一个异常,指出查询 does not have a DistributedUnion at
the root
,请按照了解 Spanner 如何执行的步骤进行操作
查询,以便使用
Google Cloud 控制台。
如果 SQL 查询不受支持,请将其简化为满足条件的查询,条件是查询执行计划中的第一个运算符是分布式联合运算符。请删除聚合函数、表连接以及运算符 DISTINCT
、GROUP BY
和 ORDER
,因为这些运算符最有可能会阻止查询。
创建要写入的变更
使用 Mutation
类的
newInsertOrUpdateBuilder()
方法,而不是
newInsertBuilder()
方法
除非对于 Java 流水线绝对必要对于 Python 流水线,请使用
SpannerInsertOrUpdate()
(原价)
SpannerInsert()
。Dataflow 提供
“至少一次”保证,这意味着可以将变更写入
多次进行。因此,只有 INSERT
变更可能生成
com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
个错误会导致
导致流水线出现故障为防止出现此错误,请使用 INSERT_OR_UPDATE
变更,它会添加新行或更新列值(如果该行
已存在。可以多次应用 INSERT_OR_UPDATE
变更。
写入 Spanner 并转换数据
您可以通过 Dataflow 将数据写入 Spanner
使用 SpannerIO.write()
转换执行
输入行变更的集合。Dataflow 连接器组
批量变更,以提高效率。
以下示例说明了如何将写入转换应用于变更的 PCollection
:
GoogleSQL
PostgreSQL
如果转换在完成之前意外停止,则已经 将不会回滚。
以原子方式应用变更组
您可以使用 MutationGroup
类确保以原子方式同时应用一组变更。MutationGroup
中的变更必定会在同一个事务中提交,但可能会重试该事务。
当变更组包含的变更会影响在键空间中紧密存储的数据时,变更组的效果最佳。由于 Spanner 将父表和子表数据交错在父表中, 在键空间中始终相距很近。我们建议您在构建变更组时,让它包含一个应用于父表的变更以及其他应用于子表的变更,或者使其所有变更都将修改在键空间中紧密存储的数据。如需详细了解 Spanner 如何存储父级和 子表数据,请参阅架构和数据模型。如果您不整理 更改组。 在键空间内不相邻,Spanner 可能会 需要执行两阶段提交,这会导致性能下降。如需了解详细信息,请参阅位置权衡。
要使用 MutationGroup
,请构建一个 SpannerIO.write()
转换并调用 SpannerIO.Write.grouped()
方法,该方法返回一个转换,然后您可以将该转换应用于 MutationGroup
对象的 PCollection
。
在创建 MutationGroup
时,列出的第一个变更变为主要变更。如果您的变更组同时影响父表和子表,那么主要变更应该是对父表的变更。否则,您可以使用任何变更作为主要变更。Dataflow 连接器使用主要变更来确定分区边界,以便高效地一起批处理变更。
例如,假设您的应用监视行为并标记有问题的用户行为以供审核。对于每个标记的行为,您需要更新 Users
表以阻止该用户访问您的应用,并且您还需要在 PendingReviews
表中记录该事件。为了确保两个表都以原子方式更新,请使用 MutationGroup
:
GoogleSQL
PostgreSQL
创建变更组时,作为参数提供的第一个变更为主要变更。在这种情况下,这两个表不相关,所以没有明确的主要变更。我们已将 userMutation
放在前面,选择它作为主要变更。虽然分别应用这两个变更会更快,但不能保证原子性,所以在这种情况下建立变更组是最好的选择。
后续步骤
- 详细了解设计 Apache Beam 数据流水线。
- 使用 Dataflow 在 Google Cloud 控制台中导出和导入 Spanner 数据库。