Como fazer streaming de dados no BigQuery

Em vez de usar um job para carregar dados no BigQuery, é possível transmitir os dados por streaming para o BigQuery, um registro por vez, usando o método tabledata.insertAll. Essa abordagem permite a consulta de dados sem o atraso de execução de um job de carga. Este documento discute várias compensações importantes a considerar antes de escolher uma abordagem, incluindo cotas de streaming, disponibilidade de dados e consistência de dados.

Antes de começar

  1. Verifique se você tem acesso de gravação ao conjunto de dados que contém a tabela de destino. A tabela precisa existir antes de você começar a gravar dados nela, a menos que você esteja usando tabelas de modelo. Para mais informações sobre tabelas de modelo, consulte Criação automática de tabelas usando tabelas de modelo.

  2. Verifique a política de cotas para dados de streaming.

  3. Verifique se o faturamento foi ativado no projeto do Google Cloud Platform.

    Saiba como ativar o faturamento

    O streaming não está disponível pelo nível gratuito. Se você tentar usar o streaming sem ativar o faturamento, receberá o seguinte erro: BigQuery: Streaming insert is not allowed in the free tier.

Verificação da disponibilidade de dados

Os dados de streaming estão disponíveis para análise em tempo real alguns segundos depois da primeira inserção de streaming em uma tabela. Em raras circunstâncias, como em uma interrupção, os dados no buffer de streaming podem estar temporariamente indisponíveis. Quando os dados não estão disponíveis, as consultas continuam sendo executadas com êxito, mas ignoram alguns dos dados que ainda estão no buffer de streaming. Essas consultas conterão um aviso no campo errors de bigquery.jobs.getQueryResults, na resposta a bigquery.jobs.query ou no campo status.errors de bigquery.jobs.get.

Os dados podem levar até 90 minutos para ficarem disponíveis para operações de cópia e exportação. Além disso, ao fazer streaming para uma tabela particionada, os dados no buffer de streaming têm um valor NULL para a pseudocoluna _PARTITIONTIME. Para ver se os dados estão disponíveis para cópia e exportação, verifique a resposta tables.get para uma seção chamada streamingBuffer. Se essa seção estiver ausente, os dados estarão disponíveis para cópia ou exportação com um valor não nulo para a pseudocoluna _PARTITIONTIME. Além disso, o campo streamingBuffer.oldestEntryTime pode ser aproveitado para identificar a idade dos registros no buffer de streaming.

Como garantir a consistência dos dados

Para ajudar a garantir a consistência dos dados, forneça insertId para cada linha inserida. O BigQuery lembra esse código por pelo menos um minuto. Se você tentar transmitir o mesmo conjunto de linhas dentro desse período e a propriedade insertId for definida, o BigQuery usará a propriedade insertId para desduplicar os dados de acordo com o melhor esforço.

Talvez seja necessário tentar novamente uma inserção, porque não há nenhuma maneira de determinar o estado de uma inserção de streaming sob determinadas condições de erro, como erros de rede entre o sistema e o BigQuery ou erros internos no BigQuery. Se tentar novamente uma inserção, você precisará usar o mesmo insertId para o mesmo conjunto de linhas. Dessa forma, o BigQuery poderá tentar remover os dados duplicados. Para ver mais informações, consulte solução de problemas com inserções de streaming.

No caso raro de um data center do Google perder conectividade inesperadamente, a desduplicação automática talvez não seja possível.

Se os dados tiverem requisitos mais fortes, o Google Cloud Datastore será um serviço alternativo compatível com transações.

Transmissão de dados entre locais de dados

É possível transmitir dados para conjuntos de dados nos EUA e na UE. Os dados podem fluir por meio de máquinas fora do local do conjunto de dados enquanto o BigQuery processa a solicitação insertAll. Se você estiver transmitindo dados a partir de um local fora do local do conjunto de dados, poderá ter latência e taxas de erro mais elevadas.

Como fazer streaming em tabelas particionadas por tempo de processamento

É possível fazer o streaming de linhas individuais em uma tabela particionada usando solicitações insertAll. Por padrão, a partição de destino dos dados inseridos é inferida a partir da data atual com base na hora UTC.

Se o streaming de dados estiver sendo feito em uma tabela particionada por tempo de processamento, é possível substituir a inferência de data fornecendo um decorador de partição como parte da solicitação insertAll. Por exemplo, é possível fazer o streaming para a partição correspondente a 2017-03-01 para a tabela mydataset.table usando o decorador de partições:

mydataset.table$20170301

Os dados recém-recebidos serão associados temporariamente à partição UNPARTITIONED enquanto estiverem no buffer de streaming. Uma consulta pode, portanto, excluir dados do buffer de streaming de uma consulta, filtrando os valores NULL da partição UNPARTITIONED usando uma das pseudocolunas ([_PARTITIONTIME] ou [_PARTITIONDATE], dependendo do tipo de dados preferido).

