Usa l'API Streaming legacy

Questo documento descrive come inserire flussi di dati in BigQuery utilizzando il metodo legacy tabledata.insertAll.

Per i nuovi progetti, ti consigliamo di utilizzare l'API BigQuery Storage Write anziché il metodo tabledata.insertAll. L'API Storage Write offre prezzi più bassi e funzionalità più affidabili, tra cui la semantica della consegna "exactly-once". Se stai eseguendo la migrazione di un progetto esistente dal metodo tabledata.insertAll all'API Storage Write, ti consigliamo di selezionare il stream predefinito. Il metodo tabledata.insertAll è ancora completamente supportato.

Prima di iniziare

  1. Assicurati di disporre dell'accesso in scrittura al set di dati che contiene la tabella di destinazione. La tabella deve esistere prima di iniziare a scrivere dati nella tabella, a meno che non utilizzi tabelle di modelli. Per ulteriori informazioni sulle tabelle dei modelli, consulta la sezione Creazione automatica di tabelle con le tabelle dei modelli.

  2. Controlla i criteri per le quote per i flussi di dati.

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Lo streaming non è disponibile tramite il livello gratuito. Se tenti di utilizzare lo streaming senza attivare la fatturazione, ricevi il seguente errore: BigQuery: Streaming insert is not allowed in the free tier.

  5. Concedi i ruoli IAM (Identity and Access Management) che concedono agli utenti le autorizzazioni necessarie per eseguire ogni attività nel documento.

Autorizzazioni obbligatorie

Per inserire un flusso di dati in BigQuery, devi disporre delle seguenti autorizzazioni IAM:

  • bigquery.tables.updateData (consente di inserire dati nella tabella)
  • bigquery.tables.get (consente di ottenere i metadati della tabella)
  • bigquery.datasets.get (ti consente di ottenere i metadati del set di dati)
  • bigquery.tables.create (obbligatorio se utilizzi una tabella modello per creare la tabella automaticamente)

Ciascuno dei seguenti ruoli IAM predefiniti include le autorizzazioni necessarie per trasmettere dati in BigQuery:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

Per ulteriori informazioni sui ruoli e sulle autorizzazioni IAM in BigQuery, consulta Autorizzazioni e ruoli predefiniti.

Trasmetti il flusso di dati in BigQuery

C#

Prima di provare questo esempio, segui le istruzioni di configurazione di C# disponibili nella guida rapida di BigQuery sull'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API BigQuery C#.

Per eseguire l'autenticazione in BigQuery, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per le librerie client.


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go disponibili nella guida rapida di BigQuery sull'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API BigQuery Go.

Per eseguire l'autenticazione in BigQuery, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per le librerie client.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java disponibili nella guida rapida di BigQuery sull'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API BigQuery Java.

Per eseguire l'autenticazione in BigQuery, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per le librerie client.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js disponibili nella guida rapida di BigQuery sull'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API BigQuery Node.js.

Per eseguire l'autenticazione in BigQuery, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per le librerie client.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

Prima di provare questo esempio, segui le istruzioni di configurazione di PHP disponibili nella guida rapida di BigQuery sull'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API BigQuery PHP.

Per eseguire l'autenticazione in BigQuery, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per le librerie client.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python disponibili nella guida rapida di BigQuery sull'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API BigQuery Python.

Per eseguire l'autenticazione in BigQuery, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per le librerie client.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby disponibili nella guida rapida di BigQuery sull'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API BigQuery Ruby.

Per eseguire l'autenticazione in BigQuery, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per le librerie client.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Non è necessario compilare il campo insertID quando inserisci le righe. L'esempio seguente mostra come evitare di inviare un valore insertID per ogni riga durante il flusso di dati.

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java disponibili nella guida rapida di BigQuery sull'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API BigQuery Java.

Per eseguire l'autenticazione in BigQuery, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per le librerie client.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python disponibili nella guida rapida di BigQuery sull'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API BigQuery Python.

Per eseguire l'autenticazione in BigQuery, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per le librerie client.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Invia dati relativi a data e ora

Per i campi di data e ora, formatta i dati nel metodo tabledata.insertAll come segue:

Tipo Formato
DATE Una stringa nel formato "YYYY-MM-DD"
DATETIME Una stringa nel formato "YYYY-MM-DD [HH:MM:SS]"
TIME Una stringa nel formato "HH:MM:SS"
TIMESTAMP Il numero di secondi dal 1° gennaio 1970 (epoca di Unix) o una stringa nel formato "YYYY-MM-DD HH:MM[:SS]"

Invia dati intervallo

