Transmite datos a BigQuery

En lugar de usar un trabajo para cargar datos en BigQuery, puedes optar por transmitir estos datos en BigQuery de a un registro a la vez mediante el método tabledata.insertAll. Este enfoque permite consultar datos sin la demora de ejecutar un trabajo de carga.

En este documento, se analizan varias compensaciones importantes para considerar antes de elegir un enfoque, como las cuotas de transmisión, la disponibilidad y la coherencia de datos.

Antes de comenzar

  1. Asegúrate de tener acceso de escritura al conjunto de datos que contiene tu tabla de destino. La tabla debe existir antes de comenzar la escritura de datos, a menos que estés usando tablas de plantillas Para obtener más información sobre las tablas de plantillas, consulta Crear tablas automáticamente con tablas de plantillas.

  2. Verifica la Política de cuota de transmisión de datos.

  3. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

    La transmisión no está disponible a través del nivel gratuito. Si intentas usar la transmisión sin habilitar la facturación, recibes el siguiente error: BigQuery: Streaming insert is not allowed in the free tier.

Verifica la disponibilidad de los datos

Los datos transmitidos están disponibles para el análisis en tiempo real en cuestión de segundos después de la primera inserción de transmisión en una tabla. En circunstancias inusuales (como en una interrupción), es posible que los datos en el búfer de transmisión no estén disponibles temporalmente. Cuando los datos no están disponibles, las consultas se siguen ejecutando correctamente, pero omiten algunos de los datos que aún se encuentran en el búfer de transmisión. Estas consultas contienen una advertencia en el campo errors de bigquery.jobs.getQueryResults como respuesta a bigquery.jobs.query o en el campo status.errors de bigquery.jobs.get.

Los datos pueden tardar hasta 90 minutos en estar disponibles para las operaciones de copia y exportación. Además, cuando se realiza una transmisión a una tabla particionada, los datos en el búfer de transmisión tienen un valor NULL para la seudocolumna _PARTITIONTIME. A fin de consultar si los datos están disponibles para copiar y exportar, verifica la respuesta tables.get de una sección llamada streamingBuffer. Si esa sección no aparece, tus datos deberían estar disponibles para copiarlos o exportarlos y deberían tener un valor no nulo para la seudocolumna _PARTITIONTIME. Además, el campo streamingBuffer.oldestEntryTime se puede aprovechar para identificar la edad de los registros en el búfer de transmisión.

Asegura la coherencia de los datos

A fin de ayudar a garantizar la coherencia de los datos, puedes suministrar insertId para cada fila insertada. BigQuery recuerda este ID durante al menos un minuto. Si intentas transmitir el mismo conjunto de filas dentro de ese período y la propiedad insertId está configurada, BigQuery usa la propiedad insertId para anular tus datos duplicados sobre la base del "mejor esfuerzo".

Es posible que tengas que reintentar una inserción debido a que no hay manera de determinar el estado de una inserción de transmisión en ciertas condiciones de error, como errores de red entre tu sistema y BigQuery o errores internos en BigQuery. Si reintentas una inserción, usa el mismo insertId en el mismo conjunto de filas para que BigQuery intente anular tus datos duplicados. Para obtener más información, consulta solución de problemas de inserciones de transmisión.

En el caso poco frecuente de que un centro de datos de Google pierda conectividad inesperadamente, puede que la deduplicación automática no sea posible.

Si tienes requisitos más estrictos para tus datos, Google Cloud Datastore es un servicio alternativo que admite transacciones.

Inhabilita la deduplicación de mejor esfuerzo

Puedes inhabilitar la deduplicación de mejor esfuerzo si no propagas el campo insertId para cada fila insertada. Cuando no propagas insertId, obtienes cuotas de transferencia de transmisión mucho más altas para la región US. Para obtener más información, consulta la página sobre Cuotas y límites.

Transmite datos a través de ubicaciones de datos

