Neste documento, descrevemos como ler dados do BigQuery para o Dataflow usando o conector de E/S do BigQuery do Apache Beam.
Visão geral
O conector de E/S do BigQuery é compatível com duas opções de leitura do BigQuery:
- Leituras diretas da tabela. Essa opção é a mais rápida, porque usa a API BigQuery Storage Read.
- Job de exportação. Com essa opção, o BigQuery executa um job de exportação que grava os dados da tabela no Cloud Storage. Em seguida, o conector lê os dados exportados do Cloud Storage. Essa opção é menos eficiente, porque requer a etapa de exportação.
Jobs de exportação são a opção padrão. Para especificar leituras diretas, chame
withMethod(Method.DIRECT_READ)
.
O conector serializa os dados da tabela em um PCollection
. Cada elemento na PCollection
representa uma única linha da tabela. O conector é compatível com os seguintes métodos de serialização:
- Ler os dados como registros formatados em Avro. Com esse método, você fornece uma função que analisa os registros Avro em um tipo de dados personalizado.
- Leia os dados como objetos
TableRow
. Esse método é conveniente porque não requer um tipo de dados personalizado. No entanto, geralmente ela tem desempenho menor do que a leitura de registros formatados em Avro.
Paralelismo
O paralelismo nesse conector depende do método de leitura:
Leituras diretas: o conector de E/S produz um número dinâmico de streams, com base no tamanho da solicitação de exportação. Ele lê esses streams diretamente do BigQuery em paralelo.
Jobs de exportação: o BigQuery determina quantos arquivos serão gravados no Cloud Storage. O número de arquivos depende da consulta e do volume de dados. O conector de E/S lê os arquivos exportados em paralelo.
Desempenho
A tabela a seguir mostra as métricas de desempenho para várias
opções de leitura de E/S do BigQuery. As cargas de trabalho foram executadas em um
worker e2-standard2
usando o SDK do Apache Beam 2.49.0 para Java. Eles não
usaram o Runner v2.
100 milhões de registros | 1 KB | 1 coluna | Capacidade de processamento (bytes) | Capacidade de processamento (elementos) |
---|---|---|
Leitura de armazenamento | 120 MBps | 88.000 elementos por segundo |
Exportação de Avro | 105 MBps | 78.000 elementos por segundo |
Exportação para o JSON | 110 MBps | 81.000 elementos por segundo |
Essas métricas são baseadas em pipelines de lote simples. Elas servem para comparar o desempenho entre conectores de E/S e não representam necessariamente pipelines reais. O desempenho do pipeline do Dataflow é complexo e depende do tipo de VM, dos dados processados, do desempenho de origens e coletores externos e do código do usuário. As métricas se baseiam na execução do SDK do Java e não representam as características de desempenho de outros SDKs da linguagem. Para mais informações, confira Desempenho do E/S do Beam.
Práticas recomendadas
Em geral, recomendamos o uso de leituras diretas de tabelas (
Method.DIRECT_READ
). A API Storage Read é mais adequada para pipelines de dados do que jobs de exportação porque não precisa da etapa intermediária de exportação de dados.Se você ativar leituras diretas, será cobrado pelo uso da API Storage Read. Consulte Preços de extração de dados na página de preços do BigQuery.
Não há custo extra para jobs de exportação. No entanto, os jobs de exportação têm limites. Para uma grande movimentação de dados, em que a pontualidade é uma prioridade e o custo é ajustável, recomendam-se leituras diretas.
A API Storage Read tem limites de cota. Use as métricas do Google Cloud para monitorar o uso da sua cota.
Ao usar a API Storage Read, você poderá ver erros de expiração da alocação e de tempo limite da sessão nos registros, como:
DEADLINE_EXCEEDED
Server Unresponsive
StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
Esses erros podem ocorrer quando uma operação leva mais tempo do que o tempo limite, geralmente em pipelines executados por mais de seis horas. Para atenuar esse problema, mude para exportações de arquivos.
Ao usar o SDK do Java, crie uma classe que represente o esquema da tabela do BigQuery. Em seguida, chame
useBeamSchema
no pipeline para converter automaticamente entre os tiposRow
do Apache Beam eTableRow
do BigQuery. Para conferir um exemplo de classe de esquema, confiraExampleModel.java
.
Exemplos
Os exemplos de código nesta seção usam leituras diretas das tabelas.
Para usar um job de exportação, omita a chamada para withMethod
ou especifique Method.EXPORT
. Em seguida, defina a
opção de pipeline --tempLocation
para especificar um bucket do Cloud Storage para os arquivos exportados.
Esses exemplos de código presumem que a tabela de origem tenha as seguintes colunas:
name
(string)age
(inteiro)
Especificado como um arquivo de esquema JSON:
[
{"name":"user_name","type":"STRING","mode":"REQUIRED"},
{"name":"age","type":"INTEGER","mode":"REQUIRED"}
]
Ler registros formatados em Avro
Para ler dados do BigQuery em registros formatados em Avro, use o método read(SerializableFunction)
. Esse método usa uma função definida pelo aplicativo que analisa objetos SchemaAndRecord
e retorna um tipo de dados personalizado. A saída do conector é um PCollection
do seu tipo de dados personalizado.
O código a seguir lê um PCollection<MyData>
de uma tabela do BigQuery, em que MyData
é uma classe definida pelo aplicativo.
Java
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
O método read
usa uma interface SerializableFunction<SchemaAndRecord, T>
, que define uma função para converter de registros Avro em uma classe de dados personalizada. No exemplo de código anterior, o método MyData.apply
implementa essa função de conversão. A função de exemplo analisa os campos name
e age
do registro Avro e retorna uma instância MyData
.
Para especificar qual tabela do BigQuery ler, chame o método from
, conforme mostrado no exemplo anterior. Para mais informações, consulte Nomes de tabelas na documentação do conector de E/S do BigQuery.
Objetos TableRow
lidos
O método readTableRows
lê os dados do BigQuery em um PCollection
de objetos TableRow
. Cada TableRow
é um mapa de pares de chave-valor que contém uma única linha de dados de tabela. Especifique a tabela do BigQuery a ser lida chamando o método from
.
O código a seguir lê um PCollection<TableRows>
de uma tabela do BigQuery.
Java
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Este exemplo também mostra como acessar os valores do dicionário TableRow
.
Valores inteiros são codificados como strings para corresponder ao formato JSON exportado do BigQuery.
Projeção e filtragem de colunas
Ao usar leituras diretas (Method.DIRECT_READ
), é possível tornar as operações de leitura mais eficientes, reduzindo a quantidade de dados lidos do BigQuery e enviados pela rede.
- Projeção de colunas: chame
withSelectedFields
para ler um subconjunto de colunas da tabela. Isso permite leituras eficientes quando as tabelas contêm muitas colunas. - Filtragem de linha: chame
withRowRestriction
para especificar um predicado que filtre dados no lado do servidor.
Os predicados de filtro precisam ser determinísticos, e a agregação não é compatível.
O exemplo a seguir projeta as colunas "user_name"
e "age"
e filtra as linhas que não correspondem ao predicado "age > 18"
.
Java
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Ler a partir do resultado de uma consulta
Os exemplos anteriores mostram como ler linhas de uma tabela. Para ler o resultado de uma consulta SQL, chame fromQuery
. Essa abordagem move parte do trabalho computacional para o BigQuery. Você também pode usar esse método para ler uma visualização do BigQuery ou uma visualização materializada, executando uma consulta na visualização.
No exemplo a seguir, uma consulta é executada em um conjunto de dados público do BigQuery e os resultados são lidos. Depois que o pipeline é executado, é possível ver o job de consulta no histórico de jobs do BigQuery.
Java
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
A seguir
- Leia a documentação do conector de E/S do BigQuery.
- Confira a lista de modelos fornecidos pelo Google.