Per i campi di tipo RANGE<T>, formatta i dati nel metodo tabledata.insertAll come un oggetto JSON con due campi, start e end. I valori null per i campi start e end rappresentano confini illimitati. Questi campi devono avere lo stesso formato JSON supportato di tipo T, dove T può essere DATE, DATETIME e TIMESTAMP.

Nell'esempio seguente, il campo f_range_date rappresenta una colonna RANGE<DATE> in una tabella. Una riga viene inserita in questa colonna utilizzando l'API tabledata.insertAll.

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

Disponibilità dei dati degli stream

I dati sono disponibili per l'analisi in tempo reale utilizzando le query GoogleSQL subito dopo che BigQuery conferma correttamente una richiesta tabledata.insertAll.

Le righe trasferite di recente a una tabella partizionata per data di importazione hanno temporaneamente un valore NULL per la pseudo colonna _PARTITIONTIME. Per queste righe, BigQuery assegna il valore non NULL finale della colonna PARTITIONTIME in background, in genere entro pochi minuti. In rari casi, questa operazione può richiedere fino a 90 minuti.

Alcune righe trasferite di recente potrebbero non essere disponibili per la copia della tabella in genere per alcuni minuti. In rari casi, questa operazione può richiedere fino a 90 minuti. Per verificare se i dati sono disponibili per la copia della tabella, controlla la risposta tables.get per una sezione denominata streamingBuffer. Se la sezione streamingBuffer è assente, i tuoi dati sono disponibili per la copia. Puoi anche utilizzare il campo streamingBuffer.oldestEntryTime per identificare l'età dei record nel buffer del flusso.

Deduplicazione ottimale

Quando fornisci insertId per una riga inserita, BigQuery utilizza questo ID per supportare la deduplicazione ottimale per un massimo di un minuto. In altre parole, se trasmetti nella stessa tabella la stessa riga con lo stesso insertId più di una volta nel periodo di tempo in questione, BigQuery potrebbe deduplicare le molteplici occorrenze di quella riga, conservando solo una di queste occorrenze.

Il sistema prevede che anche le righe fornite con insertId identici siano identiche. Se due righe hanno insertId identici, non è deterministica quale riga viene conservata da BigQuery.

La deduplicazione è generalmente consigliata per scenari di nuovi tentativi in un sistema distribuito in cui non è possibile determinare lo stato di un inserimento di flussi di dati in determinate condizioni di errore, come errori di rete tra il sistema e BigQuery o errori interni in BigQuery. Se riprovi a eseguire un nuovo inserimento, utilizza lo stesso insertId per lo stesso insieme di righe in modo che BigQuery possa tentare di deduplicare i tuoi dati. Per ulteriori informazioni, consulta la sezione Risoluzione dei problemi relativi all'inserimento di flussi di dati.

La deduplicazione offerta da BigQuery è il massimo sforzo e non deve essere considerata come un meccanismo per garantire l'assenza di duplicati nei dati. Inoltre, BigQuery potrebbe ridurre in qualsiasi momento la qualità della deduplicazione massima possibile per garantire una maggiore affidabilità e disponibilità dei dati.

Se hai requisiti di deduplicazione rigorosi per i tuoi dati, Google Cloud Datastore è un servizio alternativo che supporta le transazioni.

Disattivazione della deduplicazione Best effort

Puoi disabilitare la deduplicazione best effort non compilando il campo insertId per ogni riga inserita. Questo è il metodo consigliato per inserire i dati.

Apache Beam e Dataflow

Per disabilitare la deduplicazione ottimale quando utilizzi il connettore BigQuery I/O di Apache Beam per Java, utilizza il metodo ignoreInsertIds().

Rimozione manuale dei duplicati

Per assicurarti che non esistano righe duplicate dopo aver completato il flusso di dati, utilizza la seguente procedura manuale:

  1. Aggiungi insertId come colonna nello schema della tabella e includi il valore insertId nei dati per ogni riga.
  2. Dopo aver interrotto il flusso di dati, esegui la query seguente per verificare la presenza di duplicati:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Se il risultato è maggiore di 1, esistono duplicati.
  3. Per rimuovere i duplicati, esegui la query seguente. Specifica una tabella di destinazione, consenti risultati di grandi dimensioni e disabilita la suddivisione dei risultati.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Note sulla query di rimozione duplicata:

  • La strategia più sicura per la query di rimozione duplicata è scegliere come target una nuova tabella. In alternativa, puoi scegliere come target la tabella di origine con disposizione di scrittura WRITE_TRUNCATE.
  • La query di rimozione duplicata aggiunge una colonna row_number con il valore 1 alla fine dello schema della tabella. La query utilizza un'istruzione SELECT * EXCEPT da GoogleSQL per escludere la colonna row_number dalla tabella di destinazione. Il prefisso #standardSQL attiva GoogleSQL per questa query. In alternativa, puoi omettere questa colonna in base a nomi di colonna specifici.
  • Per eseguire query sui dati in tempo reale con i duplicati rimossi, puoi anche creare una visualizzazione della tabella utilizzando la query di rimozione duplicata. Tieni presente che i costi delle query sulla visualizzazione sono calcolati in base alle colonne selezionate nella visualizzazione, il che può comportare una scansione di grandi dimensioni dei byte.

