Utilizzare l'API Streaming precedente

Questo documento descrive come eseguire lo streaming dei dati in BigQuery utilizzando il metodo tabledata.insertAll precedente.

Per i nuovi progetti, consigliamo di utilizzare l'API BigQuery Storage Write anziché il metodo tabledata.insertAll. L'API Storage Write offre prezzi inferiori e funzionalità più efficaci, tra cui la semantica di caricamento esattamente una volta. Se stai eseguendo la migrazione di un progetto esistente dal metodo tabledata.insertAll all'API Storage Write, ti consigliamo di selezionare lo stream predefinito. Il metodo tabledata.insertAll è ancora completamente supportato.

Prima di iniziare

  1. Assicurati di disporre dell'accesso in scrittura al set di dati contenente la tabella di destinazione. La tabella deve esistere prima di iniziare a scrivere dati al suo interno, se non utilizzi tabelle modello. Per saperne di più sulle tabelle modello, consulta Creare tabelle automaticamente utilizzando le tabelle modello.

  2. Consulta le norme relative alle quote per i dati in streaming.

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Lo streaming non è disponibile nel livello gratuito. Se provi a utilizzare lo streaming senza attivare la fatturazione, viene visualizzato il seguente errore: BigQuery: Streaming insert is not allowed in the free tier.

  5. Concedi i ruoli IAM (Identity and Access Management) che concedono agli utenti le autorizzazioni necessarie per eseguire ogni attività in questo documento.

Autorizzazioni obbligatorie

Per eseguire lo streaming dei dati in BigQuery, devi disporre delle seguenti autorizzazioni IAM:

  • bigquery.tables.updateData (consente di inserire dati nella tabella)
  • bigquery.tables.get (consente di ottenere i metadati della tabella)
  • bigquery.datasets.get (consente di ottenere i metadati del set di dati)
  • bigquery.tables.create (obbligatorio se utilizzi una tabella modello per creare la tabella automaticamente)

Ciascuno dei seguenti ruoli IAM predefiniti include le autorizzazioni necessarie per trasmettere i dati in BigQuery:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

Per ulteriori informazioni sui ruoli e sulle autorizzazioni IAM in BigQuery, consulta Ruoli e autorizzazioni predefiniti.

Trasmissione di flussi di dati in BigQuery

C#

Prima di provare questo esempio, segui le istruzioni di configurazione C# riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery C#.

Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Prima di provare questo esempio, segui le istruzioni di configurazione Go riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Go.

Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione Java riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Java.

Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione Node.js riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Node.js.

Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

Prima di provare questo esempio, segui le istruzioni di configurazione PHP riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery PHP.

Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione Python riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Python.

Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione Ruby riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Ruby.

Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Non è necessario compilare il campo insertID quando inserisci le righe. L'esempio seguente mostra come evitare di inviare un insertID per ogni riga durante lo streaming.

Java

Prima di provare questo esempio, segui le istruzioni di configurazione Java riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Java.

Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione Python riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Python.

Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Inviare dati relativi a data e ora

Per i campi data e ora, formatta i dati nel metodo tabledata.insertAll come segue:

Tipo Formato
DATE Una stringa nel formato "YYYY-MM-DD"
DATETIME Una stringa nel formato "YYYY-MM-DD [HH:MM:SS]"
TIME Una stringa nel formato "HH:MM:SS"
TIMESTAMP Il numero di secondi dal 1° gennaio 1970 (l'epoca Unix) o una stringa nel formato "YYYY-MM-DD HH:MM[:SS]"

Inviare dati sull'intervallo

Per i campi di tipo RANGE<T>, formatta i dati nel metodo tabledata.insertAll come oggetto JSON con due campi, start e end. I valori mancanti o NULL per i campi start e end rappresentano confini illimitati. Questi campi devono avere lo stesso formato JSON supportato di tipo T, dove T può essere uno dei valori DATE, DATETIME e TIMESTAMP.

Nell'esempio seguente, il campo f_range_date rappresenta una colonna RANGE<DATE> in una tabella. In questa colonna viene inserita una riga utilizzando l'APItabledata.insertAll.

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

Disponibilità dei dati in streaming

I dati sono disponibili per l'analisi in tempo reale utilizzando le query GoogleSQL subito dopo che BigQuery ha confermato correttamente una richiestatabledata.insertAll.

Le righe di recente importazione in streaming in una tabella partizionata in base all'ora di importazione hanno temporaneamente un valore NULL per la pseudocolonna _PARTITIONTIME. Per queste righe, BigQuery assegna il valore finale non NULL della colonna PARTITIONTIME in background, in genere entro pochi minuti. In rari casi, l'operazione può richiedere fino a 90 minuti.

Alcune righe trasmesse in streaming di recente potrebbero non essere disponibili per la copia della tabella in genere per alcuni minuti. In rari casi, l'operazione può richiedere fino a 90 minuti. Per verificare se i dati sono disponibili per la copia della tabella, controlla la risposta tables.get per una sezione denominata streamingBuffer. Se la sezione streamingBuffer non è presente, i tuoi dati sono disponibili per la copia. Puoi anche utilizzare il campo streamingBuffer.oldestEntryTime per identificare la data di registrazione dei record nel buffer di streaming.

Deduplicazione best effort

Quando fornisci insertId per una riga inserita, BigQuery utilizza questo ID per supportare la deduplica secondo il criterio del massimo impegno per un massimo di un minuto. In altre parole, se scorri la stessa riga con lo stesso insertId più di una volta all'interno di quel periodo di tempo nella stessa tabella, BigQuery potrebbe deduplicare le più occorrenze di quella riga, conservandone solo una.

Il sistema si aspetta che le righe fornite con insertId identici siano anche identiche. Se due righe hanno insertId identici, non è deterministico la riga che BigQuery conserva.

La deduplica è in genere pensata per gli scenari di ripetizione in un sistema distribuito in cui non è possibile determinare lo stato di un inserimento di flussi di dati in determinate condizioni di errore, ad esempio errori di rete tra il sistema e BigQuery o errori interni in BigQuery. Se riprovi un'operazione di inserimento, utilizza lo stesso insertId per lo stesso insieme di righe in modo che BigQuery possa tentare di deduplicare i dati. Per ulteriori informazioni, consulta la sezione Risolvere i problemi relativi agli inserimenti di flussi di dati.

La deduplica offerta da BigQuery è il massimo impegno e non deve essere considerata un meccanismo per garantire l'assenza di duplicati nei dati. Inoltre, BigQuery potrebbe ridurre la qualità della deduplica con il massimo impegno in qualsiasi momento per garantire un'affidabilità e una disponibilità superiori per i tuoi dati.

Se hai requisiti rigorosi di deduplica per i tuoi dati, Google Cloud Datastore è un servizio alternativo che supporta le transazioni.

Disattivazione della deduplicazione best effort

Puoi disattivare la deduplicazione best effort non compilando il campo insertId per ogni riga inserita. Questo è il metodo consigliato per inserire i dati.

Apache Beam e Dataflow

Per disattivare la deduplica secondo il criterio del massimo impegno quando utilizzi il connettore BigQuery I/O di Apache Beam per Java, utilizza il metodo ignoreInsertIds().

Rimozione manuale dei duplicati

Per assicurarti che non esistano righe duplicate al termine dello streaming, utilizza la seguente procedura manuale:

  1. Aggiungi insertId come colonna nello schema della tabella e includi il valore insertId nei dati di ogni riga.
  2. Dopo aver interrotto lo streaming, esegui la seguente query per verificare la presenza di duplicati:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Se il risultato è maggiore di 1, esistono duplicati.
  3. Per rimuovere i duplicati, esegui la seguente query. Specifica una tabella di destinazione, consenti risultati di grandi dimensioni e disattiva l'appiattimento dei risultati.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Note sulla query di rimozione dei duplicati:

  • La strategia più sicura per la query di rimozione dei duplicati è scegliere come target una nuova tabella. In alternativa, puoi scegliere come target la tabella di origine con la disposizione di scritturaWRITE_TRUNCATE.
  • La query di rimozione dei duplicati aggiunge una colonna row_number con il valore 1 alla fine dello schema della tabella. La query utilizza un'istruzione SELECT * EXCEPT di GoogleSQL per escludere la colonna row_number dalla tabella di destinazione. Il prefisso #standardSQL attiva GoogleSQL per questa query. In alternativa, puoi eseguire la selezione in base a nomi di colonne specifici per omettere questa colonna.
  • Per eseguire query sui dati in tempo reale con i duplicati rimossi, puoi anche creare una vista sulla tabella utilizzando la query di rimozione dei duplicati. Tieni presente che i costi delle query in base alla visualizzazione vengono calcolati in base alle colonne selezionate nella visualizzazione, il che può comportare dimensioni di scansione dei byte elevate.

Stream in tabelle partizionate in base al tempo

Quando esegui lo streaming dei dati in una tabella partizionata in base al tempo, ogni partizione ha un buffer di streaming. Il buffer di streaming viene mantenuto quando esegui un job di caricamento, query o copia che sovrascrive una partizione impostando la proprietà writeDisposition su WRITE_TRUNCATE. Se vuoi rimuovere il buffer di streaming, verifica che sia vuoto chiamando tables.get nella partizione.

Partizionamento per data di importazione

Quando esegui lo streaming in una tabella partizionata per data di importazione, BigQuery ricava la partizione di destinazione dall'ora UTC corrente.

I dati appena arrivati vengono posizionati temporaneamente nella partizione __UNPARTITIONED__ mentre si trovano nel buffer dei flussi di dati. Quando sono disponibili dati non partizionati sufficienti, BigQuery li partiziona nella partizione corretta. Tuttavia, non esiste un SLA per il tempo necessario per spostare i dati dalla partizione __UNPARTITIONED__. Una query può escludere i dati nel buffer di streaming da una query filtrando i valori NULL dalla partizione __UNPARTITIONED__ utilizzando una delle pseudocolonne (_PARTITIONTIME o _PARTITIONDATE, a seconda del tipo di dati preferito).

Se stai eseguendo lo streaming dei dati in una tabella partizionata giornaliera, puoi eseguire l'override della deduzione della data fornendo un decoratore della partizione nell'ambito della richiesta insertAll. Includi il decoratore nel parametro tableId. Ad esempio, puoi eseguire lo streaming nella partizione corrispondente al 1° marzo 2021 per la tabella table1 utilizzando il decoratore della partizione:

table1$20210301

Quando esegui lo streaming utilizzando un decoratore delle partizioni, puoi eseguire lo streaming in partizioni degli ultimi 31 giorni nel passato e di 16 giorni nel futuro rispetto alla data corrente, in base all'ora UTC corrente. Per scrivere nelle partizioni per date al di fuori di questi limiti consentiti, utilizza un job di caricamento o di query, come descritto in Aggiunta e sovrascrittura dei dati delle tabella partizionata partizionate.

Lo streaming con un decoratore di partizione è supportato solo per le tavole con partizione giornaliera. Non è supportato per le tabelle partizionate su base oraria, mensile o annuale.

Per i test, puoi utilizzare lo strumento a riga di comando bq bq insert. Ad esempio, il seguente comando invia in streaming una singola riga a una partizione per la data 1° gennaio 2017 ($20170101) in una tabella partizionata denominata mydataset.mytable:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

Partizionamento delle colonne per unità di tempo

Puoi eseguire lo streaming dei dati in una tabella partizionata in base a una colonna DATE, DATETIME o TIMESTAMP compresa tra 5 anni nel passato e 1 anno nel futuro. I dati al di fuori di questo intervallo vengono rifiutati.

Quando i dati vengono trasmessi in streaming, vengono inizialmente inseriti nella partizione __UNPARTITIONED__. Quando sono disponibili dati non partizionati sufficienti, BigQuery li ripartisce automaticamente inserendoli nella partizione appropriata. Tuttavia, non esiste un SLA per il tempo necessario per spostare i dati dalla partizione__UNPARTITIONED__.

  • Nota: le partizioni giornaliere vengono elaborate in modo diverso rispetto alle partizioni orarie, mensili e annuali. Solo i dati al di fuori dell'intervallo di date (degli ultimi 7 giorni ai prossimi 3 giorni) vengono estratti nella partizione NON PARTIZIONATA, awaiting to be repartitioned. Invece, per la tabella partizionata su base oraria, i dati vengono sempre estratti nella partizione NON PARTIZIONATA e successivamente ripartizionati.

Creare tabelle automaticamente utilizzando le tabelle modello

Le tabelle modello forniscono un meccanismo per suddividere una tabella logica in molte tabelle più piccole al fine di creare insiemi di dati più piccoli (ad esempio, in base all'ID utente). Le tabelle dei modelli hanno una serie di limitazioni descritte di seguito. I modi consigliati per ottenere questo comportamento sono invece le tabelle partizionate e le tabelle raggruppate in cluster.

Per utilizzare una tabella modello tramite l'API BigQuery, aggiungi un parametro templateSuffix alla richiesta insertAll. Per lo strumento a riga di comando bq, aggiungi il flag template_suffix al comando insert. Se BigQuery rileva un parametro templateSuffix o il flag template_suffix, tratta la tabella di destinazione come un modello di base. Viene creata una nuova tabella che condivide lo stesso schema della tabella di destinazione e ha un nome che include il suffisso specificato:

<targeted_table_name> + <templateSuffix>

Se utilizzi una tabella modello, eviti il sovraccarico di creare ogni tabella singolarmente e di specificare lo schema per ogni tabella. Devi solo creare un singolo modello e fornire suffissi diversi in modo che BigQuery possa creare le nuove tabelle per te. BigQuery inserisce le tabelle nello stesso progetto e nello stesso set di dati.

Le tabelle create utilizzando le tabelle modello sono in genere disponibili entro pochi secondi. In rari casi, potrebbe essere necessario più tempo prima che diventino disponibili.

Modificare lo schema della tabella del modello

Se modifichi lo schema di una tabella modello, tutte le tabelle generate successivamente useranno lo schema aggiornato. Le tabelle generate in precedenza non sono interessate, a meno che la tabella esistente non abbia ancora un buffer dei flussi.

Per le tabelle esistenti che hanno ancora un buffer di streaming, se modifichi lo schema della tabella modello in modo compatibile con le versioni precedenti, viene aggiornato anche lo schema delle tabelle generate in streaming attivo. Tuttavia, se modifichi lo schema della tabella del modello in modo non compatibile con le versioni precedenti, tutti i dati memorizzati nella cache che utilizzano il vecchio schema andranno persi. Inoltre, non puoi trasmettere nuovi dati alle tabelle generate esistenti che utilizzano lo schema precedente, ora incompatibile.

Dopo aver modificato lo schema di una tabella del modello, attendi che le modifiche vengano propagate prima di provare a inserire nuovi dati o eseguire query sulle tabelle generate. Le richieste di inserimento di nuovi campi dovrebbero andare a buon fine entro pochi minuti. I tentativi di eseguire query sui nuovi campi potrebbero richiedere un'attesa più lunga, fino a 90 minuti.

Se vuoi modificare lo schema di una tabella generata, non modificarlo finché lo streaming tramite la tabella del modello non è terminato e la sezione delle statistiche di streaming della tabella generata non è presente nella risposta tables.get(), il che indica che non vengono memorizzati in buffer dati nella tabella.

Le tabelle partizionate e le tabelle cluster non presentano le limitazioni precedenti e sono il meccanismo consigliato.

Dettagli della tabella del modello

Valore del suffisso del modello
Il valore templateSuffix (o --template_suffix) deve contenere solo lettere (a-z, A-Z), numeri (0-9) o trattini bassi (_). La lunghezza massima combinata del nome della tabella e del suffisso della tabella è di 1024 caratteri.
Quota

Le tabelle dei modelli sono soggette alle limitazioni della quota di streaming. Il progetto può creare fino a 10 tabelle al secondo con tabelle modello, in modo simile all'API tables.insert. Questa quota si applica solo alle tabelle in fase di creazione, non a quelle in fase di modifica.

Se la tua applicazione deve creare più di 10 tabelle al secondo, ti consigliamo di utilizzare le tabelle cluster. Ad esempio, puoi inserire l'ID tabella ad alta cardinalità nella colonna chiave di una singola tabella di clustering.

Durata

La tabella generata eredita la data e l'ora di scadenza dal set di dati. Come per i normali dati in streaming, le tabelle generate non possono essere copiate immediatamente.

Deduplicazione

La deduplicazione avviene solo tra riferimenti uniformi a una tabella di destinazione. Ad esempio, se esegui lo streaming contemporaneamente in una tabella generata utilizzando sia le tabelle modello sia un normale comando insertAll, non viene eseguita la deduplica tra le righe inserite dalle tabelle modello e da un normale comando insertAll.

Visualizzazioni

La tabella del modello e le tabelle generate non devono essere visualizzazioni.

Risolvere i problemi relativi agli inserimenti di flussi di dati

Le sezioni seguenti descrivono come risolvere i problemi che si verificano quando carichi i dati in BigQuery tramite l'API Streaming precedente. Per ulteriori informazioni su come risolvere gli errori di quota per gli inserimenti di flussi di dati, consulta la sezione Errori di quota relativi agli inserimenti di flussi di dati.

Codici di risposta HTTP di errore

Se ricevi un codice di risposta HTTP di errore, ad esempio un errore di rete, non c'è modo di sapere se l'inserimento di flussi di dati è andato a buon fine. Se provi a inviare nuovamente la richiesta, potresti ritrovarti con righe duplicate nella tabella. Per contribuire a proteggere la tua tabella dalla duplicazione, imposta la proprietà insertId quando invii la richiesta. BigQuery utilizza la proprietà insertId per la deduplicazione.

Se ricevi un errore di autorizzazione, un errore relativo al nome della tabella non valido o un errore di superamento della quota, non vengono inserite righe e l'intera richiesta non va a buon fine.

Codici di risposta HTTP di successo

Anche se ricevi un codice di risposta HTTP di successo, devi controllare la proprietà insertErrors della risposta per determinare se le inserzioni di righe sono andate a buon fine, perché è possibile che BigQuery abbia avuto solo un risultato parziale nell'inserire le righe. Potresti riscontrare uno dei seguenti scenari:

  • Tutte le righe sono state inserite correttamente. Se la proprietà insertErrors è un elenco vuoto, tutte le righe sono state inserite correttamente.
  • Alcune righe sono state inserite correttamente. Tranne nei casi in cui si verifichi una mancata corrispondenza dello schema in una delle righe, le righe indicate nella proprietà insertErrors non vengono inserite e tutte le altre righe vengono inserite correttamente. La proprietà errors contiene informazioni dettagliate sul motivo dell'errore di ogni riga non riuscita. La proprietà index indica l'indice di riga a partire da 0 della richiesta a cui si applica l'errore.
  • Nessuna delle righe è stata inserita correttamente. Se BigQuery rileva una mancata corrispondenza dello schema nelle singole righe della richiesta, nessuna delle righe viene inserita e viene restituita una voce insertErrors per ogni riga, anche per quelle che non hanno una mancata corrispondenza dello schema. Le righe che non hanno riscontrato una mancata corrispondenza dello schema presentano un errore con la proprietà reason impostata su stopped e possono essere inviate di nuovo così come sono. Le righe che non sono riuscite includono informazioni dettagliate sulla mancata corrispondenza dello schema. Per informazioni sui tipi di buffer di protocollo supportati per ogni tipo di dato BigQuery, consulta Conversioni dei tipi di dati.

Errori dei metadati per gli inserimenti di flussi di dati

Poiché l'API di streaming di BigQuery è progettata per tassi di inserzione elevati, le modifiche ai metadati della tabella sottostante sono alla fine coerenti quando interagiscono con il sistema di streaming. La maggior parte delle volte, le modifiche ai metadati vengono propagate entro pochi minuti, ma durante questo periodo le risposte dell'API potrebbero riflettere lo stato incoerente della tabella.

Ecco alcuni scenari:

  • Modifiche allo schema. La modifica dello schema di una tabella che ha ricevuto di recente inserimenti tramite streaming può causare risposte con errori di mancata corrispondenza dello schema perché il sistema di streaming potrebbe non rilevare immediatamente la modifica dello schema.
  • Creazione/eliminazione di tabelle. Lo streaming in una tabella inesistente restituisce una variazione di una risposta notFound. Una tabella creata in risposta potrebbe non essere riconosciuta immediatamente dagli inserimenti in streaming successivi. Analogamente, l'eliminazione o la ricreazione di una tabella può creare un periodo di tempo in cui gli inserimenti in streaming vengono effettivamente inviati alla vecchia tabella. Gli inserimenti in streaming potrebbero non essere presenti nella nuova tabella.
  • Troncamento della tabella. Analogamente, l'eliminazione dei dati di una tabella (utilizzando un job di query che utilizza writeDisposition di WRITE_TRUNCATE) può causare l'eliminazione degli inserimenti successivi durante il periodo di coerenza.

Dati mancanti/non disponibili

Gli inserimenti in streaming risiedono temporaneamente nello spazio di archiviazione ottimizzato per la scrittura, che presenta caratteristiche di disponibilità diverse rispetto allo spazio di archiviazione gestito. Alcune operazioni in BigQuery non interagiscono con lo spazio di archiviazione ottimizzato per la scrittura, ad esempio i job di copia delle tabelle e i metodi API come tabledata.list. I dati in streaming recenti non saranno presenti nella tabella o nell'output di destinazione.