Download table data using the Arrow data format, and deserialize the data into row objects.
Code sample
Java
Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
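The sample below also depends on the Apache Arrow Java libraries in addition to the BigQuery Storage client library. As a rough guide rather than an authoritative dependency list, a build will typically need the google-cloud-bigquerystorage artifact plus Arrow's vector and allocator modules, for example arrow-vector and arrow-memory-netty; check your build's dependency management for versions that are compatible with your setup.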
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
public class StorageArrowSample {

  /*
   * SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches transmitted
   * from the storage API using a generic datum decoder.
   */
  private static class SimpleRowReader implements AutoCloseable {

    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

    // Decoder object will be reused to avoid re-allocation and too much garbage collection.
    private final VectorSchemaRoot root;
    private final VectorLoader loader;

    public SimpleRowReader(ArrowSchema arrowSchema) throws IOException {
      Schema schema =
          MessageSerializer.deserializeSchema(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      arrowSchema.getSerializedSchema().toByteArray())));
      Preconditions.checkNotNull(schema);
      List<FieldVector> vectors = new ArrayList<>();
      for (Field field : schema.getFields()) {
        vectors.add(field.createVector(allocator));
      }
      root = new VectorSchemaRoot(vectors);
      loader = new VectorLoader(root);
    }

    /**
     * Sample method for processing Arrow data which only validates decoding.
     *
     * @param batch object returned from the ReadRowsResponse.
     */
    public void processRows(ArrowRecordBatch batch) throws IOException {
      org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
          MessageSerializer.deserializeRecordBatch(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      batch.getSerializedRecordBatch().toByteArray())),
              allocator);
      loader.load(deserializedBatch);
      // Release buffers from batch (they are still held in the vectors in root).
      deserializedBatch.close();
      System.out.println(root.contentToTSVString());
      // Release buffers from vectors in root.
      root.clear();
    }

    @Override
    public void close() {
      root.close();
      allocator.close();
    }
  }
  public static void main(String... args) throws Exception {
    // Sets your Google Cloud Platform project ID.
    // String projectId = "YOUR_PROJECT_ID";
    String projectId = args[0];
    // Optional snapshot time in milliseconds since the epoch; use long to avoid overflow.
    Long snapshotMillis = null;
    if (args.length > 1) {
      snapshotMillis = Long.parseLong(args[1]);
    }
    try (BigQueryReadClient client = BigQueryReadClient.create()) {
      String parent = String.format("projects/%s", projectId);

      // This example uses baby name data from the public datasets.
      String srcTable =
          String.format(
              "projects/%s/datasets/%s/tables/%s",
              "bigquery-public-data", "usa_names", "usa_1910_current");

      // We specify the columns to be projected by adding them to the selected fields,
      // and set a simple filter to restrict which rows are transmitted.
      TableReadOptions options =
          TableReadOptions.newBuilder()
              .addSelectedFields("name")
              .addSelectedFields("number")
              .addSelectedFields("state")
              .setRowRestriction("state = \"WA\"")
              .build();

      // Start specifying the read session we want created.
      ReadSession.Builder sessionBuilder =
          ReadSession.newBuilder()
              .setTable(srcTable)
              // This API can also deliver data serialized in Apache Avro format.
              // This example leverages Apache Arrow.
              .setDataFormat(DataFormat.ARROW)
              .setReadOptions(options);

      // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
      if (snapshotMillis != null) {
        Timestamp t =
            Timestamp.newBuilder()
                .setSeconds(snapshotMillis / 1000)
                .setNanos((int) ((snapshotMillis % 1000) * 1000000))
                .build();
        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
        sessionBuilder.setTableModifiers(modifiers);
      }

      // Begin building the session creation request.
      CreateReadSessionRequest.Builder builder =
          CreateReadSessionRequest.newBuilder()
              .setParent(parent)
              .setReadSession(sessionBuilder)
              .setMaxStreamCount(1);

      ReadSession session = client.createReadSession(builder.build());
      // Setup a simple reader and start a read session.
      try (SimpleRowReader reader = new SimpleRowReader(session.getArrowSchema())) {

        // Assert that there are streams available in the session. An empty table may not have
        // data available. If no sessions are available for an anonymous (cached) table, consider
        // writing results of a query to a named table rather than consuming cached results
        // directly.
        Preconditions.checkState(session.getStreamsCount() > 0);

        // Use the first stream to perform reading.
        String streamName = session.getStreams(0).getName();

        ReadRowsRequest readRowsRequest =
            ReadRowsRequest.newBuilder().setReadStream(streamName).build();

        // Process each block of rows as they arrive and decode using our simple row reader.
        ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
        for (ReadRowsResponse response : stream) {
          Preconditions.checkState(response.hasArrowRecordBatch());
          reader.processRows(response.getArrowRecordBatch());
        }
      }
    }
  }
}
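To run the sample, pass your Google Cloud project ID as the first argument and, optionally, a snapshot time in milliseconds since the Unix epoch as the second argument.

The sample only validates decoding: processRows prints each loaded batch as TSV with root.contentToTSVString(). If you want the row objects described at the top of this page instead of text output, you can read the loaded VectorSchemaRoot column by column before root.clear() releases the underlying buffers. The following is a minimal sketch, not part of the official sample, that assumes the projection used above (name and state arrive as VarCharVector, number as BigIntVector, because BigQuery INT64 columns map to 64-bit Arrow integers) and a Java version with record support; the NameCount type and toRows method are hypothetical helpers.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;

public class ArrowRowConversion {

  // Hypothetical row type for the projected columns (name, number, state).
  public record NameCount(String name, long number, String state) {}

  // Copies the current contents of root into plain Java objects. Call this while
  // the batch is still loaded, i.e. before root.clear() releases the buffers.
  // For simplicity this assumes non-null values; add isNull(i) checks for
  // nullable columns.
  public static List<NameCount> toRows(VectorSchemaRoot root) {
    VarCharVector names = (VarCharVector) root.getVector("name");
    BigIntVector numbers = (BigIntVector) root.getVector("number");
    VarCharVector states = (VarCharVector) root.getVector("state");
    List<NameCount> rows = new ArrayList<>(root.getRowCount());
    for (int i = 0; i < root.getRowCount(); i++) {
      rows.add(
          new NameCount(
              new String(names.get(i), StandardCharsets.UTF_8),
              numbers.get(i),
              new String(states.get(i), StandardCharsets.UTF_8)));
    }
    return rows;
  }
}

In SimpleRowReader.processRows, you would call toRows(root) after loader.load(deserializedBatch) and before root.clear(); once the root is cleared, the vectors no longer hold that batch's data.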
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.