Como fazer streaming de dados no BigQuery

Em vez de usar um job para carregar dados no BigQuery, é possível transmiti-los por streaming, um registro por vez, usando o método tabledata.insertAll. Essa abordagem permite a consulta de dados sem o atraso de execução de um job de carga.

Neste documento, você verá discussões sobre várias compensações importantes a considerar antes de escolher uma abordagem, incluindo cotas de streaming, disponibilidade de dados e consistência de dados.

Antes de começar

  1. Verifique se você tem acesso de gravação ao conjunto de dados que contém a tabela de destino. A tabela precisa existir antes de você começar a gravar dados nela, a menos que você esteja usando tabelas de modelo. Para mais informações sobre tabelas de modelo, consulte Criação automática de tabelas usando tabelas de modelo.

  2. Verifique a política de cotas para dados de streaming.

  3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud. Saiba como confirmar se a cobrança está ativada para o seu projeto.

O streaming não está disponível pelo nível gratuito. Se tentar usar o streaming sem ativar o faturamento, este erro será exibido: BigQuery: Streaming insert is not allowed in the free tier.

Verificação da disponibilidade de dados

Os dados de streaming estão disponíveis para análise em tempo real alguns segundos depois da primeira inserção de streaming em uma tabela. Em raras circunstâncias, como em uma interrupção, os dados no buffer de streaming podem estar temporariamente indisponíveis. Quando os dados não estão disponíveis, as consultas continuam sendo executadas com êxito, mas ignoram alguns dos dados que ainda estão no buffer de streaming. Essas consultas conterão um aviso no campo errors de bigquery.jobs.getQueryResults, na resposta a bigquery.jobs.query ou no campo status.errors de bigquery.jobs.get.

Os dados podem levar até 90 minutos para ficarem disponíveis para operações de cópia. Além disso, ao fazer streaming para uma tabela particionada, os dados no buffer de streaming têm um valor NULL para a pseudocoluna _PARTITIONTIME. Para ver se os dados estão disponíveis para cópia, verifique se, na resposta tables.get, existe uma seção denominada streamingBuffer. Se estiver ausente, os dados estarão disponíveis para cópia com um valor não nulo para a pseudocoluna _PARTITIONTIME. Além disso, o campo streamingBuffer.oldestEntryTime pode ser aproveitado para identificar a idade dos registros no buffer de streaming.

Melhor eliminação de duplicação

Quando você fornece insertId para uma linha inserida, o BigQuery usa esse ID para aceitar a melhor eliminação de duplicação por até um minuto. Isso significa que, se você tentar transmitir a mesma linha com o mesmo insertId mais de uma vez nesse período na mesma tabela, o BigQuery pode desativar a duplicação das várias ocorrências dessa linha, mantendo apenas uma dessas ocorrências.

Isso geralmente se destina a cenários de nova tentativa em um sistema distribuído em que não é possível determinar o estado de uma inserção de streaming sob determinadas condições de erro, como erros de rede entre o sistema e o BigQuery ou erros internos em BigQuery. Se você tentar novamente uma inserção, use o mesmo insertId para o mesmo conjunto de linhas e o BigQuery tentará eliminar a duplicação dos seus dados. Para mais informações, consulte solução de problemas com inserções de streaming.

A eliminação de duplicação feita pelo BigQuery é o melhor esforço possível e não pode ser considerado um mecanismo para garantir a ausência de cópias nos dados. Além disso, o BigQuery pode prejudicar a qualidade da melhor eliminação de duplicação a qualquer momento para garantir maior confiabilidade e disponibilidade dos dados.

Se os dados tiverem requisitos rígidos de eliminação de duplicação, o Google Cloud Datastore é um serviço alternativo compatível com transações.

Como desativar a eliminação de duplicação por melhor esforço

É possível desativar a eliminação de duplicação por melhor esforço ao não preencher o campo insertId de cada linha inserida. Não preencher insertId resulta em cotas de ingestão de streaming mais altas em determinadas regiões. Essa é a maneira recomendada de aumentar os limites da cota de ingestão de streaming. Para mais informações, consulte Cotas e limites.

Apache Beam e Dataflow

