使用 Dataflow 导入、导出和修改数据

Dataflow 是一种用于对数据进行转换并丰富数据内容的托管服务。借助 Spanner 的 Dataflow 连接器,您可以在 Dataflow 流水线中从 Spanner 读取数据和将数据写入 Spanner,并且可以选择转换或修改数据。您还可以创建在 Spanner 与其他 Google Cloud 产品之间传输数据的流水线。

建议使用 Dataflow 连接器高效地将数据批量移入和移出 Spanner,并对分区 DML 不支持的数据库执行大规模转换(例如移动表、需要进行 JOIN 连接的批量删除等)。在使用单个数据库时,您还可以使用其他方法来导入和导出数据:

  • 使用 Google Cloud 控制台将单个数据库从 Spanner 导出到 Cloud Storage(采用 Avro 格式)。
  • 使用 Google Cloud 控制台将数据库从导出到 Cloud Storage 的文件import回 Spanner。
  • 使用 REST API 或 Google Cloud CLI 运行从 Spanner 到 Cloud Storage 之间的导出import作业(同样采用 Avro 格式)。

Spanner 的 Dataflow 连接器是 Apache Beam Java SDK 的一部分,它提供了一个用于执行上述操作的 API。如需详细了解下文讨论的某些概念(例如 PCollection 对象和转换),请参阅 Apache Beam 编程指南

将连接器添加到 Maven 项目中

要将 Google Cloud Dataflow 连接器添加到 Maven 项目,请将 beam-sdks-java-io-google-cloud-platform Maven 软件工件作为依赖项添加到 pom.xml 文件中。

例如,假设您的 pom.xml 文件将 beam.version 设置为适当的版本号,您将添加以下依赖项:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

从 Spanner 读取数据

如需从 Spanner 读取数据,请应用 SpannerIO.read() 转换。使用 SpannerIO.Read 类中的方法配置读取操作。应用转换后将返回 PCollection<Struct>,其中集合中的每个元素表示读取操作返回的单个行。从 Spanner 中读取数据时,可选择使用或不使用特定 SQL 查询,具体取决于所需输出。

应用 SpannerIO.read() 转换后将通过执行强读来返回一致的数据视图。除非另行指定,否则将在您开始读取时对读取结果截取快照。如需详细了解 Spanner 可以执行的不同类型的读取,请参阅读取

使用查询读取数据

如需从 Spanner 读取特定的一组数据,请使用 SpannerIO.Read.withQuery() 方法指定 SQL 查询,以配置转换。例如:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

在不指定查询的情况下读取数据

如需在不使用查询的情况下从数据库中读取数据,您可以使用 SpannerIO.Read.withTable() 方法指定表名称,并使用 SpannerIO.Read.withColumns() 方法指定要读取的列的列表。例如:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

如需限制读取的行数,您可以使用 SpannerIO.Read.withKeySet() 方法指定要读取的一组主键。

您还可以使用指定的二级索引来读取表。与 readUsingIndex() API 调用一样,索引必须包含查询结果中显示的所有数据。

为此,请按照上例所示指定该表,并使用 SpannerIO.Read.withIndex() 方法指定包含所需列值的索引。索引必须存储转换需要读取的所有列。基表的主键是隐式存储的。例如,如需使用索引 SongsBySongName 读取 Songs 表,请使用以下代码:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

控制事务数据的过时

转换必定在一致的数据快照上执行。要控制数据的过时,请使用 SpannerIO.Read.withTimestampBound() 方法。如需了解详细信息,请参阅事务

从同一事务中的多个表读取数据

要在同一时间从多个表读取数据以确保数据一致性,请在单个事务中执行所有读取操作。为此,请应用 createTransaction() 转换创建 PCollectionView<Transaction> 对象,然后由该对象创建事务。您可以使用 SpannerIO.Read.withTransaction() 将结果视图传递给读取操作。

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

从所有可用表中读取数据

