Como fazer streaming de dados no BigQuery

Nesta página, descrevemos como fazer streaming de dados para o BigQuery usando o método tabledata.insertAll. O streaming é uma alternativa ao carregamento em lote para ingestão de dados no BigQuery. Com o streaming, você insere registros individuais um de cada vez ou em pequenos microlotes (geralmente 500 registros ou menos).

Casos de uso de exemplo

Se você tem um aplicativo que coleta uma grande quantidade de dados em tempo real, inserir streaming pode ser uma boa escolha. Geralmente, esses tipos de aplicativos têm os seguintes critérios:

  • Não transacional. Alto volume, linhas continuamente acrescentadas. O aplicativo consegue tolerar uma possibilidade rara de duplicação ou de dados temporariamente indisponíveis.
  • Análise agregada. Em geral, as consultas são realizadas para análise de tendências, em vez da seleção de registros simples ou estreitos.

Um exemplo de geração de registros de eventos de alto volume é o acompanhamento de eventos. Suponha que você tenha um aplicativo para dispositivos móveis que acompanhe eventos. O aplicativo, ou os servidores móveis, podem registrar de maneira independente as interações do usuário ou os erros do sistema e fazer streaming deles para o BigQuery. Analise esses dados para determinar as tendências gerais, como áreas de alta interação ou problemas, e monitorar as condições de erro em tempo real.

Antes de começar

  1. Verifique se você tem acesso de gravação ao conjunto de dados que contém a tabela de destino. A tabela precisa existir antes de você começar a gravar dados nela, a menos que você esteja usando tabelas de modelo. Para mais informações sobre tabelas de modelo, consulte Criação automática de tabelas usando tabelas de modelo.

  2. Verifique a política de cotas para dados de streaming.

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

O streaming não está disponível pelo nível gratuito. Se tentar usar o streaming sem ativar o faturamento, este erro será exibido: BigQuery: Streaming insert is not allowed in the free tier.

Permissões necessárias

No mínimo, para fazer streaming de dados no BigQuery é preciso receber as seguintes permissões:

  • bigquery.tables.updateData para inserir dados na tabela
  • bigquery.tables.get para receber metadados da tabela
  • bigquery.datasets.get para receber metadados do conjunto de dados

Se você usar um modelo de tabela para criar a tabela automaticamente, também precisará ter a permissão bigquery.tables.create.

Os seguintes papéis predefinidos do Identity and Access Management (IAM) incluem as permissões bigquery.tables.updateData e bigquery.tables.create:

  • bigquery.dataEditor
  • bigquery.dataOwner
  • bigquery.admin

Para mais informações sobre os papéis e as permissões do IAM no BigQuery, consulte Papéis e permissões predefinidos.

Como fazer streaming de dados no BigQuery

C#