Flusso in tabelle partizionate in base al tempo

Quando trasmetti i flussi di dati a una tabella partizionata nel tempo, ogni partizione ha un buffer di flusso. Il buffer di flusso viene conservato quando esegui un job di caricamento, query o copia che sovrascrive una partizione impostando la proprietà writeDisposition su WRITE_TRUNCATE. Se vuoi rimuovere il buffer di streaming, verifica che sia vuoto chiamando tables.get sulla partizione.

Partizionamento in fase di importazione

Quando esegui il flusso di dati in una tabella partizionata per data di importazione, BigQuery deduce la partizione di destinazione dall'ora UTC attuale.

I dati appena arrivati vengono inseriti temporaneamente nella partizione __UNPARTITIONED__ mentre si trova nel buffer del flusso. Se la quantità di dati non partizionati è sufficiente, BigQuery partiziona i dati nella partizione corretta. Tuttavia, non è previsto lo SLA (accordo sul livello del servizio) per quanto tempo impiega i dati a essere trasferiti fuori dalla partizione __UNPARTITIONED__. Una query può escludere i dati nel buffer di flusso da una query filtrando i valori NULL dalla partizione __UNPARTITIONED__ utilizzando una delle pseudocolonne (_PARTITIONTIME o _PARTITIONDATE a seconda del tipo di dati preferito).

Se esegui il flusso di dati in una tabella partizionata giornaliera, puoi eseguire l'override dell'inferenza della data fornendo un decorator della partizione come parte della richiesta insertAll. Includi il decorator nel parametro tableId. Ad esempio, puoi eseguire un flusso di dati nella partizione corrispondente a 2021-03-01 per la tabella table1 utilizzando il decorator della partizione:

table1$20210301

Quando esegui il flusso di dati con un decorator di partizione, puoi trasmettere in streaming alle partizioni negli ultimi 31 giorni nell'ultimo e nei 16 giorni futuri in relazione alla data corrente, in base all'ora UTC attuale. Per scrivere nelle partizioni per le date che non rientrano in questi limiti consentiti, utilizza invece un job di caricamento o query, come descritto in Aggiunta e sovrascrittura di tabella partizionata partizionate.

Il flusso di dati utilizzando un decorator della partizione è supportato solo per le tabelle partizionate giornaliere. Non è supportato per le tabelle partizionate con cadenza oraria, mensile o annuale.

Per i test, puoi utilizzare il comando dell'interfaccia a riga di comando bq bq insert. Ad esempio, il seguente comando trasmette in streaming una singola riga a una partizione per la data 1° gennaio 2017 ($20170101) in una tabella partizionata denominata mydataset.mytable:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

Partizionamento delle colonne in unità di tempo

Puoi inserire flussi di dati in una tabella partizionata in una colonna DATE, DATETIME o TIMESTAMP compresa tra 5 anni nel passato e 1 anno nel futuro. I dati al di fuori di questo intervallo vengono rifiutati.

Durante il flusso di dati, i dati vengono inizialmente posizionati nella partizione __UNPARTITIONED__. Se la quantità di dati non partizionati è sufficiente, BigQuery esegue automaticamente il partizionamento dei dati, inserendoli nella partizione appropriata. Tuttavia, non è previsto uno SLA (accordo sul livello del servizio) per il tempo necessario per lo spostamento dei dati dalla partizione __UNPARTITIONED__.

  • Nota: le partizioni giornaliere vengono elaborate in modo diverso rispetto alle partizioni orarie, mensili e annuali. Solo i dati al di fuori dell'intervallo di date (dagli ultimi 7 ai 3 giorni futuri) vengono estratti nella partizione UNPARTITIONED, in attesa di essere ripartizionati. D'altra parte, per la tabella partizionata ogni ora, i dati vengono sempre estratti nella partizione UNPARTITIONED e successivamente ripartizionati.

Crea tabelle automaticamente utilizzando le tabelle dei modelli