É possível fazer streaming usando um decorador de partições para partições dos últimos 31 dias e dos próximos 16 dias em relação à data atual, com base na hora UTC atual. Para gravar em partições de datas fora desses limites permitidos, é possível usar jobs de carregamento ou consulta, como descrito em Como anexar e substituir dados da tabela particionada.

Como fazer streaming para tabelas particionadas

É possível fazer streaming de dados em uma tabela particionada em uma coluna DATE ou TIMESTAMP que esteja entre os últimos 12 meses e os próximos 6 meses. Dados fora desse intervalo são rejeitados.

Quando o streaming de dados acontece, os dados entre os últimos 7 dias e os próximos 3 dias são colocados no buffer de streaming e, em seguida, extraídos para as partições correspondentes. Os dados fora dessa janela (mas dentro do intervalo de 12 meses, 6 meses) são colocados no buffer de streaming e, em seguida, extraídos para a partição UNPARTITIONED. Quando há uma quantidade suficiente de dados não particionados, eles são carregados nas partições correspondentes.

Criação automática de tabelas usando tabelas de modelo

Um padrão de uso comum para transmitir dados por streaming no BigQuery é dividir uma tabela lógica em muitas tabelas menores para criar conjuntos menores de dados (por exemplo, por código de usuário). Para criar conjuntos de dados menores por data, use tabelas particionadas. Para criar tabelas menores e não baseadas em data, use as tabelas de modelos e o BigQuery criará as tabelas para você.

Para usar uma tabela de modelo por meio da API do BigQuery, adicione um parâmetro templateSuffix à solicitação insertAll. Para a ferramenta de linha de comando bq, adicione a sinalização template_suffix ao comando insert. Se o BigQuery detectar um parâmetro templateSuffix ou a sinalização template_suffix, ele tratará a tabela visada como um modelo base e criará uma tabela nova que compartilha o mesmo esquema da tabela visada e tem um nome que inclui o sufixo especificado:

<targeted_table_name> + <templateSuffix>

Ao usar uma tabela de modelo, você evita a sobrecarga de criar cada tabela individualmente e especificar o esquema para cada tabela. Você só precisa criar um modelo único e fornecer sufixos diferentes para que o BigQuery possa criar as tabelas novas para você. O BigQuery coloca as tabelas no mesmo projeto e conjunto de dados. Os modelos também facilitam a atualização do esquema, porque você só precisa atualizar a tabela de modelos.

As tabelas criadas por meio de tabelas de modelo geralmente ficam disponíveis em alguns segundos. Em raras ocasiões, elas podem levar mais tempo para ficarem disponíveis.

Alteração do esquema de tabelas de modelo

Se você alterar um esquema de tabelas de modelo, todas as tabelas geradas de modo subsequente usarão o esquema atualizado. As tabelas geradas anteriormente não serão afetadas, a menos que a tabela ainda tenha um buffer de streaming.

Se você modificar o esquema de tabelas de modelo de uma maneira compatível com versões anteriores para tabelas que ainda têm um buffer de streaming, o esquema dessas tabelas geradas com streaming ativo também será atualizado. No entanto, se você modificar o esquema de tabelas de modelo de uma maneira não compatível com versões anteriores, todos os dados armazenados em buffer que usam o esquema antigo serão perdidos. Além disso, você não conseguirá transmitir novos dados para as tabelas geradas que usam o esquema antigo, mas agora incompatível.

Depois de alterar um esquema de tabelas de modelo, aguarde até que as alterações se propaguem antes de tentar inserir novos dados ou consultar tabelas geradas. As solicitações para inserir novos campos devem ficar prontas em poucos minutos. As tentativas de consulta dos novos campos podem exigir uma espera mais longa, de até 90 minutos.

Se você quiser alterar o esquema de uma tabela gerada, não o altere até que o streaming por meio da tabela de modelo tenha cessado e a seção de estatísticas de streaming da tabela gerada esteja ausente da resposta tables.get(), o que indica que nenhum dado está armazenado no buffer na tabela.

Detalhes da tabela de modelo

Valor do sufixo do modelo
O valor templateSuffix (ou --template_suffix) só pode conter letras (a-z, A-Z), números (0-9) ou sublinhados (_). O tamanho máximo combinado do nome e do sufixo da tabela é de 1.024 caracteres.
Cota
As mesmas cotas se aplicam a todas as tabelas, sejam elas baseadas em modelos ou criadas manualmente.
Tempo para ficar ativa
A tabela gerada herda o tempo de expiração do conjunto de dados. Tal como acontece com os dados de streaming normais, as tabelas geradas não podem ser copiadas ou exportadas imediatamente.
Desduplicação
A desduplicação só acontece entre referências uniformes a uma tabela de destino. Por exemplo, se você fizer streaming simultaneamente para uma tabela gerada usando tabelas de modelo e um comando insertAll regular, nenhuma desduplicação ocorrerá entre as linhas inseridas por tabelas de modelo e um comando insertAll regular.
Visualizações
A tabela de modelo e as tabelas geradas não devem ser visualizadas.

