Stream BigQuery query results

This example demonstrates how to stream the results of a BigQuery query from a Cloud Run function.
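Each sample below authenticates with Application Default Credentials (ADC). To confirm that ADC resolves in your local environment before deploying, you can run a minimal check such as the following sketch, which assumes the google-auth Python package is installed and is not part of the samples themselves:

import google.auth

# Resolve Application Default Credentials; this raises DefaultCredentialsError
# if no credentials are configured in the environment.
credentials, project_id = google.auth.default()
print(f"ADC resolved for project: {project_id}")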

Code sample

C#

To authenticate to Cloud Run functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


using Google.Cloud.BigQuery.V2;
using Google.Cloud.Functions.Framework;
using Microsoft.AspNetCore.Http;
using System;
using System.Threading.Tasks;

namespace StreamBigQuery;

public class Function : IHttpFunction
{
    public async Task HandleAsync(HttpContext context)
    {
        context.Response.ContentType = "text/plain";
        string projectId = Environment.GetEnvironmentVariable("GOOGLE_PROJECT_ID");
        BigQueryClient client = BigQueryClient.Create(projectId).WithDefaultLocation(Locations.UnitedStates);

        // Example query that retrieves a large payload from a BigQuery public dataset.
        string query = "SELECT title FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000";
        BigQueryParameter[] parameters = null;
        BigQueryResults results = await client.ExecuteQueryAsync(query, parameters, cancellationToken: context.RequestAborted);

        // Stream out the payload response by iterating rows.
        foreach (BigQueryRow row in results)
        {
            await context.Response.WriteAsync($"{row["title"]}\n", context.RequestAborted);
            // Artificially add a 50ms delay per line, just to make the streaming nature clearer
            // when viewing the results in a browser.
            await Task.Delay(50);
        }
    }
}

Go

To authenticate to Cloud Run functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


// Package responsestreaming contains a function that streams out a large payload in chunks.
package responsestreaming

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"

	"cloud.google.com/go/bigquery"
	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
	"google.golang.org/api/iterator"
)

func init() {
	functions.HTTP("streamBigQuery", streamBigQuery)
}

// streamBigQuery retrieves a large payload from a BigQuery public dataset and streams its rows.
func streamBigQuery(w http.ResponseWriter, r *http.Request) {
	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
	if projectID == "" {
		http.Error(w, "GOOGLE_CLOUD_PROJECT environment variable must be set.", http.StatusInternalServerError)
		return
	}
	// Use the request context so the query is canceled if the client disconnects.
	ctx := r.Context()
	// The client must be created with a project ID.
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		http.Error(w, fmt.Sprintf("bigquery.NewClient: %v", err), http.StatusInternalServerError)
		return
	}
	defer client.Close()

	// Run the query and get an iterator over its results.
	rows, err := query(ctx, client)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Stream out the payload by iterating rows and flushing the writer.
	streamResults(w, rows)
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {
	q := client.Query(
		"SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000")
	q.Location = "US"
	return q.Read(ctx)
}

// streamResults streams results from a query of BigQuery's public dataset.
func streamResults(w io.Writer, it *bigquery.RowIterator) {
	for {
		var row []bigquery.Value
		err := it.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Printf("iterator.Next: %v", err)
			return
		}
		fmt.Fprintln(w, row[0])
		// Flush after each row so the client receives output incrementally
		// instead of waiting for the complete response.
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
		}
	}
}

Java

To authenticate to Cloud Run functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import java.io.BufferedWriter;
import java.io.IOException;

public class StreamBigQuery implements HttpFunction {
  @Override
  public void service(HttpRequest request, HttpResponse response) {
    String query = "SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000";
    streamQueryResult(query, response);
  }

  public static void streamQueryResult(String query, HttpResponse response) {
    try {
      BufferedWriter writer = response.getWriter();
      // Initialize client that will be used to send requests.
      // This client only needs to be created once,
      // and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      TableResult results = bigquery.query(QueryJobConfiguration.of(query));

      results.iterateAll().forEach(
          row -> row.forEach(val -> {
            try {
              writer.write(val.getValue().toString() + "\n");
              writer.flush();
              System.out.println("Successfully flushed row");
            } catch (IOException e) {
              System.out.println("Could not get rows: " + e.toString());
            }
          }));
    } catch (InterruptedException e) {
      // Restore the interrupted status before reporting the failure.
      Thread.currentThread().interrupt();
      System.out.println("Query not performed: " + e.toString());
    } catch (BigQueryException | IOException e) {
      System.out.println("Query not performed: " + e.toString());
    }
  }
}

PHP

To authenticate to Cloud Run functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

use Psr\Http\Message\ServerRequestInterface;
use Google\Cloud\BigQuery\BigQueryClient;

function streamBigQuery(ServerRequestInterface $request)
{
    // Provide the Cloud Project ID by setting env var.
    $projectId = getenv('GOOGLE_PROJECT_ID');
    // Example large payload from BigQuery's public dataset.
    $bigQuery = new BigQueryClient(['projectId' => $projectId]);
    $queryJobConfig = $bigQuery->query(
        'SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000'
    );
    $queryResults = $bigQuery->runQuery($queryJobConfig);

    // Stream out large payload by iterating rows and flushing output.
    foreach ($queryResults as $row) {
        foreach ($row as $column => $value) {
            printf('%s' . PHP_EOL, json_encode($value));
            flush();
        }
    }
    printf('Successfully streamed rows');
}

Python

To authenticate to Cloud Run functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

from flask import Response, stream_with_context

import functions_framework
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()


@functions_framework.http
def stream_big_query_output(request):
    """HTTP Cloud Function.
    Args:
        request (flask.Request): The request object.
        <https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
    Returns:
        The response text, or any set of values that can be turned into a
        Response object using `make_response`
        <https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
    """
    # Example large query from public dataset
    query = """
    SELECT abstract
    FROM `bigquery-public-data.breathe.bioasq`
    LIMIT 1000
    """
    query_job = client.query(query)  # Make an API request.

    def generate():
        for row in query_job:
            # Yield one row per line so rows stay separated in the output.
            yield str(row[0]) + "\n"

    return Response(stream_with_context(generate()))

Ruby

To authenticate to Cloud Run functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

require "functions_framework"
require "google/cloud/bigquery"

FunctionsFramework.http "stream_big_query" do |_request|
  # Provide the Project ID by setting the env variable or specifying directly.
  # project_id = "Your Google Cloud project ID"
  project_id = ENV["CLOUD_PROJECT_ID"]
  bigquery = Google::Cloud::Bigquery.new project: project_id
  # Example to retrieve a large payload from public BigQuery dataset.
  sql = "SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000"

  results = bigquery.query sql do |config|
    config.location = "US"
  end

  # Stream out the large payload by iterating rows.
  body = Enumerator.new do |y|
    results.each do |row|
      y << "#{row}\n"
    end
  end
  [200, {}, body]
end
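To observe the streaming behavior from a client, read the response incrementally instead of buffering it. The following sketch uses Python's requests library; the URL is a hypothetical placeholder for your deployed function:

import requests

# Hypothetical URL; replace with the URL of your deployed function.
url = "https://REGION-PROJECT_ID.cloudfunctions.net/stream_big_query"

# stream=True returns the body as it arrives rather than buffering
# the entire response in memory first.
with requests.get(url, stream=True, timeout=300) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        if line:
            print(line)  # Each row appears as the function flushes it.

Because every sample flushes after writing a row, lines should arrive one at a time rather than all at once.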

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.