Reference documentation and code samples for the BigQuery API class Google::Cloud::Bigquery::Table::AsyncInserter.
AsyncInserter
Used to insert multiple rows in batches to a topic. See #insert_async.
Inherits
- Object
Includes
- MonitorMixin
Example
require "google/cloud/bigquery" bigquery = Google::Cloud::Bigquery.new dataset = bigquery.dataset "my_dataset" table = dataset.table "my_table" inserter = table.insert_async do |result| if result.error? log_error result.error else log_insert "inserted #{result.insert_count} rows " \ "with #{result.error_count} errors" end end rows = [ { "first_name" => "Alice", "age" => 21 }, { "first_name" => "Bob", "age" => 22 } ] inserter.insert rows inserter.stop.wait!
Methods
#flush
def flush() -> AsyncInserter
Forces all rows in the current batch to be inserted immediately.
- (AsyncInserter) — returns self so calls can be chained.
#insert
def insert(rows, insert_ids: nil)
Adds rows to the async inserter to be inserted. Rows will be collected in batches and inserted together. See #insert_async.
Simple Ruby types are generally accepted per JSON rules, along with the following support for BigQuery's more complex types:
| BigQuery | Ruby | Notes |
|--------------|--------------------------------------|----------------------------------------------------|
| NUMERIC
| BigDecimal
| BigDecimal
values will be rounded to scale 9. |
| BIGNUMERIC
| String
| Pass as String
to avoid rounding to scale 9. |
| DATETIME
| DateTime
| DATETIME
does not support time zone. |
| DATE
| Date
| |
| GEOGRAPHY
| String
| |
| JSON
| String
(Stringified JSON) | String, as JSON does not have a schema to verify. |
| TIMESTAMP
| Time
| |
| TIME
| Google::Cloud::BigQuery::Time
| |
| BYTES
| File
, IO
, StringIO
, or similar | |
| ARRAY
| Array
| Nested arrays, nil
values are not supported. |
| STRUCT
| Hash
| Hash keys may be strings or symbols. |
Because BigQuery's streaming API is designed for high insertion rates, modifications to the underlying table metadata are eventually consistent when interacting with the streaming system. In most cases metadata changes are propagated within minutes, but during this period API responses may reflect the inconsistent state of the table.
The value :skip
can be provided to skip the generation of IDs for all rows, or to skip the generation of
an ID for a specific row in the array.
-
rows (Hash, Array<Hash>) — A hash object or array of hash objects
containing the data. Required.
BigDecimal
values will be rounded to scale 9 to conform with the BigQueryNUMERIC
data type. To avoid roundingBIGNUMERIC
type values with scale greater than 9, useString
instead ofBigDecimal
. - insert_ids (Array<String|Symbol>, Symbol) (defaults to: nil) — A unique ID for each row. BigQuery uses this property to detect duplicate insertion requests on a best-effort basis. For more information, see data consistency. Optional. If not provided, the client library will assign a UUID to each row before the request is sent.
#interval
def interval() -> Numeric
The number of seconds to collect rows before the batch is inserted. Default is 10.
- (Numeric) — the current value of interval
#max_bytes
def max_bytes() -> Integer
The maximum size of rows to be collected before the batch is inserted. Default is 10,000,000 (10MB).
- (Integer) — the current value of max_bytes
#max_rows
def max_rows() -> Integer
The maximum number of rows to be collected before the batch is inserted. Default is 500.
- (Integer) — the current value of max_rows
#started?
def started?() -> boolean
Whether the inserter has been started.
-
(boolean) —
true
when started,false
otherwise.
#stop
def stop() -> AsyncInserter
Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use #wait! to block until the inserter is fully stopped and all pending rows have been inserted.
- (AsyncInserter) — returns self so calls can be chained.
#stopped?
def stopped?() -> boolean
Whether the inserter has been stopped.
-
(boolean) —
true
when stopped,false
otherwise.
#threads
def threads() -> Integer
The number of threads used to insert rows. Default is 4.
- (Integer) — the current value of threads
#wait!
def wait!(timeout = nil) -> AsyncInserter
Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed. Does not stop the inserter. To stop the inserter, first call #stop and then call #wait! to block until the inserter is stopped.
- (AsyncInserter) — returns self so calls can be chained.