Casos de uso de exemplo

Geração de registros de eventos de alto volume

Se você tem um aplicativo que coleta uma grande quantidade de dados em tempo real, inserir streaming pode ser uma boa escolha. Geralmente, esses tipos de aplicativos têm os seguintes critérios:

  • Não transacional. Alto volume, linhas continuamente acrescentadas. O aplicativo consegue tolerar uma possibilidade rara de duplicação ou de dados temporariamente indisponíveis.
  • Análise agregada. Em geral, as consultas são realizadas para análise de tendências, em vez da seleção de registros simples ou estreitos.

Um exemplo de geração de registros de eventos de alto volume é o acompanhamento de eventos. Suponha que você tenha um aplicativo para dispositivos móveis que acompanhe eventos. O aplicativo, ou os servidores móveis, podem registrar de maneira independente as interações do usuário ou os erros do sistema e fazer streaming deles para o BigQuery. Analise esses dados para determinar as tendências gerais, como áreas de alta interação ou problemas, e monitorar as condições de erro em tempo real.

Remoção manual de duplicatas

Use o processo manual a seguir para garantir que não existam linhas duplicadas depois do término do streaming.

  1. Adicione o insertId como uma coluna no esquema da tabela e inclua o valor insertId na data para cada linha.
  2. Após o streaming ser interrompido, execute a seguinte consulta para verificar se há duplicatas:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Se o resultado for maior que 1, existem duplicatas.
  3. Para remover as duplicatas, execute a seguinte consulta. Você deve especificar uma tabela de destino, permitir resultados grandes e desativar o achatamento de resultados.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Observações sobre a consulta de remoção de duplicatas:

  • A estratégia mais segura para a consulta de remoção de duplicatas é direcionar para uma nova tabela. Outra opção é direcionar para a tabela de origem com a disposição de gravação WRITE_TRUNCATE.
  • A consulta de remoção de cópias adiciona uma coluna row_number, com o valor 1, ao final do esquema da tabela. A consulta usa uma instrução SELECT * EXCEPT do SQL padrão para excluir a coluna row_number da tabela de destino. O prefixo #standardSQL ativa o SQL padrão dessa consulta. Outra opção é selecionar por nomes de coluna específicos para omitir essa coluna.
  • Para consultar dados ativos com duplicatas removidas, também é possível criar uma visualização sobre a tabela usando a consulta de remoção de duplicatas. Lembre-se de que os custos de consulta com a visualização serão calculados com base nas colunas selecionadas. Isso pode resultar em grandes tamanhos verificados de bytes.

Solução de problemas com inserções de streaming

Para informações sobre como resolver problemas durante inserções de streaming, consulte Resolução de problemas com inserções de streaming na página "Resolução de problemas de erros".

Voltar ao início

Exemplos de inserções de streaming

C#

Antes de testar esta amostra, siga as instruções de configuração do C# no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API do BigQuery para C#.

using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API do BigQuery para Go.

// To run this sample, you will need to create (or reuse) a context and
// an instance of the bigquery client.  For example:
// import "cloud.google.com/go/bigquery"
// ctx := context.Background()
// client, err := bigquery.NewClient(ctx, "your-project-id")
u := client.Dataset(datasetID).Table(tableID).Uploader()
items := []*Item{
	// Item implements the ValueSaver interface.
	{Name: "Phred Phlyntstone", Age: 32},
	{Name: "Wylma Phlyntstone", Age: 29},
}
if err := u.Put(ctx, items); err != nil {
	return err
}

Java

Antes de testar esta amostra, siga as instruções de configuração do Java no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Java.

TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response =
    bigquery.insertAll(
        InsertAllRequest.newBuilder(tableId)
            .addRow("rowId", rowContent)
            // More rows can be added in the same RPC by invoking .addRow() on the builder
            .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Node.js

Antes de testar esta amostra, siga as instruções de configuração do Node.js em Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API do BigQuery para Node.js.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();

PHP

Antes de testar esta amostra, siga as instruções de configuração do PHP em Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery PHP.

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

Antes de testar esta amostra, siga as instruções de configuração do Python em Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Python (em inglês).

# TODO(developer): Uncomment the lines below and replace with your values.
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'  # replace with your dataset ID
# For this sample, the table must already exist and have a defined schema
# table_id = 'my_table'  # replace with your table ID
# table_ref = client.dataset(dataset_id).table(table_id)
# table = client.get_table(table_ref)  # API request

rows_to_insert = [
    (u'Phred Phlyntstone', 32),
    (u'Wylma Phlyntstone', 29),
]

errors = client.insert_rows(table, rows_to_insert)  # API request

assert errors == []

Ruby

Antes de testar esta amostra, siga as instruções de configuração do Ruby em Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Ruby (em inglês).

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Volte ao início

Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…

Precisa de ajuda? Acesse nossa página de suporte.