Le tabelle modello forniscono un meccanismo per suddividere una tabella logica in molte tabelle più piccole per creare set di dati più piccoli (ad esempio, per ID utente). Le tabelle di modello presentano una serie di limitazioni descritte di seguito. Invece, le tabelle partizionate e le tabelle in cluster sono i modi consigliati per ottenere questo comportamento.

Per utilizzare una tabella modello tramite l'API BigQuery, aggiungi un parametro templateSuffix alla richiesta insertAll. Per lo strumento a riga di comando bq, aggiungi il flag template_suffix al comando insert. Se BigQuery rileva un parametro templateSuffix o il flag template_suffix, tratta la tabella scelta come target come un modello di base. Crea una nuova tabella che condivide lo stesso schema della tabella scelta come target e ha un nome che include il suffisso specificato:

<targeted_table_name> + <templateSuffix>

L'utilizzo di una tabella modello evita l'overhead associato alla creazione di ogni singola tabella e alla specifica dello schema per ogni tabella. Devi creare un solo modello e fornire suffissi diversi in modo che BigQuery possa creare le nuove tabelle per te. BigQuery posiziona le tabelle nello stesso progetto e set di dati.

Le tabelle create utilizzando le tabelle modello sono in genere disponibili entro pochi secondi. In rare occasioni, potrebbe essere necessario più tempo prima che diventino disponibili.

Modifica lo schema della tabella del modello

Se modifichi lo schema di una tabella del modello, tutte le tabelle generate successivamente utilizzano lo schema aggiornato. Le tabelle generate in precedenza non sono interessate, a meno che la tabella esistente non abbia ancora un buffer di flusso.

Per le tabelle esistenti che hanno ancora un buffer di flusso, se modifichi lo schema della tabella del modello in modo compatibile con le versioni precedenti, viene aggiornato anche lo schema di queste tabelle generate attivamente con flussi. Tuttavia, se modifichi lo schema della tabella del modello in un modo non compatibile con le versioni precedenti, tutti i dati nel buffer che utilizzano lo schema precedente andranno persi. Inoltre, non puoi trasmettere nuovi dati alle tabelle generate esistenti che utilizzano il vecchio schema, ma ora incompatibile.

Dopo aver modificato lo schema di una tabella del modello, attendi la propagazione delle modifiche prima di provare a inserire nuovi dati o eseguire query sulle tabelle generate. Le richieste di inserimento di nuovi campi dovrebbero andare a buon fine entro pochi minuti. I tentativi di eseguire query sui nuovi campi potrebbero richiedere un'attesa più lunga, fino a 90 minuti.

Se vuoi modificare lo schema di una tabella generata, modifica lo schema solo dopo che è terminato il flusso della tabella del modello e che la sezione delle statistiche relative ai flussi di dati della tabella generata non è presente nella risposta tables.get(), il che indica che non è stato eseguito il buffering di nessun dato nella tabella.

Le tabelle partizionate e le tabelle in cluster non sono soggette alle limitazioni precedenti e sono il meccanismo consigliato.

Dettagli tabella modello

Valore suffisso modello
Il valore templateSuffix (o --template_suffix) deve contenere solo lettere (a-z, A-Z), numeri (0-9) o trattini bassi (_). La lunghezza massima combinata del nome e del suffisso della tabella è di 1024 caratteri.
Quota

Le tabelle di modello sono soggette a limitazioni per la quota di flussi di dati. Il tuo progetto può creare fino a 10 tabelle al secondo con tabelle modello, simili all'API tables.insert. Questa quota si applica solo alle tabelle in fase di creazione, non a quelle in fase di modifica.

Se la tua applicazione deve creare più di 10 tabelle al secondo, ti consigliamo di utilizzare le tabelle in cluster. Ad esempio, puoi inserire l'ID tabella ad alta cardinalità nella colonna della chiave di una singola tabella di clustering.

Durata

La tabella generata eredita la data di scadenza dal set di dati. Come con i normali dati in modalità flusso, le tabelle generate non possono essere copiate immediatamente.

Deduplicazione

La deduplicazione avviene solo tra riferimenti uniformi a una tabella di destinazione. Ad esempio, se esegui contemporaneamente il flusso di dati su una tabella generata utilizzando sia le tabelle modello sia un normale comando insertAll, non si verifica alcuna deduplicazione tra le righe inserite dalle tabelle del modello e un normale comando insertAll.

Viste

La tabella del modello e le tabelle generate non devono essere visualizzazioni.

Risoluzione dei problemi relativi all'inserimento di flussi di dati

