Diffuser les résultats de requêtes BigQuery en streaming

Cet exemple montre comment diffuser les résultats d'une requête BigQuery en streaming à l'aide d'une fonction Cloud.

Exemple de code

C#

Pour vous authentifier auprès de Cloud Run Functions, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Cloud.BigQuery.V2;
using Google.Cloud.Functions.Framework;
using Microsoft.AspNetCore.Http;
using System;
using System.Threading.Tasks;

namespace StreamBigQuery;

public class Function : IHttpFunction
{
    public async Task HandleAsync(HttpContext context)
    {
        context.Response.ContentType = "text/plain";
        string projectId = Environment.GetEnvironmentVariable("GOOGLE_PROJECT_ID");
        BigQueryClient client = BigQueryClient.Create(projectId).WithDefaultLocation(Locations.UnitedStates);

        // Example to retrieve a large payload from BigQuery public dataset.
        string query = "SELECT title FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000";
        BigQueryParameter[] parameters = null;
        BigQueryResults results = await client.ExecuteQueryAsync(query, parameters, cancellationToken: context.RequestAborted);

        // Stream out the payload response by iterating rows.
        foreach (BigQueryRow row in results)
        {
            await context.Response.WriteAsync($"{row["title"]}\n", context.RequestAborted);
            // Artificially add a 50ms delay per line, just to make the streaming nature clearer
            // when viewing the results in a browser.
            await Task.Delay(50);
        }
    }
}

Go

Pour vous authentifier auprès de Cloud Run Functions, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


// Package responsestreaming contains a function that streams out a large payload in chunks.
package responsestreaming

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"

	"cloud.google.com/go/bigquery"
	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
	"google.golang.org/api/iterator"
)

func init() {
	functions.HTTP("streamBigQuery", streamBigQuery)
}

// streamBigQuery retrieves a large payload from BigQuery public dataset and streams its rows.
func streamBigQuery(w http.ResponseWriter, r *http.Request) {
	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
	if projectID == "" {
		fmt.Println("GOOGLE_CLOUD_PROJECT environment variable must be set.")
		os.Exit(1)
	}
	ctx := context.Background()
	// Must include project ID in client.
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	// Run the query and ensure the job finishes without error.
	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}

	// Stream out the payload by iterating rows and flushing out the writer.
	streamResults(w, rows)
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {
	q := client.Query(
		"SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000")
	q.Location = "US"
	return q.Read(ctx)
}

// streamResults streams results from a query of BigQuery's public dataset.
func streamResults(w io.Writer, it *bigquery.RowIterator) {
	for {
		var row []bigquery.Value
		err := it.Next(&row)
		if err == iterator.Done {
			break
		}
		fmt.Fprintln(w, row[0])
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
			fmt.Fprintln(w, "Successfully flushed row")
		}
	}
}

Java

Pour vous authentifier auprès de Cloud Run Functions, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import java.io.BufferedWriter;
import java.io.IOException;

public class StreamBigQuery implements HttpFunction {
  @Override
  public void service(HttpRequest request, HttpResponse response) {
    String query = "SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000";
    streamQueryResult(query, response);
  }

  public static void streamQueryResult(String query, HttpResponse response) {
    try {
      BufferedWriter writer = response.getWriter();
      // Initialize client that will be used to send requests.
      // This client only needs to be created once,
      // and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      TableResult results = bigquery.query(QueryJobConfiguration.of(query));

      results.iterateAll().forEach(
          row -> row.forEach(val -> {
            try {
              writer.write(val.getValue().toString() + "\n");
              writer.flush();
              System.out.println("Successfully flushed row");
            } catch (IOException e) {
              System.out.println("Could not get rows: " + e.toString());
            }
          }));
    } catch (BigQueryException | InterruptedException | IOException e) {
      System.out.println("Query not performed: " + e.toString());
    }
  }
}

PHP

Pour vous authentifier auprès de Cloud Run Functions, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

use Psr\Http\Message\ServerRequestInterface;
use Google\Cloud\BigQuery\BigQueryClient;

function streamBigQuery(ServerRequestInterface $request)
{
    // Provide the Cloud Project ID by setting env var.
    $projectId = getenv('GOOGLE_PROJECT_ID');
    // Example large payload from BigQuery's public dataset.
    $bigQuery = new BigQueryClient(['projectId' => $projectId]);
    $queryJobConfig = $bigQuery->query(
        'SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000'
    );
    $queryResults = $bigQuery->runQuery($queryJobConfig);

    // Stream out large payload by iterating rows and flushing output.
    foreach ($queryResults as $row) {
        foreach ($row as $column => $value) {
            printf('%s' . PHP_EOL, json_encode($value));
            flush();
        }
    }
    printf('Successfully streamed rows');
}

Python

Pour vous authentifier auprès de Cloud Run Functions, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from flask import Response, stream_with_context

import functions_framework
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()


@functions_framework.http
def stream_big_query_output(request):
    """HTTP Cloud Function.
    Args:
        request (flask.Request): The request object.
        <https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
    Returns:
        The response text, or any set of values that can be turned into a
        Response object using `make_response`
        <https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
    """
    # Example large query from public dataset
    query = """
    SELECT abstract
    FROM `bigquery-public-data.breathe.bioasq`
    LIMIT 1000
    """
    query_job = client.query(query)  # Make an API request.

    def generate():
        for row in query_job:
            yield row[0]

    return Response(stream_with_context(generate()))

Ruby

Pour vous authentifier auprès de Cloud Run Functions, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

require "functions_framework"
require "google/cloud/bigquery"

FunctionsFramework.http "stream_big_query" do |_request|
  # Provide the Project ID by setting the env variable or specifying directly.
  # project_id = "Your Google Cloud project ID"
  project_id = ENV["CLOUD_PROJECT_ID"]
  bigquery = Google::Cloud::Bigquery.new project: project_id
  # Example to retrieve a large payload from public BigQuery dataset.
  sql = "SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000"

  results = bigquery.query sql do |config|
    config.location = "US"
  end

  # Stream out the large payload by iterating rows.
  body = Enumerator.new do |y|
    results.each do |row|
      y << "#{row}\n"
    end
  end
  [200, {}, body]
end

Étape suivante

Pour rechercher et filtrer des exemples de code pour d'autres produits Google Cloud, consultez l'explorateur d'exemples Google Cloud.