BigQuery API - Class Google::Cloud::Bigquery::Table::AsyncInserter (v1.44.0)

Reference documentation and code samples for the BigQuery API class Google::Cloud::Bigquery::Table::AsyncInserter.

AsyncInserter

Used to insert multiple rows in batches to a table. See Table#insert_async.

Inherits

  • Object

Includes

  • MonitorMixin

Example

require "google/cloud/bigquery"

bigquery = Google::Cloud::Bigquery.new
dataset = bigquery.dataset "my_dataset"
table = dataset.table "my_table"
inserter = table.insert_async do |result|
  if result.error?
    log_error result.error
  else
    log_insert "inserted #{result.insert_count} rows " \
      "with #{result.error_count} errors"
  end
end

rows = [
  { "first_name" => "Alice", "age" => 21 },
  { "first_name" => "Bob", "age" => 22 }
]
inserter.insert rows

inserter.stop.wait!

Methods

#flush

def flush() -> AsyncInserter

Forces all rows in the current batch to be inserted immediately.

Returns
  • (AsyncInserter) — returns self so calls can be chained.

#insert

def insert(rows, insert_ids: nil)

Adds rows to the async inserter. Rows are collected in batches and inserted together. See Table#insert_async.

Simple Ruby types are generally accepted per JSON rules, along with the following support for BigQuery's more complex types:

| BigQuery     | Ruby                                 | Notes                                              |
|--------------|--------------------------------------|----------------------------------------------------|
| NUMERIC      | BigDecimal                           | BigDecimal values will be rounded to scale 9.      |
| BIGNUMERIC   | String                               | Pass as String to avoid rounding to scale 9.       |
| DATETIME     | DateTime                             | DATETIME does not support time zone.               |
| DATE         | Date                                 |                                                    |
| GEOGRAPHY    | String                               |                                                    |
| TIMESTAMP    | Time                                 |                                                    |
| TIME         | Google::Cloud::Bigquery::Time        |                                                    |
| BYTES        | File, IO, StringIO, or similar       |                                                    |
| ARRAY        | Array                                | Nested arrays and nil values are not supported.    |
| STRUCT       | Hash                                 | Hash keys may be strings or symbols.               |
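As a rough illustration of the mapping above, the following sketch builds a single row using only Ruby standard-library types. The column names are hypothetical, and the TIME column is left out because its Ruby type comes from the gem itself. The local call to BigDecimal#round is only a way to preview the scale-9 rounding described in the table; it is an assumption about how to reproduce that rounding, not a statement about the library's internals.

```ruby
require "bigdecimal"
require "date"
require "stringio"

# Hypothetical column names; the Ruby types follow the mapping table above.
row = {
  "price"      => BigDecimal("19.1234567891"),           # NUMERIC (rounded to scale 9)
  "big_price"  => "19.12345678901234567890",             # BIGNUMERIC passed as String
  "created_on" => Date.new(2024, 1, 15),                 # DATE
  "created_at" => Time.utc(2024, 1, 15, 12, 30, 0),      # TIMESTAMP
  "local_dt"   => DateTime.new(2024, 1, 15, 12, 30, 0),  # DATETIME (no time zone)
  "payload"    => StringIO.new("raw bytes"),             # BYTES
  "tags"       => ["a", "b"],                            # ARRAY (no nesting, no nil)
  "address"    => { city: "Oslo", "zip" => "0150" }      # STRUCT (string or symbol keys)
}

# Preview of what a scale-9 rounding does to the NUMERIC value.
rounded_price = row["price"].round(9)
```

Passing the BIGNUMERIC value as a String keeps all twenty fractional digits, whereas the BigDecimal value loses its tenth digit when rounded.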

Because BigQuery's streaming API is designed for high insertion rates, modifications to the underlying table metadata are eventually consistent when interacting with the streaming system. In most cases metadata changes are propagated within minutes, but during this period API responses may reflect the inconsistent state of the table.

The value :skip can be provided as insert_ids to skip the generation of IDs for all rows, or as an element of the insert_ids array to skip the generation of an ID for a specific row.

Parameters
  • rows (Hash, Array<Hash>) — A hash object or array of hash objects containing the data. Required. BigDecimal values will be rounded to scale 9 to conform with the BigQuery NUMERIC data type. To avoid rounding BIGNUMERIC type values with scale greater than 9, use String instead of BigDecimal.
  • insert_ids (Array<String|Symbol>, Symbol) (defaults to: nil) — A unique ID for each row. BigQuery uses this property to detect duplicate insertion requests on a best-effort basis. For more information, see data consistency. Optional. If not provided, the client library will assign a UUID to each row before the request is sent.
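One possible way to supply insert_ids is to derive them from the row content, so that a retried request carries the same IDs as the first attempt. The sketch below assumes that approach; the helper name stable_insert_ids is hypothetical and not part of the library. The calls on a real inserter appear only as comments, since creating one requires credentials.

```ruby
require "digest"
require "json"

# Hypothetical helper: builds one deterministic ID per row from its content.
# Only top-level keys are sorted, so nested hashes must already be in a
# consistent key order for the IDs to be stable.
def stable_insert_ids rows
  rows.map do |row|
    canonical = row.sort_by { |key, _| key.to_s }.to_h
    Digest::SHA256.hexdigest(JSON.generate(canonical))
  end
end

rows = [
  { "first_name" => "Alice", "age" => 21 },
  { "first_name" => "Bob", "age" => 22 }
]
ids = stable_insert_ids rows

# With a real inserter (not created here), the IDs would be passed as:
#   inserter.insert rows, insert_ids: ids
# To skip ID generation for the second row only:
#   inserter.insert rows, insert_ids: [ids.first, :skip]
# To skip ID generation for every row:
#   inserter.insert rows, insert_ids: :skip
```

Content-derived IDs treat two genuinely identical rows as duplicates, so this choice only fits tables where identical rows are not meaningful.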

#interval

def interval() -> Numeric

The number of seconds to collect rows before the batch is inserted. Default is 10.

Returns
  • (Numeric) — the current value of interval

#max_bytes

def max_bytes() -> Integer

The maximum total size of rows, in bytes, to be collected before the batch is inserted. Default is 10,000,000 (10 MB).

Returns
  • (Integer) — the current value of max_bytes

#max_rows

def max_rows() -> Integer

The maximum number of rows to be collected before the batch is inserted. Default is 500.

Returns
  • (Integer) — the current value of max_rows
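The interaction between max_rows and max_bytes can be pictured with a toy model. The sketch below is a hypothetical illustration, not the library's internal batching code: ToyBatch and split_into_batches are invented names, row size is assumed to be the length of the JSON-encoded row, and the time-based interval trigger is left out.

```ruby
require "json"

# Hypothetical illustration only; not the library's internal batch class.
class ToyBatch
  attr_reader :rows

  def initialize max_rows: 500, max_bytes: 10_000_000
    @max_rows = max_rows
    @max_bytes = max_bytes
    @rows = []
    @bytes = 0
  end

  # Returns true if the row fits, false if it would exceed either limit.
  def try_add row
    size = row.to_json.bytesize
    return false if @rows.size + 1 > @max_rows || @bytes + size > @max_bytes
    @rows << row
    @bytes += size
    true
  end
end

# Starts a new batch whenever the current one cannot take the next row.
def split_into_batches rows, max_rows:, max_bytes:
  batches = [ToyBatch.new(max_rows: max_rows, max_bytes: max_bytes)]
  rows.each do |row|
    next if batches.last.try_add row
    batches << ToyBatch.new(max_rows: max_rows, max_bytes: max_bytes)
    raise ArgumentError, "row exceeds max_bytes" unless batches.last.try_add row
  end
  batches.map(&:rows)
end

rows = (1..5).map { |i| { "id" => i } }
batches = split_into_batches rows, max_rows: 2, max_bytes: 10_000_000
batch_sizes = batches.map(&:size) # => [2, 2, 1]
```

In this model, whichever limit is reached first closes the batch, which matches the way the two settings are described above.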

#started?

def started?() -> boolean

Whether the inserter has been started.

Returns
  • (boolean) — true when started, false otherwise.

#stop

def stop() -> AsyncInserter

Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use #wait! to block until the inserter is fully stopped and all pending rows have been inserted.

Returns
  • (AsyncInserter) — returns self so calls can be chained.

#stopped?

def stopped?() -> boolean

Whether the inserter has been stopped.

Returns
  • (boolean) — true when stopped, false otherwise.

#threads

def threads() -> Integer

The number of threads used to insert rows. Default is 4.

Returns
  • (Integer) — the current value of threads

#wait!

def wait!(timeout = nil) -> AsyncInserter

Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed. Does not stop the inserter. To stop the inserter, first call #stop and then call #wait! to block until the inserter is stopped.

Returns
  • (AsyncInserter) — returns self so calls can be chained.
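The division of labor between #stop and #wait! can be sketched with a small stand-in class built on Ruby's Queue and Thread. ToyInserter is hypothetical and says nothing about how the library is implemented; it only mirrors the contract described above, where stop refuses new rows but lets queued rows drain, and wait! blocks without stopping anything itself.

```ruby
# Hypothetical stand-in for the stop/wait! contract; not the library's code.
class ToyInserter
  attr_reader :inserted

  def initialize
    @queue = Queue.new
    @inserted = []
    @worker = Thread.new do
      # Queue#pop returns nil once the queue is closed and drained.
      while (row = @queue.pop)
        @inserted << row
      end
    end
  end

  def insert row
    raise "inserter is stopped" if @queue.closed?
    @queue << row
    self
  end

  # Refuses new rows but lets queued rows drain; returns self for chaining.
  def stop
    @queue.close
    self
  end

  def stopped?
    @queue.closed?
  end

  # Blocks until the worker finishes (or the timeout elapses).
  # Does not stop the inserter by itself.
  def wait! timeout = nil
    @worker.join timeout
    self
  end
end

toy = ToyInserter.new
toy.insert({ "first_name" => "Alice" }).insert({ "first_name" => "Bob" })
toy.stop.wait!
```

Calling wait! without a prior stop would block indefinitely in this model unless a timeout is given, which is why the two calls are chained.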