Para desativar a eliminação de duplicação por melhor esforço ao usar o conector de E/S do BigQuery (em inglês) do Apache Beam para Java, use o método ignoreInsertIds().

Transmissão de dados entre locais de dados

É possível transmitir dados para conjuntos de dados nos EUA e na UE. Os dados podem fluir por meio de máquinas fora do local do conjunto de dados enquanto o BigQuery processa a solicitação insertAll. Se você estiver transmitindo dados a partir de um local fora do local do conjunto de dados, poderá ter latência e taxas de erro mais elevadas.

Como fazer streaming em tabelas particionadas por tempo de processamento

É possível fazer o streaming de linhas individuais em uma tabela particionada usando solicitações insertAll. Por padrão, a partição de destino dos dados inseridos é inferida a partir da data atual com base na hora UTC.

Se estiver fazendo streaming de dados para uma tabela particionada por tempo de ingestão, é possível substituir a inferência de data fornecendo um decorador de partição como parte da solicitação insertAll. Por exemplo, é possível fazer o streaming para a partição correspondente a 01-03-2017 para a tabela mydataset.table por meio do decorador de partições:

mydataset.table$20170301

Os dados recém-recebidos serão associados temporariamente à partição UNPARTITIONED enquanto estiverem no buffer de streaming. Uma consulta pode, portanto, excluir dados do buffer de streaming de uma consulta, filtrando os valores NULL da partição UNPARTITIONED usando uma das pseudocolunas ([_PARTITIONTIME] ou [_PARTITIONDATE], dependendo do tipo de dados preferido).

É possível fazer streaming usando um decorador de partições para partições dos últimos 31 dias e dos próximos 16 dias em relação à data atual, com base na hora UTC atual. Para gravar em partições de datas fora desses limites permitidos, é possível usar jobs de carregamento ou consulta, como descrito em Como anexar e substituir dados de tabelas particionadas.

Como fazer streaming para tabelas particionadas

É possível fazer streaming de dados em uma tabela particionada em uma coluna DATE ou TIMESTAMP que esteja entre os últimos cinco anos e o próximo ano. Dados fora desse intervalo são rejeitados.

Quando transmitidos, os dados são extraídos primeiro para a partição UNPARTITIONED. Quando há dados não particionados suficientes, eles são particionados nas partições correspondentes.

Criação automática de tabelas usando tabelas de modelo

As tabelas de modelo fornecem um mecanismo para dividir uma tabela lógica em muitas tabelas menores para criar conjuntos menores de dados (por exemplo, por ID de usuário). As tabelas de modelo têm várias limitações, conforme descrito abaixo. Já as tabelas particionadas e em cluster são as maneiras recomendadas de atingir esse comportamento.

Para usar uma tabela de modelo por meio da API BigQuery, adicione um parâmetro templateSuffix à sua solicitação insertAll. Na ferramenta de linha de comando bq, adicione a sinalização template_suffix ao comando insert. Se o BigQuery detectar um parâmetro templateSuffix ou a sinalização template_suffix, ele tratará a tabela visada como um modelo base e criará uma tabela nova que compartilha o mesmo esquema da tabela visada e tem um nome que inclui o sufixo especificado:

<targeted_table_name> + <templateSuffix>

Ao usar uma tabela de modelo, você evita a sobrecarga de criar cada tabela individualmente e especificar o esquema para cada tabela. Você só precisa criar um modelo único e fornecer sufixos diferentes para que o BigQuery possa criar as tabelas novas para você. O BigQuery coloca as tabelas no mesmo projeto e conjunto de dados.

As tabelas criadas por meio de tabelas de modelo geralmente ficam disponíveis em alguns segundos. Em raras ocasiões, elas podem levar mais tempo para ficarem disponíveis.

Alteração do esquema de tabelas de modelo

Se você alterar um esquema de tabelas de modelo, todas as tabelas geradas de modo subsequente usarão o esquema atualizado. As tabelas geradas anteriormente não serão afetadas, a menos que a tabela ainda tenha um buffer de streaming.

