Transmite los resultados de consultas de BigQuery

En este ejemplo, se muestra cómo transmitir los resultados de una consulta de BigQuery con una función de Cloud Functions.

Muestra de código

C#

Para autenticarte en funciones de Cloud Run, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


using Google.Cloud.BigQuery.V2;
using Google.Cloud.Functions.Framework;
using Microsoft.AspNetCore.Http;
using System;
using System.Threading.Tasks;

namespace StreamBigQuery;

public class Function : IHttpFunction
{
    public async Task HandleAsync(HttpContext context)
    {
        context.Response.ContentType = "text/plain";
        string projectId = Environment.GetEnvironmentVariable("GOOGLE_PROJECT_ID");
        BigQueryClient client = BigQueryClient.Create(projectId).WithDefaultLocation(Locations.UnitedStates);

        // Example to retrieve a large payload from BigQuery public dataset.
        string query = "SELECT title FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000";
        BigQueryParameter[] parameters = null;
        BigQueryResults results = await client.ExecuteQueryAsync(query, parameters, cancellationToken: context.RequestAborted);

        // Stream out the payload response by iterating rows.
        foreach (BigQueryRow row in results)
        {
            await context.Response.WriteAsync($"{row["title"]}\n", context.RequestAborted);
            // Artificially add a 50ms delay per line, just to make the streaming nature clearer
            // when viewing the results in a browser.
            await Task.Delay(50);
        }
    }
}

Go

Para autenticarte en funciones de Cloud Run, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


// Package responsestreaming contains a function that streams out a large payload in chunks.
package responsestreaming

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"

	"cloud.google.com/go/bigquery"
	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
	"google.golang.org/api/iterator"
)

func init() {
	functions.HTTP("streamBigQuery", streamBigQuery)
}

// streamBigQuery retrieves a large payload from BigQuery public dataset and streams its rows.
func streamBigQuery(w http.ResponseWriter, r *http.Request) {
	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
	if projectID == "" {
		fmt.Println("GOOGLE_CLOUD_PROJECT environment variable must be set.")
		os.Exit(1)
	}
	ctx := context.Background()
	// Must include project ID in client.
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	// Run the query and ensure the job finishes without error.
	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}

	// Stream out the payload by iterating rows and flushing out the writer.
	streamResults(w, rows)
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {
	q := client.Query(
		"SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000")
	q.Location = "US"
	return q.Read(ctx)
}

// streamResults streams results from a query of BigQuery's public dataset.
func streamResults(w io.Writer, it *bigquery.RowIterator) {
	for {
		var row []bigquery.Value
		err := it.Next(&row)
		if err == iterator.Done {
			break
		}
		fmt.Fprintln(w, row[0])
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
			fmt.Fprintln(w, "Successfully flushed row")
		}
	}
}

Java

Para autenticarte en funciones de Cloud Run, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import java.io.BufferedWriter;
import java.io.IOException;

public class StreamBigQuery implements HttpFunction {
  @Override
  public void service(HttpRequest request, HttpResponse response) {
    String query = "SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000";
    streamQueryResult(query, response);
  }

  public static void streamQueryResult(String query, HttpResponse response) {
    try {
      BufferedWriter writer = response.getWriter();
      // Initialize client that will be used to send requests.
      // This client only needs to be created once,
      // and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      TableResult results = bigquery.query(QueryJobConfiguration.of(query));

      results.iterateAll().forEach(
          row -> row.forEach(val -> {
            try {
              writer.write(val.getValue().toString() + "\n");
              writer.flush();
              System.out.println("Successfully flushed row");
            } catch (IOException e) {
              System.out.println("Could not get rows: " + e.toString());
            }
          }));
    } catch (BigQueryException | InterruptedException | IOException e) {
      System.out.println("Query not performed: " + e.toString());
    }
  }
}

PHP

Para autenticarte en funciones de Cloud Run, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

use Psr\Http\Message\ServerRequestInterface;
use Google\Cloud\BigQuery\BigQueryClient;

function streamBigQuery(ServerRequestInterface $request)
{
    // Provide the Cloud Project ID by setting env var.
    $projectId = getenv('GOOGLE_PROJECT_ID');
    // Example large payload from BigQuery's public dataset.
    $bigQuery = new BigQueryClient(['projectId' => $projectId]);
    $queryJobConfig = $bigQuery->query(
        'SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000'
    );
    $queryResults = $bigQuery->runQuery($queryJobConfig);

    // Stream out large payload by iterating rows and flushing output.
    foreach ($queryResults as $row) {
        foreach ($row as $column => $value) {
            printf('%s' . PHP_EOL, json_encode($value));
            flush();
        }
    }
    printf('Successfully streamed rows');
}

Python

Para autenticarte en funciones de Cloud Run, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from flask import Response, stream_with_context

import functions_framework
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()


@functions_framework.http
def stream_big_query_output(request):
    """HTTP Cloud Function.
    Args:
        request (flask.Request): The request object.
        <https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
    Returns:
        The response text, or any set of values that can be turned into a
        Response object using `make_response`
        <https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
    """
    # Example large query from public dataset
    query = """
    SELECT abstract
    FROM `bigquery-public-data.breathe.bioasq`
    LIMIT 1000
    """
    query_job = client.query(query)  # Make an API request.

    def generate():
        for row in query_job:
            yield row[0]

    return Response(stream_with_context(generate()))

Ruby

Para autenticarte en funciones de Cloud Run, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

require "functions_framework"
require "google/cloud/bigquery"

FunctionsFramework.http "stream_big_query" do |_request|
  # Provide the Project ID by setting the env variable or specifying directly.
  # project_id = "Your Google Cloud project ID"
  project_id = ENV["CLOUD_PROJECT_ID"]
  bigquery = Google::Cloud::Bigquery.new project: project_id
  # Example to retrieve a large payload from public BigQuery dataset.
  sql = "SELECT abstract FROM `bigquery-public-data.breathe.bioasq` LIMIT 1000"

  results = bigquery.query sql do |config|
    config.location = "US"
  end

  # Stream out the large payload by iterating rows.
  body = Enumerator.new do |y|
    results.each do |row|
      y << "#{row}\n"
    end
  end
  [200, {}, body]
end

¿Qué sigue?

Para buscar y filtrar muestras de código para otros productos de Google Cloud, consulta el navegador de muestra de Google Cloud.