Antes de testar esta amostra, siga as instruções de configuração do C# no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery em C#.


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery em Go.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Antes de testar esta amostra, siga as instruções de configuração do Java no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery em Java.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");

    tableInsertRows(datasetName, tableName, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can also supply optional unique row keys to support de-duplication
                  // scenarios.
                  .addRow(rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Node.js.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

Antes de testar esta amostra, siga as instruções de configuração do PHP no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery PHP.

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery em Python.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

Antes de testar esta amostra, siga as instruções de configuração do Ruby no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de Referência da API BigQuery Ruby.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Não é necessário preencher o campo insertID ao inserir linhas. O exemplo a seguir mostra como evitar o envio de um insertID para cada linha durante o streaming.

Python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery em Python.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Como enviar dados de data/hora

Para campos de dados/tempo, formate os dados no método tabledata.insertAll da seguinte maneira.

Tipo Formato
DATE Uma string no formato "YYYY-MM-DD"
DATETIME Uma string no formato "YYYY-MM-DD [HH:MM:SS]"
TIME Uma string no formato "HH:MM:SS"
TIMESTAMP O número de segundos desde 1970-01-01 (a época do Unix) ou uma string no formato "YYYY-MM-DD HH:MM[:SS]"

Verificação da disponibilidade de dados

Os dados de streaming estão disponíveis para análise em tempo real alguns segundos depois da primeira inserção de streaming em uma tabela. Em raras circunstâncias, como em uma interrupção, os dados no buffer de streaming podem estar temporariamente indisponíveis. Quando os dados não estão disponíveis, as consultas continuam sendo executadas com êxito, mas ignoram alguns dos dados que ainda estão no buffer de streaming. Essas consultas conterão um aviso no campo errors de bigquery.jobs.getQueryResults, na resposta a bigquery.jobs.query ou no campo status.errors de bigquery.jobs.get.

Os dados podem levar até 90 minutos para ficarem disponíveis para operações de cópia. Além disso, ao fazer streaming para uma tabela particionada, os dados no buffer de streaming têm um valor NULL para a pseudocoluna _PARTITIONTIME. Para ver se os dados estão disponíveis para cópia, verifique se, na resposta tables.get, existe uma seção denominada streamingBuffer. Se estiver ausente, os dados estarão disponíveis para cópia com um valor não nulo para a pseudocoluna _PARTITIONTIME. Além disso, o campo streamingBuffer.oldestEntryTime pode ser aproveitado para identificar a idade dos registros no buffer de streaming.

Eliminação de duplicação por melhor esforço

Quando você fornece insertId para uma linha inserida, o BigQuery usa esse ID para aceitar a eliminação de duplicação por melhor esforço por até um minuto. Isso significa que, se você tentar transmitir a mesma linha com o mesmo insertId mais de uma vez nesse período na mesma tabela, o BigQuery pode eliminar a duplicação das várias ocorrências dessa linha, mantendo apenas uma dessas ocorrências.

Isso geralmente se destina a cenários de nova tentativa em um sistema distribuído em que não é possível determinar o estado de uma inserção de streaming sob determinadas condições de erro, como erros de rede entre o sistema e o BigQuery ou erros internos do BigQuery. Se você tentar novamente uma inserção, use o mesmo insertId para o mesmo conjunto de linhas e o BigQuery tentará eliminar a duplicação dos seus dados. Para mais informações, consulte solução de problemas com inserções de streaming.

A eliminação de duplicação feita pelo BigQuery é o melhor esforço possível e não pode ser considerado um mecanismo para garantir a ausência de cópias nos dados. Além disso, o BigQuery pode prejudicar a qualidade da eliminação de duplicação por melhor esforço a qualquer momento para garantir maior confiabilidade e disponibilidade dos dados.

Se você tem requisitos rígidos de eliminação de duplicação para os dados, o Google Cloud Datastore é um serviço alternativo compatível com transações.

Como desativar a eliminação de duplicação por melhor esforço

É possível desativar a eliminação de duplicação por melhor esforço ao não preencher o campo insertId de cada linha inserida. Não preencher insertId resulta em cotas de ingestão de streaming mais altas em determinadas regiões. Essa é a maneira recomendada de aumentar os limites da cota de ingestão de streaming. Para mais informações, consulte Cotas e limites.

Apache Beam e Dataflow

Para desativar a eliminação de duplicação por melhor esforço ao usar o conector de E/S do BigQuery (em inglês) do Apache Beam para Java, use o método ignoreInsertIds().

Remoção manual de duplicatas

Use o processo manual a seguir para garantir que não existam linhas duplicadas depois do término do streaming.

  1. Adicione insertId como uma coluna no esquema da tabela e inclua o valor insertId nos dados de cada linha.
  2. Após o streaming ser interrompido, execute a seguinte consulta para verificar se há duplicatas:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Se o resultado for maior que 1, existem duplicatas.
  3. Para remover as duplicatas, execute a seguinte consulta. Você deve especificar uma tabela de destino, permitir resultados grandes e desativar o achatamento de resultados.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Observações sobre a consulta de remoção de duplicatas:

  • A estratégia mais segura para a consulta de remoção de duplicatas é direcionar para uma nova tabela. Outra opção é direcionar para a tabela de origem com a disposição de gravação WRITE_TRUNCATE.
  • A consulta de remoção de duplicatas adiciona uma coluna row_number, com o valor 1, ao final do esquema da tabela. A consulta usa uma instrução SELECT * EXCEPT do SQL padrão para excluir a coluna row_number da tabela de destino. O prefixo #standardSQL ativa o SQL padrão para essa consulta. Outra opção é selecionar por nomes de coluna específicos para omitir essa coluna.
  • Para consultar dados ativos com duplicatas removidas, também é possível criar uma visualização sobre a tabela usando a consulta de remoção de duplicatas. Lembre-se de que os custos de consulta com a visualização serão calculados com base nas colunas selecionadas. Isso pode resultar em grandes tamanhos verificados de bytes.

Como fazer streaming para tabelas particionadas por tempo

Quando você faz stream de dados para uma tabela particionada por tempo, cada partição tem um buffer de streaming. O buffer de streaming é mantido quando você executa um job de carregamento, consulta ou cópia que substitui uma partição ao definir a propriedade writeDisposition como WRITE_TRUNCATE. Se quiser remover o buffer de streaming, verifique se ele está vazio chamando tables.get na partição.

Particionamento por tempo de processamento

Ao fazer streaming para uma tabela particionada por tempo de processamento, o BigQuery infere a partição de destino a partir da hora UTC atual.

Os dados recém-recebidos são colocados temporariamente na partição __UNPARTITIONED__ enquanto estão no buffer de streaming. Quando há dados não particionados suficientes, o BigQuery particiona os dados na partição correta. Uma consulta pode excluir dados do buffer de streaming de uma consulta, filtrando os valores NULL da partição __UNPARTITIONED__ usando uma das pseudocolunas ([_PARTITIONTIME] ou [_PARTITIONDATE], dependendo do tipo de dados preferido).

Se estiver fazendo streaming de dados para uma tabela particionada diária, é possível substituir a inferência de data fornecendo um decorador de partição como parte da solicitação insertAll. Inclua o decorador no parâmetro tableId. Por exemplo, é possível fazer o streaming para a partição correspondente a 01-03-2021 para a tabela table1 usando o decorador de partições:

table1$20210301

É possível fazer streaming usando um decorador de partições para partições dos últimos 31 dias e dos próximos 16 dias em relação à data atual, com base na hora UTC atual. Para gravar em partições de datas fora desses limites permitidos, use um job de carregamento ou de consulta, como descrito em Como anexar e substituir dados da tabela particionada.

O streaming usando um decorador de partição só é compatível com tabelas particionadas por dia. Ele não é compatível com tabelas particionadas por hora, mês ou ano.

Para testar, use o comando de CLI bq insert da ferramenta de linha de comando bq. Por exemplo, o comando a seguir faz streaming de uma única linha de uma partição de 1º de janeiro de 2017 ($20170101) para uma tabela particionada denominada mydataset.mytable:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

Particionamento de colunas por unidade de tempo

É possível fazer streaming de dados em uma tabela particionada em uma coluna DATE, DATETIME ou TIMESTAMP que esteja entre os últimos cinco anos e o próximo ano. Dados fora desse intervalo são rejeitados.

Quando os dados são transmitidos, eles são inicialmente colocados na partição __UNPARTITIONED__. Quando há dados não particionados suficientes, o BigQuery os particiona automaticamente, colocando-os na partição apropriada.

Criação automática de tabelas usando tabelas de modelo

As tabelas de modelo fornecem um mecanismo para dividir uma tabela lógica em muitas tabelas menores para criar conjuntos menores de dados (por exemplo, por ID do usuário). As tabelas de modelo têm várias limitações, conforme descrito abaixo. Já as tabelas particionadas e em cluster são as maneiras recomendadas de conseguir esse comportamento.

Para usar uma tabela de modelo por meio da API BigQuery, adicione um parâmetro templateSuffix à sua solicitação insertAll. Na ferramenta de linha de comando bq, adicione a sinalização template_suffix ao comando insert. Se o BigQuery detectar um parâmetro templateSuffix ou a sinalização template_suffix, ele tratará a tabela visada como um modelo base e criará uma tabela nova que compartilha o mesmo esquema da tabela visada e tem um nome que inclui o sufixo especificado:

<targeted_table_name> + <templateSuffix>

Ao usar uma tabela de modelo, você evita a sobrecarga de criar cada tabela individualmente e especificar o esquema para cada tabela. Você só precisa criar um modelo único e fornecer sufixos diferentes para que o BigQuery possa criar as tabelas novas para você. O BigQuery coloca as tabelas no mesmo projeto e conjunto de dados.

As tabelas criadas por meio de tabelas de modelo geralmente ficam disponíveis em alguns segundos. Em raras ocasiões, elas podem levar mais tempo para ficarem disponíveis.

Alteração do esquema de tabelas de modelo

Se você alterar um esquema de tabelas de modelo, todas as tabelas geradas de modo subsequente usarão o esquema atualizado. As tabelas geradas anteriormente não serão afetadas, a menos que a tabela ainda tenha um buffer de streaming.

Se você modificar o esquema de tabelas de modelo de uma maneira compatível com versões anteriores para tabelas que ainda têm um buffer de streaming, o esquema dessas tabelas geradas com streaming ativo também será atualizado. No entanto, se você modificar o esquema de tabelas de modelo de uma maneira não compatível com versões anteriores, todos os dados armazenados em buffer que usam o esquema antigo serão perdidos. Além disso, você não conseguirá transmitir novos dados para as tabelas geradas que usam o esquema antigo, mas agora incompatível.

Depois de alterar um esquema de tabelas de modelo, aguarde até que as alterações se propaguem antes de tentar inserir novos dados ou consultar tabelas geradas. As solicitações para inserir novos campos devem ficar prontas em poucos minutos. As tentativas de consulta dos novos campos podem exigir uma espera mais longa, de até 90 minutos.

Caso seja necessário alterar o esquema de uma tabela gerada, não altere o esquema até que o streaming da tabela modelo seja interrompido e a seção estatísticas de streaming da tabela esteja ausente da resposta tables.get(), indicando que nenhum dado está armazenado em buffer na tabela.

As tabelas particionadas e em cluster não apresentam as limitações acima e são o mecanismo recomendado.

Detalhes da tabela de modelo

Valor do sufixo do modelo
O valor templateSuffix (ou --template_suffix) precisa conter apenas letras (a-z, A-Z), números (0-9) ou sublinhados (_). O tamanho máximo combinado do nome e do sufixo da tabela é de 1.024 caracteres.
Cota
As tabelas de modelo estão sujeitas a limitações de cota de streaming semelhantes às de outras tabelas. Além disso, ao fazer streaming para tabelas de modelo, desativar a eliminação de duplicação por melhor esforço não fornece cotas maiores.
Tempo para ficar ativa
A tabela gerada herda o prazo de validade do conjunto de dados. Como acontece com os dados de streaming normais, as tabelas geradas não podem ser copiadas imediatamente.
Desduplicação
A desduplicação só acontece entre referências uniformes a uma tabela de destino. Por exemplo, ao fazer streaming simultâneo para uma tabela gerada usando tabelas de modelo e um comando insertAll normal, nenhuma duplicação será eliminada entre linhas inseridas por tabelas de modelo e um comando insertAll normal.
Visualizações
A tabela de modelo e as tabelas geradas não devem ser visualizadas.

Solução de problemas de inserções de streaming

Para informações sobre como resolver problemas durante inserções de streaming, consulte Resolução de problemas com inserções de streaming na página "Resolução de problemas de erros".