使用 Dataflow 导入、导出和修改数据

Dataflow 是一种用于对数据进行转换并丰富数据内容的托管服务。借助适用于 Spanner 的 Dataflow 连接器,您可以读取数据 在 Dataflow 流水线中将数据写入 Spanner,并向其写入数据; 选择性地转换或修改数据。你还可以创建流水线 在 Spanner 与其他 Google Cloud 产品之间传输数据的工作流。

推荐使用 Dataflow 连接器高效地将数据批量移入和移出 Spanner,以及对分区 DML 不支持的数据库执行大型转换,例如表移动、需要 JOIN 的批量删除等。工作时 对于单个数据库,您还可以使用其他方法导入和 导出数据:

  • 使用 Google Cloud 控制台从以下位置导出单个数据库: Spanner 到 Cloud Storage(采用 Avro 格式)。
  • 使用 Google Cloud 控制台将数据库import回 Spanner。
  • 使用 REST API 或 Google Cloud CLI 运行导出import将作业从 Spanner 导入 Cloud Storage,然后再回到 Cloud Storage 使用 Avro 格式)。

Spanner 的 Dataflow 连接器是 Apache Beam Java SDK 的组件,它提供了用于执行上述操作的 API。如需详细了解以下讨论的某些概念(例如 PCollection 对象和转换),请参阅 Apache Beam 编程指南

将连接器添加到 Maven 项目

要将 Google Cloud Dataflow 连接器添加到 Maven 项目,请将 beam-sdks-java-io-google-cloud-platform Maven 软件工件作为依赖项添加到 pom.xml 文件中。

例如,假设您的 pom.xml 文件将 beam.version 设置为适当的版本号,您将添加以下依赖项:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

从 Spanner 读取数据

如需从 Spanner 读取数据,请应用 SpannerIO.read() 转换。使用 SpannerIO.Read 类中的方法配置读取操作。应用转换后将返回 PCollection<Struct>,其中集合中的每个元素表示读取操作返回的单个行。无论是否使用特定 SQL,您都可以从 Spanner 读取数据 查询,具体取决于您想要的输出。

应用 SpannerIO.read() 转换后将通过执行强读来返回一致的数据视图。除非另行指定,否则将在您开始读取时对读取结果截取快照。查看阅读以了解详情 有关 Spanner 可执行的不同类型读取的信息。

使用查询读取数据

如需从 Spanner 读取特定的一组数据,请使用 SpannerIO.Read.withQuery() 方法指定 SQL 查询,以此来配置转换。例如:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

在不指定查询的情况下读取数据

如需在不使用查询的情况下从数据库读取数据,可以指定一个表 SpannerIO.Read.withTable() 方法指定名称,并指定一个 使用 SpannerIO.Read.withColumns() 读取的列的列表 方法。例如:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

要限制读取的行数,您可以使用 SpannerIO.Read.withKeySet() 方法。

您还可以使用指定的二级索引来读取表。与 readUsingIndex() API 调用,则索引必须包含 是否出现在查询结果中。

为此,请按上例所示指定表,并使用 SpannerIO.Read.withIndex() 方法指定包含所需列值的索引。该索引必须存储转换需要读取的所有列。基表的主键是 隐式存储。例如,使用索引读取 SongsSongsBySongName,您需要使用 以下代码:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

控制事务数据的过时

转换必定在一致的数据快照上执行。要控制数据的过时,请使用 SpannerIO.Read.withTimestampBound() 方法。如需了解详细信息,请参阅事务

在同一事务中从多个表读取数据

要在同一时间从多个表读取数据以确保数据一致性,请在单个事务中执行所有读取操作。为此,请应用 createTransaction() 转换创建 PCollectionView<Transaction> 对象,然后由该对象创建事务。您可以使用 SpannerIO.Read.withTransaction() 将结果视图传递给读取操作。

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

从所有可用表中读取数据

您可以从 Spanner 数据库的所有可用表中读取数据。

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

对不受支持的查询进行问题排查

Dataflow 连接器仅支持满足条件的 Spanner SQL 查询,条件是查询执行计划中的第一个运算符是分布式联合运算符。如果您尝试使用查询从 Spanner 读取数据,并且 您收到一个异常,指出查询 does not have a DistributedUnion at the root,请按照了解 Spanner 如何执行的步骤进行操作 查询,以便使用 Google Cloud 控制台。

如果 SQL 查询不受支持,请将其简化为满足条件的查询,条件是查询执行计划中的第一个运算符是分布式联合运算符。请删除聚合函数、表连接以及运算符 DISTINCTGROUP BYORDER,因为这些运算符最有可能会阻止查询。

创建要写入的变更

使用 Mutation 类的 newInsertOrUpdateBuilder() 方法,而不是 newInsertBuilder() 方法 除非对于 Java 流水线绝对必要对于 Python 流水线,请使用 SpannerInsertOrUpdate()(原价) SpannerInsert()。Dataflow 提供 “至少一次”保证,这意味着可以将变更写入 多次进行。因此,只有 INSERT 变更可能生成 com.google.cloud.spanner.SpannerException: ALREADY_EXISTS 个错误会导致 导致流水线出现故障为防止出现此错误,请使用 INSERT_OR_UPDATE 变更,它会添加新行或更新列值(如果该行 已存在。可以多次应用 INSERT_OR_UPDATE 变更。

写入 Spanner 并转换数据

您可以通过 Dataflow 将数据写入 Spanner 使用 SpannerIO.write() 转换执行 输入行变更的集合。Dataflow 连接器组 批量变更,以提高效率。

以下示例说明了如何将写入转换应用于变更的 PCollection

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

如果转换在完成之前意外停止,则已经 将不会回滚。

以原子方式应用变更组

您可以使用 MutationGroup 类确保以原子方式同时应用一组变更。MutationGroup 中的变更必定会在同一个事务中提交,但可能会重试该事务。

当变更组包含的变更会影响在键空间中紧密存储的数据时,变更组的效果最佳。由于 Spanner 将父表和子表数据交错在父表中, 在键空间中始终相距很近。我们建议您在构建变更组时,让它包含一个应用于父表的变更以及其他应用于子表的变更,或者使其所有变更都将修改在键空间中紧密存储的数据。如需详细了解 Spanner 如何存储父级和 子表数据,请参阅架构和数据模型。如果您不整理 更改组。 在键空间内不相邻,Spanner 可能会 需要执行两阶段提交,这会导致性能下降。如需了解详细信息,请参阅位置权衡

要使用 MutationGroup,请构建一个 SpannerIO.write() 转换并调用 SpannerIO.Write.grouped() 方法,该方法返回一个转换,然后您可以将该转换应用于 MutationGroup 对象的 PCollection

在创建 MutationGroup 时,列出的第一个变更变为主要变更。如果您的变更组同时影响父表和子表,那么主要变更应该是对父表的变更。否则,您可以使用任何变更作为主要变更。Dataflow 连接器使用主要变更来确定分区边界,以便高效地一起批处理变更。

例如,假设您的应用监视行为并标记有问题的用户行为以供审核。对于每个标记的行为,您需要更新 Users 表以阻止该用户访问您的应用,并且您还需要在 PendingReviews 表中记录该事件。为了确保两个表都以原子方式更新,请使用 MutationGroup

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

创建变更组时,作为参数提供的第一个变更为主要变更。在这种情况下,这两个表不相关,所以没有明确的主要变更。我们已将 userMutation 放在前面,选择它作为主要变更。虽然分别应用这两个变更会更快,但不能保证原子性,所以在这种情况下建立变更组是最好的选择。

后续步骤