Se você modificar o esquema de tabelas de modelo de uma maneira compatível com versões anteriores para tabelas que ainda têm um buffer de streaming, o esquema dessas tabelas geradas com streaming ativo também será atualizado. No entanto, se você modificar o esquema de tabelas de modelo de uma maneira não compatível com versões anteriores, todos os dados armazenados em buffer que usam o esquema antigo serão perdidos. Além disso, você não conseguirá transmitir novos dados para as tabelas geradas que usam o esquema antigo, mas agora incompatível.

Depois de alterar um esquema de tabelas de modelo, aguarde até que as alterações se propaguem antes de tentar inserir novos dados ou consultar tabelas geradas. As solicitações para inserir novos campos devem ficar prontas em poucos minutos. As tentativas de consulta dos novos campos podem exigir uma espera mais longa, de até 90 minutos.

Caso seja necessário alterar o esquema de uma tabela gerada, não altere o esquema até que o streaming da tabela modelo seja interrompido e a seção estatísticas de streaming da tabela esteja ausente da resposta tables.get(), indicando que nenhum dado está armazenado em buffer na tabela.

As tabelas particionadas e em cluster não apresentam as limitações acima e são o mecanismo recomendado.

Detalhes da tabela de modelo

Valor do sufixo do modelo
O valor templateSuffix (ou --template_suffix) precisa conter apenas letras (a-z, A-Z), números (0-9) ou sublinhados (_). O tamanho máximo combinado do nome e do sufixo da tabela é de 1.024 caracteres.
Cota
As tabelas de modelo estão sujeitas a limitações de cota de streaming semelhantes às de outras tabelas. Além disso, ao fazer streaming para tabelas de modelo, desativar a eliminação de duplicação por melhor esforço não fornece cotas maiores.
Tempo para ficar ativa
A tabela gerada herda o prazo de validade do conjunto de dados. Como acontece com os dados de streaming normais, as tabelas geradas não podem ser copiadas ou exportadas imediatamente.
Desduplicação
A desduplicação só acontece entre referências uniformes a uma tabela de destino. Por exemplo, ao fazer streaming simultâneo para uma tabela gerada usando tabelas de modelo e um comando insertAll normal, nenhuma duplicação será eliminada entre linhas inseridas por tabelas de modelo e um comando insertAll normal.
Visualizações
A tabela de modelo e as tabelas geradas não devem ser visualizadas.

Casos de uso de exemplo

Geração de registros de eventos de alto volume

Se você tem um aplicativo que coleta uma grande quantidade de dados em tempo real, inserir streaming pode ser uma boa escolha. Geralmente, esses tipos de aplicativos têm os seguintes critérios:

  • Não transacional. Alto volume, linhas continuamente acrescentadas. O aplicativo consegue tolerar uma possibilidade rara de duplicação ou de dados temporariamente indisponíveis.
  • Análise agregada. Em geral, as consultas são realizadas para análise de tendências, em vez da seleção de registros simples ou estreitos.

Um exemplo de geração de registros de eventos de alto volume é o acompanhamento de eventos. Suponha que você tenha um aplicativo para dispositivos móveis que acompanhe eventos. O aplicativo, ou os servidores móveis, podem registrar de maneira independente as interações do usuário ou os erros do sistema e fazer streaming deles para o BigQuery. Analise esses dados para determinar as tendências gerais, como áreas de alta interação ou problemas, e monitorar as condições de erro em tempo real.

Remoção manual de duplicatas

Use o processo manual a seguir para garantir que não existam linhas duplicadas depois do término do streaming.

  1. Adicione insertId como uma coluna no esquema da tabela e inclua o valor insertId nos dados de cada linha.
  2. Após o streaming ser interrompido, execute a seguinte consulta para verificar se há duplicatas:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Se o resultado for maior que 1, existem duplicatas.
  3. Para remover as duplicatas, execute a seguinte consulta. Você deve especificar uma tabela de destino, permitir resultados grandes e desativar o achatamento de resultados.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Observações sobre a consulta de remoção de duplicatas:

  • A estratégia mais segura para a consulta de remoção de duplicatas é direcionar para uma nova tabela. Outra opção é direcionar para a tabela de origem com a disposição de gravação WRITE_TRUNCATE.
  • A consulta de remoção de duplicatas adiciona uma coluna row_number, com o valor 1, ao final do esquema da tabela. A consulta usa uma instrução SELECT * EXCEPT do SQL padrão para excluir a coluna row_number da tabela de destino. O prefixo #standardSQL ativa o SQL padrão para essa consulta. Outra opção é selecionar por nomes de coluna específicos para omitir essa coluna.
  • Para consultar dados ativos com duplicatas removidas, também é possível criar uma visualização sobre a tabela usando a consulta de remoção de duplicatas. Lembre-se de que os custos de consulta com a visualização serão calculados com base nas colunas selecionadas. Isso pode resultar em grandes tamanhos verificados de bytes.

