Questo documento descrive come scrivere dati da Dataflow in BigQuery utilizzando il connettore BigQuery I/O di Apache Beam.
Il connettore BigQuery I/O è disponibile nell'SDK Apache Beam. Ti consigliamo di utilizzare la versione più recente dell'SDK. Per ulteriori informazioni, consulta SDK Apache Beam 2.x.
È disponibile anche il supporto multilingue per Python.
Panoramica
Il connettore BigQuery I/O supporta i seguenti metodi per scrivere in BigQuery:
STORAGE_WRITE_API
. In questa modalità, il connettore esegue scritture dirette nello spazio di archiviazione BigQuery utilizzando l'API BigQuery Storage Write. L'API Storage Write combina l'importazione di flussi di dati e il caricamento in batch in un'unica API ad alte prestazioni. Questa modalità garantisce la semantica esattamente una volta.STORAGE_API_AT_LEAST_ONCE
. Questa modalità utilizza anche l'API Storage Write, ma fornisce la semantica almeno una volta. Questa modalità comporta una latenza inferiore per la maggior parte delle pipeline. Tuttavia, sono possibili scritture duplicate.FILE_LOADS
. In questa modalità, il connettore scrive i dati di input nei file di staging in Cloud Storage. Quindi esegue un job di caricamento BigQuery per caricare i dati in BigQuery. Questa modalità è predefinita per iPCollections
delimitati, che si trovano più comunemente nelle pipeline batch.STREAMING_INSERTS
. In questa modalità, il connettore utilizza l'API di inserimento di flussi precedente. Questa modalità è predefinita perPCollections
illimitato, ma non è consigliata per i nuovi progetti.
Quando scegli un metodo di scrittura, tieni presenti i seguenti punti:
- Per i job di streaming, ti consigliamo di utilizzare
STORAGE_WRITE_API
oSTORAGE_API_AT_LEAST_ONCE
, perché queste modalità scrivono direttamente nello spazio di archiviazione BigQuery, senza utilizzare file di staging intermedi. - Se esegui la pipeline utilizzando la modalità flusso di dati Almeno una volta, imposta la modalità di scrittura su
STORAGE_API_AT_LEAST_ONCE
. Questa impostazione è più efficiente e corrisponde alla semantica della modalità di flusso di dati Almeno una volta. - I caricamenti dei file e l'API Storage Write hanno quote e limiti diversi.
- I job di caricamento utilizzano il pool di slot BigQuery condiviso o gli slot riservati. Per utilizzare gli slot riservati, esegui il job di caricamento in un progetto con un'assegnazione di prenotazione di tipo
PIPELINE
. I job di caricamento sono gratuiti se utilizzi il pool di slot BigQuery condiviso. Tuttavia, BigQuery non offre alcuna garanzia sulla capacità disponibile del pool condiviso. Per ulteriori informazioni, consulta la pagina Introduzione alle prenotazioni.
Parallelismo
Per
FILE_LOADS
eSTORAGE_WRITE_API
nelle pipeline di streaming, il connettore suddivide i dati in una serie di file o stream. In generale, consigliamo di chiamarewithAutoSharding
per attivare lo sharding automatico.Per
FILE_LOADS
nelle pipeline batch, il connettore scrive i dati in file suddivisi, che vengono poi caricati in BigQuery in parallelo.Per
STORAGE_WRITE_API
nelle pipeline batch, ogni worker crea uno o più stream da scrivere in BigQuery, in base al numero totale di shard.Per
STORAGE_API_AT_LEAST_ONCE
esiste un singolo flusso di scrittura predefinito. Più worker vengono aggiunti a questo stream.
Prestazioni
La tabella seguente mostra le metriche relative alle prestazioni per varie opzioni di lettura I/O di BigQuery. I carichi di lavoro sono stati eseguiti su un workere2-standard2
utilizzando l'SDK Apache Beam 2.49.0 per Java. Non
hanno utilizzato Runner v2.
100 milioni di record | 1 kB | 1 colonna | Velocità effettiva (byte) | Velocità effettiva (elementi) |
---|---|---|
Scrittura dati | 55 MB/s | 54.000 elementi al secondo |
Avro Load | 78 MBps | 77.000 elementi al secondo |
Caricamento JSON | 54 MB/s | 53.000 elementi al secondo |
Queste metriche si basano su semplici pipeline batch. Sono progettati per confrontare il rendimento tra i connettori I/O e non sono necessariamente rappresentativi delle pipeline reali. Le prestazioni della pipeline Dataflow sono complesse e dipendono dal tipo di VM, dai dati in fase di elaborazione, dalle prestazioni di origini e destinazioni esterne e dal codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentative delle caratteristiche di prestazioni di altri SDK per lingua. Per ulteriori informazioni, consulta Rendimento IO di Beam.
Best practice
Questa sezione descrive le best practice per la scrittura in BigQuery da Dataflow.
Considerazioni generali
L'API Storage Write ha limiti di quota. Il connettore gestisce questi limiti per la maggior parte delle pipeline. Tuttavia, in alcuni scenari è possibile esaurire gli stream dell'API Storage Write disponibili. Ad esempio, questo problema potrebbe verificarsi in una pipeline che utilizza lo sharding automatico e la scalabilità automatica con un numero elevato di destinazioni, in particolare in job a lungo termine con carichi di lavoro altamente variabili. Se si verifica questo problema, ti consigliamo di utilizzare
STORAGE_WRITE_API_AT_LEAST_ONCE
, che evita il problema.Utilizza le metriche di Google Cloud per monitorare l'utilizzo della quota dell'API Storage Write.
Quando utilizzi i caricamenti dei file, Avro in genere supera JSON. Per utilizzare Avro, chiama
withAvroFormatFunction
.Per impostazione predefinita, i job di caricamento vengono eseguiti nello stesso progetto del job Dataflow. Per specificare un altro progetto, chiama
withLoadJobProjectId
.Quando utilizzi l'SDK Java, ti consigliamo di creare una classe che rappresenti lo schema della tabella BigQuery. Quindi, chiama
useBeamSchema
nella pipeline per eseguire automaticamente la conversione tra i tipiRow
di Apache Beam eTableRow
di BigQuery. Per un esempio di classe dello schema, consultaExampleModel.java
.Se carichi tabelle con schemi complessi contenenti migliaia di campi, valuta la possibilità di chiamare
withMaxBytesPerPartition
per impostare una dimensione massima più piccola per ogni job di caricamento.
Pipeline in modalità flusso
I consigli riportati di seguito si applicano alle pipeline di streaming.
Per le pipeline di streaming, consigliamo di utilizzare l'API Storage Write (
STORAGE_WRITE_API
oSTORAGE_API_AT_LEAST_ONCE
).Una pipeline di streaming può utilizzare i caricamenti di file, ma questo approccio presenta degli svantaggi:
- Per scrivere i file è necessario il windowing. Non puoi utilizzare la finestra globale.
- BigQuery carica i file secondo il criterio del massimo impegno quando utilizzi il pool di slot condiviso. Può esserci un ritardo significativo tra la scrittura di un record e la sua disponibilità in BigQuery.
- Se un job di caricamento non va a buon fine, ad esempio a causa di dati errati o di una mancata corrispondenza dello schema, l'intera pipeline non va a buon fine.
Valuta la possibilità di utilizzare
STORAGE_WRITE_API_AT_LEAST_ONCE
, se possibile. Ciò può comportare la scrittura di record duplicati in BigQuery, ma è meno costoso e più scalabile diSTORAGE_WRITE_API
.In generale, evita di utilizzare
STREAMING_INSERTS
. Gli inserimenti in streaming sono più costosi dell'API Storage Write e hanno un rendimento inferiore.Lo sharding dei dati può migliorare le prestazioni nelle pipeline in streaming. Per la maggior parte delle pipeline, lo sharding automatico è un buon punto di partenza. Tuttavia, puoi ottimizzare la suddivisione in modo seguente:
- Per
STORAGE_WRITE_API
, chiamawithNumStorageWriteApiStreams
per impostare il numero di stream di scrittura. - Per
FILE_LOADS
, chiamawithNumFileShards
per impostare il numero di frammenti di file.
- Per
Se utilizzi gli inserimenti in streaming, ti consigliamo di impostare
retryTransientErrors
come criterio di ripetizione.
Pipeline batch
I consigli riportati di seguito si applicano alle pipeline batch.
Per la maggior parte delle pipeline batch di grandi dimensioni, ti consigliamo di provare prima
FILE_LOADS
. Una pipeline batch può utilizzareSTORAGE_WRITE_API
, ma è probabile che superi i limiti di quota su larga scala (più di 1000 vCPU) o se sono in esecuzione pipeline concorrenti. Apache Beam non limita il numero massimo di stream di scrittura per i jobSTORAGE_WRITE_API
batch, pertanto il job raggiunge in ultima analisi i limiti dell'API BigQuery Storage.Quando utilizzi
FILE_LOADS
, potresti esaurire il pool di slot BigQuery condiviso o il pool di slot riservati. Se si verifica questo tipo di errore, prova i seguenti approcci:- Riduci il numero massimo di worker o le dimensioni dei worker per il job.
- Acquista altri slot riservati.
- Valuta la possibilità di utilizzare
STORAGE_WRITE_API
.
Le pipeline di piccole e medie dimensioni (<1000 vCPU) potrebbero trarre vantaggio dall'utilizzo di
STORAGE_WRITE_API
. Per questi job più piccoli, ti consigliamo di utilizzareSTORAGE_WRITE_API
se vuoi una coda delle email non recapitate o quando il pool di slot condivisiFILE_LOADS
non è sufficiente.Se puoi tollerare i dati duplicati, valuta la possibilità di utilizzare
STORAGE_WRITE_API_AT_LEAST_ONCE
. Questa modalità può comportare la scrittura di record duplicati in BigQuery, ma potrebbe essere meno costosa dell'opzioneSTORAGE_WRITE_API
.Le diverse modalità di scrittura potrebbero avere prestazioni diverse a seconda delle caratteristiche della pipeline. Esegui esperimenti per trovare la modalità di scrittura migliore per il tuo carico di lavoro.
Gestire gli errori a livello di riga
Questa sezione descrive come gestire gli errori che potrebbero verificarsi a livello di riga, ad esempio a causa di dati di input con formattazione errata o mancata corrispondenza dello schema.
Per l'API Storage Write, le righe che non possono essere scritte vengono inserite in un PCollection
separato. Per ottenere questa raccolta, chiama
getFailedStorageApiInserts
sull'oggetto WriteResult
. Per un esempio di questo approccio, consulta
Trasmettere flussi di dati a BigQuery.
È buona prassi inviare gli errori a una coda o una tabella di posta in arrivo per l'elaborazione successiva. Per ulteriori informazioni su questo pattern, consulta il pattern di email inutilizzate BigQueryIO
.
Per FILE_LOADS
, se si verifica un errore durante il caricamento dei dati, il job di caricamento non va a buon fine
e la pipeline genera un'eccezione di runtime. Puoi visualizzare l'errore nei log di Dataflow o nella cronologia dei job di BigQuery.
Il connettore I/O non restituisce informazioni sulle singole righe con errori.
Per ulteriori informazioni sulla risoluzione degli errori, consulta Errori del connettore BigQuery.
Esempi
Gli esempi riportati di seguito mostrano come utilizzare Dataflow per scrivere in BigQuery.
Scrivere in una tabella esistente
L'esempio seguente crea una pipeline batch che scrive un valore PCollection<MyData>
in BigQuery, dove MyData
è un tipo di dato personalizzato.
Il metodo BigQueryIO.write()
restituisce un tipo BigQueryIO.Write<T>
, che viene utilizzato per configurare l'operazione di scrittura. Per saperne di più, consulta la sezione Scrittura in una tabella della documentazione di Apache Beam. Questo esempio di codice scrive in una tabella esistente (CREATE_NEVER
) e aggiunge le nuove righe alla tabella (WRITE_APPEND
).
Java
Per autenticarti a Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Scrivere in una tabella nuova o esistente
L'esempio seguente crea una nuova tabella se la tabella di destinazione non esiste, impostando la disposizione di creazione su CREATE_IF_NEEDED
. Quando utilizzi questa opzione, devi fornire uno schema tabella. Il connettore utilizza questo schema se crea una nuova tabella.
Java
Per autenticarti a Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Trasmetti flussi di dati a BigQuery
L'esempio seguente mostra come trasmettere i dati utilizzando la semantica esattamente una volta impostando la modalità di scrittura su STORAGE_WRITE_API
Non tutte le pipeline di streaming richiedono la semantica esattamente una volta. Ad esempio, potresti essere in grado di rimuovere manualmente i duplicati dalla tabella di destinazione. Se la possibilità di record duplicati è accettabile per il tuo scenario, ti consigliamo di utilizzare la semantica almeno una volta impostando il metodo di scrittura su STORAGE_API_AT_LEAST_ONCE
. Questo metodo è generalmente più efficiente e comporta una latenza inferiore per la maggior parte delle pipeline.
Java
Per autenticarti a Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Passaggi successivi
- Scopri di più sul connettore I/O BigQuery nella documentazione di Apache Beam.
- Scopri di più su come inserire flussi di dati in BigQuery utilizzando l'API Storage Write (post del blog).