Utilizzare l'API Streaming precedente

Questo documento descrive come eseguire lo streaming dei dati in BigQuery utilizzando il metodo tabledata.insertAll precedente.

Per i nuovi progetti, consigliamo di utilizzare l'API BigQuery Storage Write anziché il metodo tabledata.insertAll. L'API StorageWrite ha prezzi e funzionalità più solide, inclusa la semantica della consegna "exactly-once". Se stai eseguendo la migrazione di un progetto esistente dal metodo tabledata.insertAll all'API StorageWrite, ti consigliamo di selezionare stream predefinito. Il metodo tabledata.insertAll è ancora completamente supportato.

Prima di iniziare

  1. Assicurati di disporre dell'accesso in scrittura al set di dati contenente la tabella di destinazione. La tabella deve esistere prima di iniziare a scrivere dati a meno che non utilizzi tabelle modello. Per saperne di più sulle tabelle dei modelli, consulta la sezione Creare tabelle automaticamente utilizzando le tabelle modello.

  2. Consulta le norme relative alle quote per i dati in streaming.

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Lo streaming non è disponibile per il livello gratuito. Se tenti di utilizzare lo streaming senza attivare la fatturazione, viene visualizzato il seguente errore: BigQuery: Streaming insert is not allowed in the free tier.

  5. Concedi i ruoli IAM (Identity and Access Management) che concedono agli utenti le autorizzazioni necessarie per eseguire ogni attività in questo documento.

Autorizzazioni obbligatorie

Per trasmettere i dati in BigQuery, devi disporre delle seguenti autorizzazioni IAM:

  • bigquery.tables.updateData (consente di inserire dati nella tabella)
  • bigquery.tables.get (consente di ottenere i metadati della tabella)
  • bigquery.datasets.get (consente di ottenere i metadati del set di dati)
  • bigquery.tables.create (obbligatorio se utilizzi una tabella modello per creare la tabella automaticamente)

Ciascuno dei seguenti ruoli IAM predefiniti include le autorizzazioni necessarie per trasmettere i dati in modalità flusso a BigQuery:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

Per ulteriori informazioni sui ruoli e sulle autorizzazioni IAM in BigQuery, consulta Ruoli e autorizzazioni predefiniti.

Inserire i dati in BigQuery in modalità flusso

C#

Prima di provare questo esempio, segui le istruzioni di configurazione C# riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery C#.

Per autenticarti a BigQuery, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Prima di provare questo esempio, segui le istruzioni di configurazione Go riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Go.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni per la configurazione di Java nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API Java BigQuery documentazione di riferimento.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione Node.js riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Node.js.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

Prima di provare questo esempio, segui le istruzioni per la configurazione di PHP nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery PHP.

Per autenticarti a BigQuery, configura le credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

Prima di provare questo esempio, segui le istruzioni per la configurazione di Python nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Python.

Per autenticarti a BigQuery, configura le credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

Prima di provare questo esempio, segui le istruzioni per la configurazione di Ruby nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API Ruby BigQuery documentazione di riferimento.

Per autenticarti a BigQuery, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Non è necessario compilare il campo insertID quando inserisci le righe. L'esempio seguente mostra come evitare di inviare un insertID per ogni riga durante lo streaming.

Java

Prima di provare questo esempio, segui le istruzioni per la configurazione di Java nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Java.

Per autenticarti a BigQuery, configura le credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni per la configurazione di Python nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API Python BigQuery documentazione di riferimento.

Per autenticarti a BigQuery, configura le credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Invia dati relativi a data e ora

Per i campi relativi a data e ora, formatta i dati nel metodo tabledata.insertAll come che segue:

Tipo Formato
DATE Una stringa nel formato "YYYY-MM-DD"
DATETIME Una stringa nel formato "YYYY-MM-DD [HH:MM:SS]"
TIME Una stringa nel formato "HH:MM:SS"
TIMESTAMP Il numero di secondi dal 1° gennaio 1970 (l'epoca Unix) o una stringa nel formato "YYYY-MM-DD HH:MM[:SS]"

Inviare dati sull'intervallo

Per i campi di tipo RANGE<T>, formatta i dati in tabledata.insertAll come oggetto JSON con due campi, start e end. I valori mancanti o NULL per i campi start e end rappresentano confini illimitati. Questi campi devono avere lo stesso formato JSON supportato del tipo T, dove T può essere DATE, DATETIME e TIMESTAMP.

Nell'esempio seguente, il campo f_range_date rappresenta una colonna RANGE<DATE> in una tabella. In questa colonna viene inserita una riga utilizzando API tabledata.insertAll.

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

Disponibilità dei dati in streaming

I dati sono disponibili per l'analisi in tempo reale utilizzando le query GoogleSQL subito dopo che BigQuery ha confermato correttamente una richiesta tabledata.insertAll.

Le righe di recente importazione in streaming in una tabella partizionata in base all'ora di importazione hanno temporaneamente un valore NULL per la pseudocolonna _PARTITIONTIME. Per queste righe, BigQuery assegna lo stato finale non NULL valore della colonna PARTITIONTIME in background, in genere entro pochi minuti. In rari casi, l'operazione può richiedere fino a 90 minuti.

Alcune righe trasmesse di recente potrebbero non essere disponibili per la copia della tabella in genere per un per alcuni minuti. In rari casi, l'operazione può richiedere fino a 90 minuti. Per vedere se i dati è disponibile per la copia della tabella, controlla la risposta tables.get per una sezione denominata streamingBuffer Se la sezione streamingBuffer non è presente, i tuoi dati sono disponibili per la copia. Puoi anche utilizzare il campo streamingBuffer.oldestEntryTime per identificare la data di registrazione dei record nel buffer di streaming.

Deduplicazione best effort

Quando fornisci insertId per una riga inserita, BigQuery utilizza questo ID per supportare la deduplica secondo il criterio del massimo impegno per un massimo di un minuto. Ossia, se trasmetti in streaming la stessa riga con lo stesso insertId più di una volta all'interno quel periodo di tempo nella stessa tabella, BigQuery potrebbe deduplicare le occorrenze multiple della riga, conservando solo una di queste occorrenze.

Il sistema si aspetta che le righe fornite con insertId identici siano anche identiche. Se due righe hanno insertId identici, non è deterministico quali righe vengono conservate da BigQuery.

La deduplica è in genere pensata per gli scenari di ripetizione in un sistema distribuito in cui non è possibile determinare lo stato di un inserimento in streaming in determinate condizioni di errore, ad esempio errori di rete tra il sistema e BigQuery o errori interni in BigQuery. Se riprovi un'inserzione, utilizza lo stesso insertId per lo stesso insieme di righe in modo che BigQuery possa tentare di deduplicare i dati. Per ulteriori informazioni, consulta la sezione Risolvere i problemi relativi agli inserimenti di flussi di dati.

La deduplica offerta da BigQuery è il massimo impegno e non deve essere considerata un meccanismo per garantire l'assenza di duplicati nei dati. Inoltre, BigQuery potrebbe ridurre la qualità dei di eliminare i duplicati in qualsiasi momento per garantire l'affidabilità e la disponibilità dei dati.

Se hai requisiti rigorosi di deduplica per i tuoi dati, Google Cloud Datastore è un servizio alternativo che supporta le transazioni.

Disattivazione della deduplicazione best effort

Puoi disattivare la deduplicazione best effort non compilando il insertId per ogni riga inserita. Questo è il metodo consigliato per inserire i dati.

Apache Beam e Dataflow

Per disattivare la deduplica secondo il criterio del massimo impegno quando utilizzi il connettore BigQuery I/O di Apache Beam per Java, utilizza il metodo ignoreInsertIds().

Rimozione manuale dei duplicati

Per assicurarti che non esistano righe duplicate al termine della trasmissione del flusso, utilizza la seguente procedura manuale:

  1. Aggiungi insertId come colonna nello schema della tabella e includi il parametro insertId nei dati per ogni riga.
  2. Dopo aver interrotto lo streaming, esegui la seguente query per verificare la presenza di duplicati:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Se il risultato è maggiore di 1, esistono duplicati.
  3. Per rimuovere i duplicati, esegui la seguente query. Specifica una tabella di destinazione, consenti risultati di grandi dimensioni e disattiva l'appiattimento dei risultati.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Note sulla query di rimozione duplicata:

  • La strategia più sicura per la query di rimozione dei duplicati è scegliere come target una nuova tabella. In alternativa, puoi scegliere come target la tabella di origine con disposizione di scrittura WRITE_TRUNCATE.
  • La query di rimozione duplicata aggiunge una colonna row_number con il valore 1 a alla fine dello schema della tabella. La query utilizza un'istruzione SELECT * EXCEPT di GoogleSQL per escludere la colonna row_number dalla tabella di destinazione. La Prefisso #standardSQL attiva GoogleSQL per questa query. In alternativa, puoi eseguire la selezione in base a nomi di colonne specifici per omettere questa colonna.
  • Per eseguire query sui dati in tempo reale con i duplicati rimossi, puoi anche creare una vista sulla tabella utilizzando la query di rimozione dei duplicati. Tieni presente che i costi delle query rispetto alla vista vengono calcolate in base alle colonne selezionate di grandi dimensioni, il che può comportare grandi dimensioni di scansione dei byte.

Stream in tabelle partizionate in base al tempo

Quando trasmetti i flussi di dati in una tabella partizionata nel tempo, ogni partizione ha un flusso di dati buffer. Il buffer di streaming viene mantenuto quando esegui un job di caricamento, query o copia che sovrascrive una partizione impostando la proprietà writeDisposition su WRITE_TRUNCATE. Se vuoi rimuovere il buffer di streaming, verifica che il buffer di streaming è vuoto richiamando tables.get sulla partizione.

Partizionamento in fase di importazione

Quando esegui il flusso di dati in una tabella partizionata per data di importazione, BigQuery deduce la partizione di destinazione dall'ora UTC attuale.

I nuovi dati in arrivo vengono temporaneamente inseriti nella partizione __UNPARTITIONED__ mentre sono nel buffer di flusso. Quando sono disponibili dati non partizionati sufficienti, BigQuery li partiziona nella partizione corretta. Tuttavia, non esiste uno SLA (accordo sul livello del servizio) per quanto tempo impiegano i dati a spostarsi __UNPARTITIONED__ partizione. Una query può escludere i dati nel buffer di streaming da una query filtrando i valori NULL dalla partizione __UNPARTITIONED__ utilizzando una delle pseudocolonne (_PARTITIONTIME o _PARTITIONDATE, a seconda del tipo di dati preferito).

Se disponi di flussi di dati in una tabella partizionata giornalmente, puoi eseguire l'override l'inferenza delle date fornendo un decoratore della partizione come parte dell'insertAll richiesta. Includi il decoratore nel parametro tableId. Ad esempio, puoi eseguire lo streaming nella partizione corrispondente al 1° marzo 2021 per la tabella table1 utilizzando il decoratore della partizione:

table1$20210301

Quando esegui lo streaming utilizzando un decoratore delle partizioni, puoi eseguire lo streaming in partizioni degli ultimi 31 giorni nel passato e di 16 giorni nel futuro rispetto alla data corrente, in base all'ora UTC corrente. Per scrivere nelle partizioni per date al di fuori di questi limiti consentiti, utilizza un job di caricamento o di query, come descritto in Aggiunta e sovrascrittura dei dati delle tabelle partizionate.

Lo streaming con un decorator partizione è supportato solo per il partizionamento giornaliero tabelle. Non è supportato per le tabelle partizionate orarie, mensili o annuali.

Per i test, puoi utilizzare lo strumento a riga di comando bq bq insert. Ad esempio, il seguente comando invia in streaming una singola riga a una partizione per la data 1° gennaio 2017 ($20170101) in una tabella partizionata denominata mydataset.mytable:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

Partizionamento delle colonne in base all'unità di tempo

Puoi eseguire lo streaming dei dati in una tabella partizionata in base a una colonna DATE, DATETIME o TIMESTAMP compresa tra 5 anni nel passato e 1 anno nel futuro. I dati al di fuori di questo intervallo vengono rifiutati.

Quando i dati vengono trasmessi in streaming, vengono inizialmente inseriti nella partizione __UNPARTITIONED__. Quando sono disponibili dati non partizionati sufficienti, BigQuery li ripartisce automaticamente inserendoli nella partizione appropriata. Tuttavia, non esiste uno SLA (accordo sul livello del servizio) sul tempo necessario affinché i dati vengano spostati al di fuori __UNPARTITIONED__ partizione.

  • Nota: le partizioni giornaliere vengono elaborate in modo diverso rispetto a quelle orarie, mensili e partizioni annuali. Solo i dati al di fuori dell'intervallo di date (degli ultimi 7 giorni ai prossimi 3 giorni) vengono estratti nella partizione NON PARTIZIONATA, awaiting to be repartitioned. D'altra parte, per una tabella partizionata oraria, i dati vengono sempre estratti la partizione UNPARTITIONED e successivamente ripartizionata.

Creare tabelle automaticamente utilizzando le tabelle modello

Le tabelle modello forniscono un meccanismo per suddividere una tabella logica in molte tabelle più piccole per creare set di dati più piccoli (ad esempio, per ID utente). Tabelle modello presentino alcune limitazioni descritte di seguito. I modi consigliati per ottenere questo comportamento sono invece le tabelle partizionate e le tabelle raggruppate in cluster.

Per utilizzare una tabella modello tramite l'API BigQuery, aggiungi un parametro templateSuffix alla tua richiesta di insertAll. Per lo strumento a riga di comando bq, aggiungi il flag template_suffix al comando insert. Se BigQuery rileva un templateSuffix o il flag template_suffix, tratta la tabella target come base modello. Viene creata una nuova tabella che condivide lo stesso schema della tabella di destinazione e ha un nome che include il suffisso specificato:

<targeted_table_name> + <templateSuffix>

L'utilizzo di una tabella modello evita il sovraccarico della creazione di ogni tabella singolarmente e specificando lo schema per ogni tabella. Devi solo creare un singolo modello e fornire diversi suffissi in modo che BigQuery possa creare le nuove tabelle. BigQuery inserisce le tabelle nello stesso progetto e nello stesso set di dati.

Le tabelle create utilizzando le tabelle modello sono in genere disponibili entro pochi secondi. In rare occasioni, potrebbe essere necessario più tempo prima che diventino disponibili.

Modifica lo schema della tabella del modello

Se modifichi lo schema di una tabella del modello, tutte le tabelle generate successivamente useranno lo schema aggiornato. Le tabelle generate in precedenza non sono interessate, a meno che la tabella esistente non abbia ancora un buffer dei flussi.

Per le tabelle esistenti che hanno ancora un buffer di flusso, se modifichi modello di tabella in modo compatibile con le versioni precedenti, lo schema di questi vengono aggiornate anche le tabelle generate in streaming attivamente. Tuttavia, se modifichi lo schema della tabella del modello in modo non compatibile con le versioni precedenti, tutti i dati memorizzati nella cache che utilizzano il vecchio schema andranno persi. Inoltre, non puoi trasmettere nuovi dati alle tabelle generate esistenti che utilizzano lo schema precedente, ora incompatibile.

Dopo aver modificato lo schema di una tabella modello, attendi che le modifiche vengano propagate prima di provare a inserire nuovi dati o a eseguire query sulle tabelle generate. Le richieste di inserimento di nuovi campi dovrebbero andare a buon fine entro pochi minuti. I tentativi di eseguire query sui nuovi campi potrebbero richiedere un'attesa più lunga, fino a 90 minuti.

Se vuoi modificare lo schema di una tabella generata, non modificare il valore dello schema finché non viene interrotto il flusso di dati tramite la tabella del modello e la sezione delle statistiche sui flussi di dati della tabella non è presente nella risposta tables.get(), che indica che nessun dato è memorizzato nel buffer nella tabella.

Le tabelle partizionate e le tabelle cluster non presentano le limitazioni precedenti e sono il meccanismo consigliato.

Dettagli della tabella del modello

Valore suffisso del modello
Il valore templateSuffix (o --template_suffix) deve contenere solo lettere (a-z, A-Z), numeri (0-9) o trattini bassi (_). La lunghezza massima combinata del nome della tabella e del suffisso della tabella è di 1024 caratteri.
Quota

Le tabelle dei modelli sono soggette alla quota di flussi di dati limitazioni. Il progetto può creare fino a 10 tabelle al secondo con tabelle modello, in modo simile all'API tables.insert. Questa quota si applica solo alle tabelle in fase di creazione, non alle tabelle in fase di modifica.

Se la tua applicazione deve creare più di 10 tabelle al secondo, ti consigliamo di utilizzare le tabelle raggruppate. Ad esempio, puoi inserire l'ID tabella ad alta cardinalità nella colonna chiave di una singola tabella di clustering.

Durata

La tabella generata eredita la data e l'ora di scadenza dal set di dati. Come per i normali dati in streaming, le tabelle generate non possono essere copiate immediatamente.

Deduplicazione

La deduplicazione avviene solo tra riferimenti uniformi a una tabella di destinazione. Ad esempio, se esegui lo streaming contemporaneamente in una tabella generata utilizzando sia le tabelle modello sia un normale comando insertAll, non viene eseguita la deduplica tra le righe inserite dalle tabelle modello e da un normale comando insertAll.

Visualizzazioni

La tabella modello e le tabelle generate non devono essere viste.

Risoluzione dei problemi di inserimento di flussi di dati

Le sezioni seguenti illustrano come risolvere gli errori che si verificano. quando trasferisci flussi di dati in BigQuery utilizzando l'API di streaming legacy. Per ulteriori informazioni su come risolvere gli errori di quota per gli inserimenti di flussi di dati, consulta la sezione Errori di quota relativi agli inserimenti di flussi di dati.

Codici di risposta HTTP di errore

Se ricevi un codice di risposta HTTP di errore, ad esempio un errore di rete, non è possibile capire se l'inserimento di flussi di dati è riuscito. Se provi a inviare di nuovo la richiesta, potresti ritrovarti con righe duplicate nella tua tabella. Per aiutare proteggi la tua tabella dalla duplicazione, imposta la proprietà insertId quando inviando la tua richiesta. BigQuery utilizza la proprietà insertId per la deduplicazione.

Se ricevi un errore di autorizzazione, un errore relativo al nome della tabella non valido o un errore di superamento della quota, non vengono inserite righe e l'intera richiesta non va a buon fine.

Codici di risposta HTTP di successo

Anche se ricevi un codice di risposta HTTP di successo, devi controllare la proprietà insertErrors della risposta per determinare se le inserzioni di righe sono andate a buon fine, perché è possibile che BigQuery abbia avuto solo un parziale successo nell'inserire le righe. Potresti riscontrare uno dei seguenti scenari:

  • Tutte le righe sono state inserite correttamente. Se la proprietà insertErrors è un elenco vuoto, tutte le righe sono state inserite correttamente.
  • Alcune righe sono state inserite correttamente. A meno che non esista una mancata corrispondenza dello schema in una delle righe, le righe indicate nella proprietà insertErrors non vengono inserite e tutte le altre righe vengono inserite correttamente. La proprietà errors contiene informazioni dettagliate sul motivo dell'errore di ogni riga non riuscita. La proprietà index indica l'indice di riga a partire da 0 della richiesta a cui si applica l'errore.
  • Nessuna delle righe inserite correttamente. Se BigQuery rileva mancata corrispondenza dello schema nelle singole righe della richiesta, nessuna riga viene inserita e Viene restituita una voce insertErrors per ogni riga, anche per le righe che non avevano uno schema mancata corrispondenza. Le righe che non hanno riscontrato una mancata corrispondenza dello schema presentano un errore con la proprietà reason impostata su stopped e possono essere inviate di nuovo così come sono. Le righe che non sono riuscite includono informazioni dettagliate sulla mancata corrispondenza dello schema. Per informazioni sui tipi di buffer di protocollo supportati per ogni tipo di dato BigQuery, consulta Conversioni dei tipi di dati.

Errori dei metadati per gli inserimenti di flussi di dati

Poiché l'API per i flussi di dati di BigQuery è progettata per tassi di inserzione elevati, le modifiche all'esposizione dei metadati della tabella sottostante sono coerenti quando si interagisce con il sistema di streaming. La maggior parte delle volte, le modifiche ai metadati vengono propagate entro pochi minuti, ma durante questo periodo le risposte dell'API potrebbero riflettere lo stato incoerente della tabella.

Ecco alcuni scenari:

  • Modifiche allo schema. La modifica dello schema di una tabella che ha ricevuto di recente inserimenti tramite streaming può causare risposte con errori di mancata corrispondenza dello schema perché il sistema di streaming potrebbe non rilevare immediatamente la modifica dello schema.
  • Creazione/eliminazione di una tabella. Il flusso di dati verso una tabella inesistente restituisce una variante di un Risposta notFound. Una tabella creata in risposta potrebbe non essere riconosciuta immediatamente per inserimento di flussi di dati successivi. Analogamente, l'eliminazione o la ricreazione di una tabella può creare un periodo di tempo in cui gli inserimenti in streaming vengono effettivamente inviati alla vecchia tabella. Il flusso di dati inserisce potrebbero non essere presenti nella nuova tabella.
  • Troncamento della tabella. Troncamento dei dati di una tabella (utilizzando un job di query che utilizza writeDisposition di WRITE_TRUNCATE) può causare allo stesso modo inserimenti successivi durante la coerenza in un periodo di tempo limitato.

Dati mancanti/non disponibili

Gli inserimenti in streaming risiedono temporaneamente nello spazio di archiviazione ottimizzato per la scrittura, che presenta caratteristiche di disponibilità diverse rispetto allo spazio di archiviazione gestito. Alcune operazioni in BigQuery non interagiscono con lo spazio di archiviazione ottimizzato per la scrittura, come i job di copia delle tabelle e i metodi API come tabledata.list. I flussi di dati recenti non saranno presenti nella tabella o nell'output di destinazione.