Puedes transmitir datos a conjuntos de datos tanto en EE.UU. como en la UE. Los datos pueden fluir a través de máquinas fuera de la ubicación del conjunto de datos, mientras que BigQuery procesa la solicitud insertAll. Si transmites datos desde una ubicación fuera de la ubicación del conjunto de datos, es posible que experimentes un aumento de la latencia y tasas de error.

Transmite a tablas particionadas por tiempo de transferencia

Puedes transmitir filas individuales en una tabla particionada con solicitudes insertAll. Según la configuración predeterminada, la partición de destino para los datos insertados se infiere a partir de la fecha actual en función de la hora UTC.

Si transmites datos a una tabla particionada por tiempo de transferencia, puedes anular la fecha de inferencia si proporcionas un decorador de partición como parte de la solicitud insertAll. Por ejemplo, puedes transmitir la partición correspondiente al 01-03-2017 para la tabla mydataset.table con el decorador de partición.

mydataset.table$20170301

Los datos recién llegados se van a asociar temporalmente con la partición UNPARTITIONED mientras se encuentren en el búfer de transmisión. Por lo tanto, una consulta puede excluir los datos en el búfer de transmisión de una consulta mediante el filtro de los valores NULL desde la partición UNPARTITIONED con una de las pseudocolumnas ([_PARTITIONTIME] o [_PARTITIONDATE], según tu tipo de datos preferido).

Cuando transmites con un decorador de partición, puedes transmitir a particiones dentro de los últimos 31 días pasados y 16 días futuros en relación con la fecha actual, según la hora UTC actual. A fin de escribir en particiones para fechas fuera de estos límites permitidos, puedes usar trabajos de carga o de consulta, como se describe en Anexar y sobrescribir datos de tablas particionadas.

Transmite a tablas particionadas

Puedes transmitir datos a una tabla particionada en una columna DATE o TIMESTAMP que esté entre 1 año en el pasado y 6 meses en el futuro. Los datos fuera de este rango son rechazados.

Cuando se transmiten los datos, los datos entre 7 días en el pasado y 3 días en el futuro se colocan en el búfer de transmisión y luego se extraen a las particiones correspondientes. Los datos fuera de esta ventana (pero dentro del rango de 1 año, 6 meses) se colocan en un búfer de transmisión y luego se extraen a la partición UNPARTITIONED. Cuando hay suficientes datos no particionados, se cargan en las particiones correspondientes.

Crea tablas automáticamente con tablas de plantillas

Un patrón común para transmitir datos en BigQuery es dividir una tabla lógica en muchas tablas más pequeñas a fin de crear conjuntos de datos más pequeños (por ejemplo, por ID de usuario). Para crear conjuntos de datos más pequeños por fecha, usa tablas particionadas. A fin de crear tablas más pequeñas que no estén basadas en la fecha, usa las tablas de plantillas para que BigQuery cree las tablas por ti.

Para usar una tabla de plantilla a través de la API de BigQuery, agrega un parámetro templateSuffix a tu solicitud insertAll. Para la herramienta de línea de comandos de bq, agrega la marca template_suffix a tu comando insert. Si BigQuery detecta un parámetro templateSuffix o la marca template_suffix, trata la tabla de destino como una plantilla base y crea una nueva tabla que comparte el mismo esquema que la tabla de destino y tiene un nombre que incluye el sufijo especificado:

<targeted_table_name> + <templateSuffix>

Cuando uses una tabla de plantillas, evita la sobrecarga de crear cada tabla individualmente y de especificar el esquema para cada tabla. Solo necesitas crear una plantilla única y suministrar sufijos diferentes a fin de que BigQuery pueda crear las nuevas tablas por ti. BigQuery coloca las tablas en el mismo proyecto y conjunto de datos. Las plantillas también facilitan la actualización del esquema debido a que solo necesita actualizar la tabla de plantillas.

Las tablas que se crean mediante las tablas de plantillas suelen estar disponibles en unos pocos segundos. En raras ocasiones pueden tardar más en estar disponibles.

Cambia el esquema de la tabla de plantillas

Si cambias un esquema de tabla de plantilla, todas las tablas que se generen posteriormente usarán el esquema actualizado. Las tablas generadas anteriormente no se verán afectadas, a menos que la tabla existente aún tenga un búfer de transmisión.

