Este documento descreve como escrever dados do Dataflow para o BigQuery.
Vista geral
Para a maioria dos exemplos de utilização, considere usar a opção Managed I/O para escrever no BigQuery. A E/S gerida oferece funcionalidades como atualizações automáticas e uma API de configuração consistente. Quando escreve no BigQuery, a E/S gerida escolhe automaticamente o melhor método de escrita para tarefas em lote ou de streaming.
Se precisar de uma otimização do desempenho mais avançada, considere usar o conector
BigQueryIO
. Para mais informações, consulte a secção
Use o conector BigQueryIO
neste documento.
Desempenho
A tabela seguinte mostra as métricas de desempenho para várias cargas de trabalho. Estas cargas de trabalho foram executadas num e2-standard2
worker, usando o SDK do Apache Beam 2.49.0 para Java. Não usaram o Runner v2.
100 M de registos | 1 kB | 1 coluna | Débito (bytes) | Tráfego transmitido (elementos) |
---|---|---|
Escrita de armazenamento | 55 MBps | 54 000 elementos por segundo |
Carregamento de Avro | 78 MBps | 77 000 elementos por segundo |
Json Load | 54 MBps | 53 000 elementos por segundo |
Estas métricas baseiam-se em pipelines de processamento em lote simples. Destinam-se a comparar o desempenho entre conectores de E/S e não são necessariamente representativos de pipelines do mundo real. O desempenho do pipeline do Dataflow é complexo e é uma função do tipo de VM, dos dados que estão a ser processados, do desempenho das origens e dos destinos externos, e do código do utilizador. As métricas baseiam-se na execução do SDK Java e não são representativas das características de desempenho de outros SDKs de idiomas. Para mais informações, consulte o artigo Desempenho do Beam IO.
Use o conetor BigQueryIO
O conetor BigQuery I/O suporta os seguintes métodos para escrever no BigQuery:
STORAGE_WRITE_API
. Neste modo, o conector faz gravações diretas no armazenamento do BigQuery, usando a API BigQuery Storage Write. A API Storage Write combina o carregamento de streaming e o carregamento em lote numa única API de alto desempenho. Este modo garante a semântica exatamente uma vez.STORAGE_API_AT_LEAST_ONCE
. Este modo também usa a API Storage Write, mas oferece semântica pelo menos uma vez. Este modo resulta numa latência mais baixa para a maioria dos pipelines. No entanto, é possível fazer escritas duplicadas.FILE_LOADS
. Neste modo, o conetor escreve os dados de entrada em ficheiros de preparação no Cloud Storage. Em seguida, executa uma tarefa de carregamento do BigQuery para carregar os dados para o BigQuery. O modo é a predefinição para os conjuntos de dados delimitadosPCollections
, que são mais comuns em pipelines de processamento em lote.STREAMING_INSERTS
. Neste modo, o conetor usa a API Legacy Streaming. Este modo é o predefinido paraPCollections
ilimitados, mas não é recomendado para novos projetos.
Ao escolher um método de escrita, considere os seguintes pontos:
- Para tarefas de streaming, considere usar
STORAGE_WRITE_API
ouSTORAGE_API_AT_LEAST_ONCE
, porque estes modos escrevem diretamente no armazenamento do BigQuery, sem usar ficheiros de preparação intermédios. - Se executar o pipeline usando o modo de streaming at-least-once, defina o modo de escrita como
STORAGE_API_AT_LEAST_ONCE
. Esta definição é mais eficiente e corresponde à semântica do modo de streaming, pelo menos, uma vez. - O carregamento de ficheiros e a API Storage Write têm quotas e limites diferentes.
- As tarefas de carregamento usam o conjunto de slots do BigQuery partilhado ou slots reservados. Para usar slots reservados, execute a tarefa de carregamento num projeto com uma atribuição de reserva do tipo
PIPELINE
. As tarefas de carregamento são gratuitas se usar o conjunto de slots do BigQuery partilhado. No entanto, o BigQuery não faz garantias sobre a capacidade disponível do conjunto partilhado. Para mais informações, consulte o artigo Introdução às reservas.
Paralelismo
Para
FILE_LOADS
eSTORAGE_WRITE_API
em pipelines de streaming, o conetor fragmenta os dados num número de ficheiros ou streams. Em geral, recomendamos que chamewithAutoSharding
para ativar a divisão automática.Para
FILE_LOADS
em pipelines em lote, o conetor escreve dados em ficheiros particionados, que são, em seguida, carregados em paralelo para o BigQuery.Para
STORAGE_WRITE_API
em pipelines de processamento em lote, cada trabalhador cria um ou mais streams para escrever no BigQuery, determinado pelo número total de fragmentos.Para
STORAGE_API_AT_LEAST_ONCE
, existe uma única stream de gravação predefinida. Vários trabalhadores são anexados a esta stream.
Práticas recomendadas
A API Storage Write tem limites de quota. O conector processa estes limites para a maioria dos pipelines. No entanto, alguns cenários podem esgotar as streams da API Storage Write disponíveis. Por exemplo, este problema pode ocorrer num pipeline que usa a divisão automática e a escala automática com um grande número de destinos, especialmente em tarefas de longa duração com cargas de trabalho altamente variáveis. Se este problema ocorrer, pondere usar
STORAGE_WRITE_API_AT_LEAST_ONCE
, o que evita o problema.Use as métricas da Google Cloud Platform para monitorizar a utilização da quota da API Storage Write.
Quando usa carregamentos de ficheiros, o Avro tem normalmente um desempenho superior ao JSON. Para usar o Avro, ligue para
withAvroFormatFunction
.Por predefinição, as tarefas de carregamento são executadas no mesmo projeto que a tarefa do Dataflow. Para especificar um projeto diferente, chame
withLoadJobProjectId
.Quando usar o SDK Java, considere criar uma classe que represente o esquema da tabela do BigQuery. Em seguida, chame
useBeamSchema
no seu pipeline para converter automaticamente entre os tiposRow
do Apache Beam eTableRow
do BigQuery. Para ver um exemplo de uma classe de esquema, consulteExampleModel.java
.Se carregar tabelas com esquemas complexos que contenham milhares de campos, considere chamar
withMaxBytesPerPartition
para definir um tamanho máximo mais pequeno para cada tarefa de carregamento.Por predefinição, o
BigQueryIO
usa definições da API Storage Write que são razoáveis para a maioria dos pipelines. No entanto, se tiver problemas de desempenho, pode definir opções de pipeline para ajustar estas definições. Para mais informações, consulte o artigo Ajuste a API Storage Write na documentação do Apache Beam.
Pipelines de streaming
As seguintes recomendações aplicam-se a pipelines de streaming.
Para pipelines de streaming, recomendamos que use a API Storage Write (
STORAGE_WRITE_API
ouSTORAGE_API_AT_LEAST_ONCE
).Um pipeline de streaming pode usar carregamentos de ficheiros, mas esta abordagem tem desvantagens:
- Requer janelas para escrever os ficheiros. Não pode usar a janela global.
- O BigQuery carrega ficheiros com base no melhor esforço quando usa o conjunto de slots partilhado. Pode haver um atraso significativo entre o momento em que um registo é escrito e o momento em que fica disponível no BigQuery.
- Se uma tarefa de carregamento falhar, por exemplo, devido a dados incorretos ou a uma incompatibilidade de esquema, toda a pipeline falha.
Considere usar
STORAGE_WRITE_API_AT_LEAST_ONCE
sempre que possível. Pode resultar na escrita de registos duplicados no BigQuery, mas é menos dispendioso e mais escalável do queSTORAGE_WRITE_API
.Em geral, evite usar
STREAMING_INSERTS
. As inserções de streaming são mais caras do que a API Storage Write e não têm um desempenho tão bom.A divisão de dados pode melhorar o desempenho nos pipelines de streaming. Para a maioria dos pipelines, a divisão automática é um bom ponto de partida. No entanto, pode ajustar a divisão em partições da seguinte forma:
- Para
STORAGE_WRITE_API
, chamewithNumStorageWriteApiStreams
para definir o número de streams de escrita. - Para
FILE_LOADS
, ligue parawithNumFileShards
para definir o número de fragmentos de ficheiros.
- Para
Se usar inserções de streaming, recomendamos que defina
retryTransientErrors
como a política de repetição.
Pipelines em lote
As seguintes recomendações aplicam-se a pipelines em lote.
Para a maioria dos pipelines de processamento em lote grandes, recomendamos que experimente primeiro a opção
FILE_LOADS
. Um pipeline de lotes pode usarSTORAGE_WRITE_API
, mas é provável que exceda os limites de quota em grande escala (mais de 1000 vCPUs) ou se estiverem a ser executados pipelines concorrentes. O Apache Beam não limita o número máximo de streams de escrita para tarefas em lote, pelo que a tarefa acaba por atingir os limites da API BigQuery Storage.STORAGE_WRITE_API
Quando usa o
FILE_LOADS
, pode esgotar o conjunto de slots do BigQuery partilhado ou o seu conjunto de slots reservados. Se encontrar este tipo de falha, experimente as seguintes abordagens:- Reduza o número máximo de trabalhadores ou o tamanho dos trabalhadores para a tarefa.
- Compre mais horários reservados.
- Considere usar
STORAGE_WRITE_API
.
Os pipelines pequenos a médios (<1000 vCPUs) podem beneficiar da utilização de
STORAGE_WRITE_API
. Para estas tarefas mais pequenas, pondere usar oSTORAGE_WRITE_API
se quiser uma fila de mensagens rejeitadas ou quando o conjunto de ranuras partilhadasFILE_LOADS
não for suficiente.Se puder tolerar dados duplicados, considere usar o
STORAGE_WRITE_API_AT_LEAST_ONCE
. Este modo pode resultar na gravação de registos duplicados no BigQuery, mas pode ser menos dispendioso do que a opçãoSTORAGE_WRITE_API
.Os diferentes modos de escrita podem ter um desempenho diferente com base nas características do seu pipeline. Faça experiências para encontrar o melhor modo de escrita para a sua carga de trabalho.
Resolva erros ao nível da linha
Esta secção descreve como processar erros que podem ocorrer ao nível da linha, por exemplo, devido a dados de entrada mal formados ou incompatibilidades de esquemas.
Para a API Storage Write, todas as linhas que não podem ser escritas são colocadas
numa tabela PCollection
separada. Para obter esta coleção, chame getFailedStorageApiInserts
no objeto WriteResult
. Para ver um exemplo desta abordagem, consulte o artigo
Fazer stream de dados para o BigQuery.
É uma boa prática
enviar os erros para uma fila ou uma tabela de mensagens não entregues para processamento posterior. Para mais
informações sobre este padrão, consulte o
BigQueryIO
padrão de mensagens não entregues.
Para o FILE_LOADS
, se ocorrer um erro ao carregar os dados, a tarefa de carregamento falha e o pipeline gera uma exceção de tempo de execução. Pode ver o erro nos registos do Dataflow ou consultar o histórico de tarefas do BigQuery.
O conector de E/S não devolve informações sobre linhas individuais com falhas.
Para mais informações sobre a resolução de problemas de erros, consulte o artigo Erros do conetor do BigQuery.
Exemplos
Os exemplos seguintes mostram como usar o Dataflow para escrever no
BigQuery. Estes exemplos usam o conetor BigQueryIO
.
Escrever numa tabela existente
O exemplo seguinte cria um pipeline em lote que escreve um PCollection<MyData>
no BigQuery, onde MyData
é um tipo de dados personalizado.
O método BigQueryIO.write()
devolve um tipo BigQueryIO.Write<T>
, que é usado para configurar a operação de escrita. Para mais informações, consulte o artigo
Escrever numa tabela
na documentação do Apache Beam. Este exemplo de código escreve numa tabela existente (CREATE_NEVER
) e anexa as novas linhas à tabela (WRITE_APPEND
).
Java
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Escrever numa tabela nova ou existente
O exemplo seguinte cria uma nova tabela se a tabela de destino não existir, definindo a disposição de criação como CREATE_IF_NEEDED
. Quando usa esta opção, tem de fornecer um esquema da tabela. O conetor usa este esquema se criar uma nova tabela.
Java
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Transmita dados para o BigQuery
O exemplo seguinte mostra como fazer streaming de dados usando a semântica exatamente uma vez, através da
definição do modo de escrita como STORAGE_WRITE_API
Nem todos os pipelines de streaming requerem semântica exatamente uma vez. Por exemplo, pode remover manualmente duplicados da tabela de destino. Se a possibilidade de registos duplicados for aceitável para o seu cenário, considere usar a semântica pelo menos uma vez definindo o método de escrita como STORAGE_API_AT_LEAST_ONCE
. Geralmente, este método é mais eficiente e resulta numa latência inferior para a maioria dos pipelines.
Java
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
O que se segue?
- Saiba mais sobre a E/S gerida.
- Saiba mais sobre as práticas recomendadas do Pub/Sub para o BigQuery.