Le seguenti sezioni descrivono come risolvere gli errori che si verificano quando esegui un flusso di dati in BigQuery utilizzando l'API di inserimento di flussi legacy. Per ulteriori informazioni su come risolvere gli errori di quota per l'inserimento di flussi di dati, consulta Errori di quota di inserimento dei flussi di dati.

Codici di risposta HTTP di errore

Se ricevi un codice di risposta HTTP di errore, ad esempio un errore di rete, non è possibile sapere se l'inserimento del flusso di dati è riuscito. Se provi a inviare di nuovo la richiesta, la tabella potrebbe contenere righe duplicate. Per proteggere la tabella dai duplicati, imposta la proprietà insertId quando invii la richiesta. BigQuery utilizza la proprietà insertId per la deduplicazione.

Se ricevi un errore di autorizzazione, un errore con il nome della tabella non valido o un errore di quota superata, non viene inserita alcuna riga e l'intera richiesta non va a buon fine.

Codici di risposta HTTP di operazione riuscita

Anche se ricevi un codice di risposta HTTP con esito positivo, devi controllare la proprietà insertErrors della risposta per determinare se l'inserimento delle righe è riuscito perché è possibile che BigQuery l'inserimento delle righe sia riuscito solo parzialmente. Potresti riscontrare uno dei seguenti scenari:

  • Tutte le righe sono state inserite correttamente. Se la proprietà insertErrors è un elenco vuoto, tutte le righe sono state inserite correttamente.
  • Alcune righe sono state inserite correttamente. Ad eccezione dei casi in cui è presente una mancata corrispondenza dello schema in una delle righe, le righe indicate nella proprietà insertErrors non vengono inserite e tutte le altre righe vengono inserite correttamente. La proprietà errors contiene informazioni dettagliate sul motivo per cui l'errore di ogni riga non è andato a buon fine. La proprietà index indica l'indice di riga con base 0 della richiesta a cui si applica l'errore.
  • Nessuna delle righe inserite correttamente. Se BigQuery rileva una mancata corrispondenza dello schema nelle singole righe della richiesta, nessuna delle righe viene inserita e viene restituita una voce insertErrors per ogni riga, anche le righe che non avevano una mancata corrispondenza dello schema. Le righe senza una mancata corrispondenza dello schema presentano un errore con la proprietà reason impostata su stopped e possono essere inviate di nuovo così come sono. Le righe non corrispondenti includono informazioni dettagliate sulla mancata corrispondenza dello schema. Per informazioni sui tipi di buffer di protocollo supportati per ogni tipo di dati BigQuery, consulta Conversioni dei tipi di dati.

Errori dei metadati per l'inserimento di flussi di dati

Poiché l'API per l'inserimento di flussi di dati di BigQuery è progettata per tassi di inserzione elevati, le modifiche ai metadati della tabella sottostante sono alla fine coerenti quando si interagisce con il sistema di inserimento di flussi. La maggior parte delle volte, le modifiche ai metadati vengono propagate entro pochi minuti, ma durante questo periodo le risposte dell'API potrebbero riflettere lo stato incoerente della tabella.

Ecco alcuni scenari:

  • Modifiche allo schema. La modifica dello schema di una tabella che ha ricevuto di recente insert di flussi di dati può causare risposte con errori di mancata corrispondenza dello schema, poiché il sistema di inserimento di flussi potrebbe non rilevare immediatamente la modifica dello schema.
  • Creazione/eliminazione della tabella. L'invio di flussi di dati a una tabella inesistente restituisce una variante di una risposta notFound. Una tabella creata in risposta potrebbe non essere riconosciuta immediatamente dai successivi inserimento di flussi di dati. Allo stesso modo, l'eliminazione o la nuova creazione di una tabella può creare un periodo di tempo in cui gli inserti di flussi di dati vengono effettivamente recapitati alla vecchia tabella. Gli insert di flussi di dati potrebbero non essere presenti nella nuova tabella.
  • Troncamento della tabella. Il troncamento dei dati di una tabella (utilizzando un job di query che utilizza writeDisposition di WRITE_TRUNCATE) può causare in modo simile gli inserti successivi durante il periodo di coerenza.

Dati mancanti/non disponibili

Gli inserti di flussi di dati risiedono temporaneamente nello spazio di archiviazione ottimizzato per la scrittura, che ha caratteristiche di disponibilità diverse rispetto allo spazio di archiviazione gestito. Alcune operazioni in BigQuery non interagiscono con l'archiviazione ottimizzata per la scrittura, come i job di copia delle tabelle e i metodi API come tabledata.list. I dati relativi ai flussi di dati recenti non saranno presenti nella tabella o nell'output di destinazione.