Para las tablas existentes que todavía tienen un búfer de transmisión, si modificas el esquema de la tabla de la plantilla de una manera compatible con versiones anteriores, el esquema de esas tablas generadas de forma activa también se actualizará. Sin embargo, si modificas el esquema de la tabla de plantillas de una manera no compatible con versiones anteriores, se perderán todos los datos almacenados en el búfer que usen el esquema anterior. Además, no podrás transmitir datos nuevos a tablas generadas existentes que usen el esquema anterior, pero que ahora no es compatible.

Después de cambiar un esquema de tabla de plantilla, espera hasta que los cambios se hayan propagado antes de intentar insertar nuevos datos o consultar tablas generadas. Las solicitudes para insertar nuevos campos deben ser correctas en unos pocos minutos. Los intentos de consultar los nuevos campos pueden requerir más tiempo, hasta 90 minutos.

Si deseas cambiar el esquema de una tabla generada, no cambies el esquema hasta que la transmisión a través de la tabla de plantilla haya terminado y la sección de estadísticas de transmisión de la tabla generada no esté en la respuesta de tables.get(), lo que indica que no hay datos almacenados en el búfer en la tabla.

Detalles de la tabla de plantillas

Valor del sufijo de plantilla
El valor templateSuffix (o --template_suffix) debe contener solo letras (a-z, A-Z), números (0-9) o guiones bajos (_). La longitud máxima combinada del nombre de la tabla y el sufijo de la tabla es de 1,024 caracteres.
Cuota
Las mismas cuotas se aplican a todas las tablas, ya sean basadas en plantillas o creadas manualmente.
Tiempo de vida
La tabla generada hereda su fecha de caducidad del conjunto de datos. Al igual que con los datos de transmisión normales, las tablas generadas no se pueden copiar ni exportar inmediatamente.
Anulación de duplicación
La anulación de duplicación solo ocurre entre referencias uniformes a una tabla de destino. Por ejemplo, si realizas una transmisión simultánea a una tabla generada con ambas tablas de plantillas y un comando regular insertAll, no se produce ninguna anulación de duplicación entre las filas insertadas por las tablas de plantillas y un comando regular insertAll.
Vistas
La tabla de plantillas y las tablas generadas no deben ser vistas.

Casos prácticos de ejemplo

Registro de eventos de grandes volúmenes

Si tienes una app que recopile una gran cantidad de datos en tiempo real, las inserciones de transmisión pueden ser una buena opción. En general, estos tipos de apps tienen los siguientes criterios:

  • No transaccional. Grandes volúmenes, filas agregadas continuamente. La app puede tolerar una mínima posibilidad de que se produzca una duplicación o que los datos no estén disponibles temporalmente.
  • Análisis agregado. Generalmente, las consultas se realizan para el análisis de tendencias, a diferencia de la selección de registros simples o estrechos.

Un ejemplo de registro de eventos de gran volumen es el seguimiento de eventos. Imaginemos que tienes una app para dispositivos móviles que rastrea eventos. Tu app o servidores móviles podrían registrar de forma independiente las interacciones de los usuarios o los errores del sistema y transmitirlos a BigQuery. Podrías analizar estos datos para determinar tendencias generales, como áreas de alta interacción o problemas, y supervisar las condiciones de error en tiempo real.

Quita los duplicados manualmente