Solução de problemas de inserções de streaming

Para informações sobre como resolver problemas durante inserções de streaming, consulte Resolução de problemas com inserções de streaming na página "Resolução de problemas de erros".

Voltar ao início

Exemplos de inserções de streaming

C#

Antes de testar essa amostra, siga as instruções de configuração para C# no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery C# (em inglês).


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Antes de testar essa amostra, siga as instruções de configuração para Go no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Go (em inglês).

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, "", nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Antes de testar essa amostra, siga as instruções de configuração para Java no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Java.

Ver no GitHub (em inglês) Feedback
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void runTableInsertRows() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");

    tableInsertRows(datasetName, tableName, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can also supply optional unique row keys to support de-duplication
                  // scenarios.
                  .addRow(rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Antes de testar essa amostra, siga as instruções de configuração para Node.js no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Node.js (em inglês).

Ver no GitHub (em inglês) Feedback
// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();

PHP

Antes de testar esta amostra, siga as instruções de configuração para PHP no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery PHP (em inglês).

Ver no GitHub (em inglês) Feedback
use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

Antes de testar essa amostra, siga as instruções de configuração para Python no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Python (em inglês).

Ver no GitHub (em inglês) Feedback
# TODO(developer): Import the client library.
# from google.cloud import bigquery

# TODO(developer): Construct a BigQuery client object.
# client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the model to fetch.
# table_id = "your-project.your_dataset.your_table"

table = client.get_table(table_id)  # Make an API request.
rows_to_insert = [(u"Phred Phlyntstone", 32), (u"Wylma Phlyntstone", 29)]

errors = client.insert_rows(table, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")

Ruby

Antes de testar esta amostra, siga as instruções de configuração para Ruby no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Ruby (em inglês).

Ver no GitHub (em inglês) Feedback
require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Não é necessário preencher o campo insertId ao inserir linhas. Os exemplos a seguir mostram como evitar o envio de insertId para cada linha durante o streaming.

Java

Antes de testar essa amostra, siga as instruções de configuração para Java no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Java.

Ver no GitHub (em inglês) Feedback
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void runTableInsertRowsWithoutRowIds() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create rows to insert
    Map<String, Object> rowContent1 = new HashMap<>();
    rowContent1.put("stringField", "Phred Phlyntstone");
    rowContent1.put("numericField", 32);
    Map<String, Object> rowContent2 = new HashMap<>();
    rowContent2.put("stringField", "Wylma Phlyntstone");
    rowContent2.put("numericField", 29);
    List<InsertAllRequest.RowToInsert> rowContent = new ArrayList<>();
    // insertId is null if not specified
    rowContent.add(InsertAllRequest.RowToInsert.of(rowContent1));
    rowContent.add(InsertAllRequest.RowToInsert.of(rowContent2));
    tableInsertRowsWithoutRowIds(datasetName, tableName, rowContent);
  }

  public static void tableInsertRowsWithoutRowIds(
      String datasetName, String tableName, Iterable<InsertAllRequest.RowToInsert> rows) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(InsertAllRequest.newBuilder(tableId).setRows(rows).build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

Antes de testar essa amostra, siga as instruções de configuração para Python no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery Python (em inglês).


from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the model to fetch.
# table_id = "your-project.your_dataset.your_table"

table = client.get_table(table_id)  # Make an API request.
rows_to_insert = [(u"Phred Phlyntstone", 32), (u"Wylma Phlyntstone", 29)]

errors = client.insert_rows(
    table, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")

Voltar ao início