albums
// Spanner expects a Mutation object, so create it using the Album's data
.apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
@ProcessElement
public void processElement(ProcessContext c) {
Album album = c.element();
c.output(Mutation.newInsertOrUpdateBuilder("albums")
.set("singerId").to(album.singerId)
.set("albumId").to(album.albumId)
.set("albumTitle").to(album.albumTitle)
.build());
}
}))
// Write mutations to Spanner
.apply("WriteAlbums", SpannerIO.write()
.withInstanceId(instanceId)
.withDatabaseId(databaseId));