Puedes seguir el siguiente proceso manual para asegurarte de que no existan filas duplicadas una vez que hayas terminado la transmisión.

  1. Agrega el insertId como una columna en el esquema de tu tabla y el valor insertId en los datos de cada fila.
  2. Una vez que se haya detenido la transmisión, realiza la siguiente consulta para verificar si hay duplicados:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Si el resultado es mayor que 1, existen duplicados.
  3. Para quitar duplicados, realiza la siguiente consulta. Debes especificar una tabla de destino, inhabilitar los resultados acoplados y permitir resultados grandes.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Notas sobre la consulta para quitar duplicados:

  • La estrategia más segura para la consulta sobre quitar duplicados es dirigirse a una tabla nueva De manera alternativa, puedes dirigir la tabla de origen con la disposición de escritura WRITE_TRUNCATE.
  • La consulta de eliminación de duplicados agrega una columna row_number con el valor 1 al final del esquema de la tabla. La consulta usa una instrucción SELECT * EXCEPT desde SQL estándar para excluir la columna row_number de la tabla de destino. El prefijo #standardSQL habilita SQL estándar en esta consulta. De manera alternativa, puedes seleccionar por nombres de columna específicos para omitir esta columna.
  • Para consultar datos en vivo con duplicados que se quitaron, también puedes crear una vista sobre tu tabla mediante la consulta de eliminación de duplicados. Ten en cuenta que los costos de consulta con respecto a la vista se calcularán en función de las columnas seleccionadas en tu vista, lo que puede dar como resultado tamaños analizados de bytes grandes.

Solución de problemas de inserciones de transmisión

Para obtener más información sobre cómo solucionar errores durante las inserciones de transmisión, consulta Solución de problemas de inserciones de transmisión en la página de errores de solución de problemas.

Volver al principio

Ejemplos de inserciones de transmisión

C#

Antes de probar esta muestra, sigue las instrucciones de configuración para C# que se encuentran en la Guía de inicio rápido de BigQuery con bibliotecas cliente. Si necesitas más información, consulta la documentación de referencia de la API de BigQuery para C#.


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go incluidas en la Guía de inicio rápido de BigQuery con bibliotecas cliente. A fin de obtener más información, consulta la Documentación de referencia de la API de Go de BigQuery.

// To run this sample, you will need to create (or reuse) a context and
// an instance of the bigquery client.  For example:
// import "cloud.google.com/go/bigquery"
// ctx := context.Background()
// client, err := bigquery.NewClient(ctx, "your-project-id")
u := client.Dataset(datasetID).Table(tableID).Inserter()
items := []*Item{
	// Item implements the ValueSaver interface.
	{Name: "Phred Phlyntstone", Age: 32},
	{Name: "Wylma Phlyntstone", Age: 29},
}
if err := u.Put(ctx, items); err != nil {
	return err
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java incluidas en la Guía de inicio rápido de BigQuery con bibliotecas cliente. A fin de obtener más información, consulta la documentación de referencia de la API de BigQuery para Java.

TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response =
    bigquery.insertAll(
        InsertAllRequest.newBuilder(tableId)
            .addRow(rowContent)
            // More rows can be added in the same RPC by invoking .addRow() on the builder.
            // You can also supply optional unique row keys to support de-duplication scenarios.
            .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración para Node.js que se encuentran en la Guía de inicio rápido de BigQuery con bibliotecas cliente. A fin de obtener más información, consulta la documentación de referencia de la API de BigQuery para Node.js.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();

PHP

Antes de probar esta muestra, sigue las instrucciones de configuración de PHP que se encuentran en la Guía de inicio rápido de BigQuery con bibliotecas cliente. A fin de obtener más información, consulta la documentación de referencia de la API de BigQuery para PHP.

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python incluidas en la Guía de inicio rápido de BigQuery con bibliotecas cliente. A fin de obtener más información, consulta la Documentación de referencia de la API de Python de BigQuery.

# TODO(developer): Uncomment the lines below and replace with your values.
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'  # replace with your dataset ID
# For this sample, the table must already exist and have a defined schema
# table_id = 'my_table'  # replace with your table ID
# table_ref = client.dataset(dataset_id).table(table_id)
# table = client.get_table(table_ref)  # API request

rows_to_insert = [
    (u'Phred Phlyntstone', 32),
    (u'Wylma Phlyntstone', 29),
]

errors = client.insert_rows(table, rows_to_insert)  # API request

assert errors == []

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración para Ruby que se encuentran en la Guía de inicio rápido de BigQuery con bibliotecas cliente. A fin de obtener más información, consulta la documentación de referencia de la API de BigQuery para Ruby.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Volver al principio

¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.