Neste documento, descrevemos como gravar dados do Dataflow para o BigQuery usando o conector de E/S do BigQuery do Apache Beam.
O conector de E/S do BigQuery está disponível no SDK do Apache Beam. Recomendamos usar a versão mais recente do SDK. Para mais informações, consulte SDKs do Apache Beam 2.x.
O suporte a várias linguagens para Python também está disponível.
Informações gerais
O conector de E/S do BigQuery aceita os seguintes métodos de gravação no BigQuery:
STORAGE_WRITE_API
. Neste modo, o conector realiza gravações diretas no armazenamento do BigQuery usando a API BigQuery Storage Write. A API Storage Write combina ingestão de streaming e carregamento em lote em uma única API de alto desempenho. Esse modo garante semântica exatamente uma vez.STORAGE_API_AT_LEAST_ONCE
. Esse modo também usa a API Storage Write, mas fornece semântica de pelo menos uma vez. Esse modo resulta em menor latência para a maioria dos pipelines. No entanto, gravações duplicadas são possíveis.FILE_LOADS
. Nesse modo, o conector grava os dados de entrada em arquivos de preparo no Cloud Storage. Em seguida, ele executa um job de carregamento do BigQuery para carregar os dados no BigQuery. O modo é o padrão paraPCollections
limitado, encontrado com frequência em pipelines de lote.STREAMING_INSERTS
. Neste modo, o conector usa a API de streaming legada. Esse modo é o padrão paraPCollections
ilimitado, mas não é recomendado para novos projetos.
Ao escolher um método de gravação, considere os seguintes pontos:
- Para jobs de streaming, considere usar
STORAGE_WRITE_API
ouSTORAGE_API_AT_LEAST_ONCE
, porque esses modos gravam diretamente no armazenamento do BigQuery, sem usar arquivos de preparo intermediários. - Se você executar o pipeline usando pelo menos um modo de streaming, defina o modo de gravação como
STORAGE_API_AT_LEAST_ONCE
. Essa configuração é mais eficiente e corresponde à semântica do modo de streaming pelo menos uma vez. - Os carregamentos de arquivos e a API Storage Write têm cotas e limites diferentes.
- Os jobs de carregamento usam o pool de slots compartilhado ou os slots reservados do BigQuery. Para usar slots reservados, execute o job de carregamento em um projeto com uma atribuição de reserva do tipo
PIPELINE
. Os jobs de carregamento serão gratuitos se você usar o pool de slots compartilhado do BigQuery. No entanto, o BigQuery não garante a capacidade disponível do pool compartilhado. Para mais informações, consulte Introdução às reservas.
Paralelismo
Para
FILE_LOADS
eSTORAGE_WRITE_API
em pipelines de streaming, o conector fragmenta os dados em vários arquivos ou streams. Em geral, recomendamos chamarwithAutoSharding
para ativar a fragmentação automática.Para
FILE_LOADS
em pipelines de lote, o conector grava dados em arquivos particionados, que são carregados no BigQuery em paralelo.Para
STORAGE_WRITE_API
em pipelines de lote, cada worker cria um ou mais streams para gravar no BigQuery, determinado pelo número total de fragmentos.Para
STORAGE_API_AT_LEAST_ONCE
, há um único fluxo de gravação padrão. Vários workers são anexados a esse stream.
Desempenho
A tabela a seguir mostra as métricas de desempenho para várias
opções de leitura de E/S do BigQuery. As cargas de trabalho foram executadas em um
worker e2-standard2
usando o SDK do Apache Beam 2.49.0 para Java. Eles não
usaram o Runner v2.
100 milhões de registros | 1 KB | 1 coluna | Capacidade de processamento (bytes) | Capacidade de processamento (elementos) |
---|---|---|
Gravação de armazenamento | 55 MBps | 54.000 elementos por segundo |
Carregamento do Avro | 78 MBps | 77.000 elementos por segundo |
Carregamento do JSON | 54 MBps | 53.000 elementos por segundo |
Essas métricas são baseadas em pipelines de lote simples. Elas servem para comparar o desempenho entre conectores de E/S e não representam necessariamente pipelines reais. O desempenho do pipeline do Dataflow é complexo e depende do tipo de VM, dos dados processados, do desempenho de origens e coletores externos e do código do usuário. As métricas se baseiam na execução do SDK do Java e não representam as características de desempenho de outros SDKs da linguagem. Para mais informações, confira Desempenho do E/S do Beam.
Práticas recomendadas
Esta seção descreve as práticas recomendadas para gravar no BigQuery a partir do Dataflow.
Considerações gerais
A API Storage Write tem limites de cota. O conector lida com esses limites na maioria dos pipelines. No entanto, alguns cenários podem esgotar os fluxos disponíveis da API Storage Write. Por exemplo, esse problema pode acontecer em um pipeline que usa fragmentação automática e escalonamento automático com um grande número de destinos, especialmente em jobs de longa duração com cargas de trabalho altamente variáveis. Se esse problema ocorrer, use
STORAGE_WRITE_API_AT_LEAST_ONCE
para evitá-lo.Use as métricas do Google Cloud para monitorar o uso da cota da API Storage Write.
Ao usar o carregamentos de arquivos, o Avro costuma ter um desempenho melhor do que o JSON. Para usar o Avro, chame
withAvroFormatFunction
.Por padrão, os jobs de carregamento são executados no mesmo projeto que o job do Dataflow. Para especificar um projeto diferente, chame
withLoadJobProjectId
.Ao usar o SDK do Java, crie uma classe que represente o esquema da tabela do BigQuery. Em seguida, chame
useBeamSchema
no pipeline para converter automaticamente entre os tiposRow
do Apache Beam eTableRow
do BigQuery. Para conferir um exemplo de classe de esquema, confiraExampleModel.java
.Se você carregar tabelas com esquemas complexos com milhares de campos, considere chamar
withMaxBytesPerPartition
para definir um tamanho máximo menor para cada job de carregamento.
Pipelines de streaming
As recomendações a seguir se aplicam a pipelines de streaming.
Para pipelines de streaming, recomendamos o uso da API Storage Write (
STORAGE_WRITE_API
ouSTORAGE_API_AT_LEAST_ONCE
).Um pipeline de streaming pode usar carregamentos de arquivos, mas essa abordagem tem desvantagens:
- É necessário janelamento para gravar os arquivos. Não é possível usar a janela global.
- O BigQuery carrega arquivos com base no melhor esforço ao usar o pool de slots compartilhado. Pode haver um atraso significativo entre o momento em que um registro é gravado e quando ele fica disponível no BigQuery.
- Se um job de carregamento falhar, por exemplo, devido a dados incorretos ou incompatibilidade de esquema, todo o pipeline falha.
Considere usar
STORAGE_WRITE_API_AT_LEAST_ONCE
sempre que possível. Isso pode resultar na gravação de registros duplicados no BigQuery, mas é mais barato e mais escalonável queSTORAGE_WRITE_API
.Em geral, evite usar
STREAMING_INSERTS
. As inserções por streaming são mais caras do que a API Storage Write e não têm um desempenho tão bom.A fragmentação de dados pode melhorar o desempenho em pipelines de streaming. Para a maioria dos pipelines, a fragmentação automática é um bom ponto de partida. No entanto, é possível ajustar a fragmentação da seguinte maneira:
- Para
STORAGE_WRITE_API
, chamewithNumStorageWriteApiStreams
para definir o número de streams de gravação. - Para
FILE_LOADS
, chamewithNumFileShards
para definir o número de fragmentos de arquivo.
- Para
Se você usa inserções de streaming, recomendamos definir
retryTransientErrors
como a política de nova tentativa.
Pipelines em lote
As recomendações a seguir se aplicam a pipelines em lote.
Para a maioria dos pipelines em lote grandes, recomendamos primeiro testar
FILE_LOADS
. Um pipeline em lote pode usarSTORAGE_WRITE_API
, mas é provável que exceda os limites de cota em grande escala (mais de 1.000 vCPUs) ou se pipelines simultâneos estiverem em execução. O Apache Beam não limita o número máximo de fluxos de gravação para jobsSTORAGE_WRITE_API
em lote, de modo que o job acaba atingindo os limites da API BigQuery Storage.Ao usar
FILE_LOADS
, é possível esgotar o pool de slots compartilhado do BigQuery ou o conjunto de slots reservados. Se você encontrar esse tipo de falha, tente as seguintes abordagens:- Reduza o número máximo ou o tamanho de workers para o job.
- Compre mais slots reservados.
- Considere usar
STORAGE_WRITE_API
.
Pipelines pequenos e médios (menos de 1.000 vCPUs) podem se beneficiar do uso de
STORAGE_WRITE_API
. Para esses jobs menores, considere usarSTORAGE_WRITE_API
se quiser uma fila de mensagens inativas ou quando o pool de slots compartilhadoFILE_LOADS
não for suficiente.Se for possível tolerar dados duplicados, use
STORAGE_WRITE_API_AT_LEAST_ONCE
. Esse modo pode resultar na gravação de registros duplicados no BigQuery, mas pode ser mais barato do que a opçãoSTORAGE_WRITE_API
.Diferentes modos de gravação podem ter um desempenho diferente com base nas características do pipeline. Experimente encontrar o melhor modo de gravação para sua carga de trabalho.
Solucionar erros no nível da linha
Nesta seção, descrevemos como lidar com erros que podem acontecer na linha, por exemplo, devido a dados de entrada malformados ou incompatibilidades de esquema.
Para a API Storage Write, todas as linhas que não podem ser gravadas são colocadas
em um PCollection
separado. Para acessar essa coleção, chame
getFailedStorageApiInserts
no objeto
WriteResult
. Para ver um exemplo dessa abordagem, consulte
Fazer streaming de dados para o BigQuery.
É uma prática recomendada enviar os erros para uma fila ou tabela de mensagens inativas para processamento posterior. Para mais
informações sobre esse padrão, consulte
BigQueryIO
padrão de mensagens inativas.
Para FILE_LOADS
, se ocorrer um erro ao carregar os dados, o job de carregamento falhará e o pipeline gerará uma exceção de tempo de execução. É possível ver o erro nos registros do Dataflow ou analisar o histórico de jobs do BigQuery.
O conector de E/S não retorna informações sobre linhas individuais com falha.
Para mais informações sobre como resolver erros, consulte Erros do conector do BigQuery.
Exemplos
Os exemplos a seguir mostram como usar o Dataflow para gravar no BigQuery.
Gravar em uma tabela existente
O exemplo a seguir cria um pipeline em lote que grava um PCollection<MyData>
no BigQuery, em que MyData
é um tipo de dados personalizado.
O método BigQueryIO.write()
retorna um tipo BigQueryIO.Write<T>
, que é usado para configurar a operação de gravação. Para mais informações, consulte Como gravar em uma tabela na documentação do Apache Beam. Este exemplo de código grava em uma tabela atual (CREATE_NEVER
) e anexa as novas linhas à tabela (WRITE_APPEND
).
Java
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Gravar em uma tabela nova ou atual
O exemplo a seguir cria uma nova tabela caso a tabela de destino não exista, definindo a disposição de criação como CREATE_IF_NEEDED
. Ao usar essa opção, você precisa fornecer um esquema de tabela. O conector usará esse esquema se criar uma nova tabela.
Java
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Transmitir dados para o BigQuery
O exemplo a seguir mostra como fazer streaming de dados usando semântica de exatamente uma vez, definindo o modo de gravação como STORAGE_WRITE_API
.
Nem todos os pipelines de streaming exigem semântica exatamente uma vez. Por exemplo, talvez seja possível remover cópias duplicadas manualmente da tabela de destino. Se a possibilidade de registros duplicados for aceitável no cenário, use a semântica pelo menos uma vez definindo o método de gravação como STORAGE_API_AT_LEAST_ONCE
. Esse método geralmente é mais eficiente e resulta em menor latência para a maioria dos pipelines.
Java
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
A seguir
- Saiba mais sobre o conector de E/S do BigQuery na documentação do Apache Beam.
- Leia sobre Como fazer streaming de dados no BigQuery usando a API Storage Write (postagem do blog).