流式传输 BigQuery 查询结果

此示例演示了如何使用 Cloud Run functions 流式传输 BigQuery 查询的结果。


如需向 Cloud Run functions 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅"为本地开发环境设置身份验证"。

using Google.Cloud.BigQuery.V2;
using Google.Cloud.Functions.Framework;
using Microsoft.AspNetCore.Http;
using System;
using System.Threading.Tasks;

namespace StreamBigQuery;

/// <summary>
/// HTTP function that runs a query against a BigQuery public dataset and
/// streams the "title" column of each row to the response, one per line.
/// </summary>
public class Function : IHttpFunction
{
    /// <summary>
    /// Executes the sample query and writes the results to the response body
    /// as plain text while iterating over the rows.
    /// </summary>
    /// <param name="context">The HTTP context of the current request.</param>
    public async Task HandleAsync(HttpContext context)
    {
        context.Response.ContentType = "text/plain";
        string projectId = Environment.GetEnvironmentVariable("GOOGLE_PROJECT_ID");
        if (string.IsNullOrEmpty(projectId))
        {
            // Report the misconfiguration explicitly instead of letting client creation fail.
            context.Response.StatusCode = StatusCodes.Status500InternalServerError;
            await context.Response.WriteAsync("GOOGLE_PROJECT_ID environment variable must be set.\n", context.RequestAborted);
            return;
        }
        BigQueryClient client = BigQueryClient.Create(projectId).WithDefaultLocation(Locations.UnitedStates);

        // Example to retrieve a large payload from BigQuery public dataset.
        string query = "SELECT title FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000";
        BigQueryParameter[] parameters = null;
        BigQueryResults results = await client.ExecuteQueryAsync(query, parameters, cancellationToken: context.RequestAborted);

        // Stream out the payload response by iterating rows.
        foreach (BigQueryRow row in results)
        {
            await context.Response.WriteAsync($"{row["title"]}\n", context.RequestAborted);
            // Artificially add a 50ms delay per line, just to make the streaming nature clearer
            // when viewing the results in a browser. The delay observes the same abort token
            // as the write above so an aborted request stops waiting immediately.
            await Task.Delay(50, context.RequestAborted);
        }
    }
}

如需向 Cloud Run functions 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅"为本地开发环境设置身份验证"。

// Package responsestreaming contains a function that streams out a large payload in chunks.
package responsestreaming

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"

	"cloud.google.com/go/bigquery"
	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
	"google.golang.org/api/iterator"
)

func init() {
	functions.HTTP("streamBigQuery", streamBigQuery)

// streamBigQuery retrieves a large payload from BigQuery public dataset and streams its rows.
func streamBigQuery(w http.ResponseWriter, r *http.Request) {
	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
	if projectID == "" {
		fmt.Println("GOOGLE_CLOUD_PROJECT environment variable must be set.")
	ctx := context.Background()
	// Must include project ID in client.
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	defer client.Close()

	// Run the query and ensure the job finishes without error.
	rows, err := query(ctx, client)
	if err != nil {

	// Stream out the payload by iterating rows and flushing out the writer.
	streamResults(w, rows)

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {
	q := client.Query(
		"SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000")
	q.Location = "US"
	return q.Read(ctx)

// streamResults streams results from a query of BigQuery's public dataset.
func streamResults(w io.Writer, it *bigquery.RowIterator) {
	for {
		var row []bigquery.Value
		err := it.Next(&row)
		if err == iterator.Done {
		fmt.Fprintln(w, row[0])
		if f, ok := w.(http.Flusher); ok {
			fmt.Fprintln(w, "Successfully flushed row")

如需向 Cloud Run functions 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅"为本地开发环境设置身份验证"。

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import java.io.BufferedWriter;
import java.io.IOException;

public class StreamBigQuery implements HttpFunction {
  public void service(HttpRequest request, HttpResponse response) {
    String query = "SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000";
    streamQueryResult(query, response);

  public static void streamQueryResult(String query, HttpResponse response) {
    try {
      BufferedWriter writer = response.getWriter();
      // Initialize client that will be used to send requests.
      // This client only needs to be created once,
      // and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      TableResult results = bigquery.query(QueryJobConfiguration.of(query));

          row -> row.forEach(val -> {
            try {
              writer.write(val.getValue().toString() + "\n");
              System.out.println("Successfully flushed row");
            } catch (IOException e) {
              System.out.println("Could not get rows: " + e.toString());
    } catch (BigQueryException | InterruptedException | IOException e) {
      System.out.println("Query not performed: " + e.toString());

如需向 Cloud Run functions 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅"为本地开发环境设置身份验证"。

use Psr\Http\Message\ServerRequestInterface;
use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Runs a fixed query against a BigQuery public dataset and prints every
 * column value of every row as JSON, one per line, flushing after each value.
 *
 * @param ServerRequestInterface $request The incoming HTTP request (not inspected).
 */
function streamBigQuery(ServerRequestInterface $request)
{
    // Provide the Cloud Project ID by setting env var.
    $projectId = getenv('GOOGLE_PROJECT_ID');
    // Example large payload from BigQuery's public dataset.
    $bigQuery = new BigQueryClient(['projectId' => $projectId]);
    $queryJobConfig = $bigQuery->query(
        'SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000'
    );
    $queryResults = $bigQuery->runQuery($queryJobConfig);

    // Stream out large payload by iterating rows and flushing output.
    foreach ($queryResults as $row) {
        foreach ($row as $column => $value) {
            printf('%s' . PHP_EOL, json_encode($value));
            // Flush after each value so output is sent as it is produced.
            flush();
        }
    }
    printf('Successfully streamed rows');
}

如需向 Cloud Run functions 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅"为本地开发环境设置身份验证"。

from flask import Response, stream_with_context

import functions_framework
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()


@functions_framework.http
def stream_big_query_output(request):
    """HTTP Cloud Function that streams the rows of a BigQuery query.

    Args:
        request (flask.Request): The request object.

    Returns:
        The response text, or any set of values that can be turned into a
        Response object using `make_response`. Here it is a streaming
        Response that yields the first column of each result row.
    """
    # Example large query from public dataset
    query = """
    SELECT abstract
    FROM `bigquery-public-data.breathe.bioasq`
    LIMIT 1000
    """
    query_job = client.query(query)  # Make an API request.

    def generate():
        # Yield rows one at a time so the response is produced incrementally.
        for row in query_job:
            yield row[0]

    return Response(stream_with_context(generate()))

如需向 Cloud Run functions 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅"为本地开发环境设置身份验证"。

require "functions_framework"
require "google/cloud/bigquery"

# HTTP function that queries a public BigQuery dataset and returns the rows
# as a lazily evaluated response body, one row per line.
FunctionsFramework.http "stream_big_query" do |_request|
  # Provide the Project ID by setting the env variable or specifying directly.
  # project_id = "Your Google Cloud project ID"
  project_id = ENV["CLOUD_PROJECT_ID"]
  bigquery = Google::Cloud::Bigquery.new project: project_id
  # Example to retrieve a large payload from public BigQuery dataset.
  sql = "SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000"

  results = bigquery.query sql do |config|
    config.location = "US"
  end

  # Stream out the large payload by iterating rows.
  body = Enumerator.new do |y|
    results.each do |row|
      y << "#{row}\n"
    end
  end

  # Rack-style response triple: status, headers, body.
  [200, {}, body]
end


如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。