您可以从 Spanner 数据库中的所有可用表中读取数据。

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

排查不受支持的查询问题

Dataflow 连接器仅支持满足如下条件的 Spanner SQL 查询:查询执行计划中的第一个运算符是分布式联合运算符。如果您尝试使用查询从 Spanner 读取数据,但收到指出查询为 does not have a DistributedUnion at the root 的异常,请按照了解 Spanner 如何执行查询中的步骤,使用 Google Cloud 控制台检索查询的执行计划。

如果 SQL 查询不受支持,请将其简化为满足条件的查询,条件是查询执行计划中的第一个运算符是分布式联合运算符。请删除聚合函数、表连接以及运算符 DISTINCTGROUP BYORDER,因为这些运算符最有可能会阻止查询。

为写入操作创建变更

请使用 Mutation 类的 newInsertOrUpdateBuilder() 方法,而不要使用 newInsertBuilder() 方法,除非 Java 流水线绝对必要。对于 Python 流水线,请使用 SpannerInsertOrUpdate() 而不是 SpannerInsert()。Dataflow 提供至少一次保证,这意味着可能会多次写入变更。因此,仅 INSERT 变更可能会产生 com.google.cloud.spanner.SpannerException: ALREADY_EXISTS 错误,导致流水线失败。为防止出现此错误,请改用 INSERT_OR_UPDATE 变更,该变更会添加新行或更新列值(如果该行已存在)。可以多次应用 INSERT_OR_UPDATE 变更。

写入 Spanner 并转换数据

您可以使用 SpannerIO.write() 转换来执行一系列输入行变更,从而通过 Dataflow 连接器将数据写入 Spanner。Dataflow 连接器将多个变更组合成批次以提高效率。

以下示例说明了如何将写入转换应用于变更的 PCollection

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

如果转换在完成前意外停止,则已经应用的变更将不会回滚。

以原子方式应用变更组

您可以使用 MutationGroup 类确保以原子方式同时应用一组变更。MutationGroup 中的变更必定会在同一个事务中提交,但可能会重试该事务。

当变更组包含的变更会影响在键空间中紧密存储的数据时,变更组的效果最佳。由于 Spanner 将父表和子表数据一起交错在父表中,因此这些数据在键空间中始终靠得很近。我们建议您在构建变更组时,让它包含一个应用于父表的变更以及其他应用于子表的变更,或者使其所有变更都将修改在键空间中紧密存储的数据。如需详细了解 Spanner 如何存储父表和子表数据,请参阅架构和数据模型。如果您不围绕建议的表层次结构整理变更组,或者要访问的数据在键空间中不是邻近的,则 Spanner 可能需要执行两阶段提交,这会导致性能下降。如需了解详细信息,请参阅位置权衡

要使用 MutationGroup,请构建一个 SpannerIO.write() 转换并调用 SpannerIO.Write.grouped() 方法,该方法返回一个转换,然后您可以将该转换应用于 MutationGroup 对象的 PCollection

在创建 MutationGroup 时,列出的第一个变更变为主要变更。如果您的变更组同时影响父表和子表,那么主要变更应该是对父表的变更。否则,您可以使用任何变更作为主要变更。Dataflow 连接器使用主要变更来确定分区边界,以便高效地一起批处理变更。

例如,假设您的应用监视行为并标记有问题的用户行为以供审核。对于每个标记的行为,您需要更新 Users 表以阻止该用户访问您的应用,并且您还需要在 PendingReviews 表中记录该事件。为了确保两个表都以原子方式更新,请使用 MutationGroup

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

创建变更组时,作为参数提供的第一个变更为主要变更。在这种情况下,这两个表不相关,所以没有明确的主要变更。我们已将 userMutation 放在前面,选择它作为主要变更。虽然分别应用这两个变更会更快,但不能保证原子性,所以在这种情况下建立变更组是最好的选择。

后续步骤