Mutations Batching

User-friendly container for the Google Cloud Bigtable MutationsBatcher.

exception google.cloud.bigtable.batcher.MutationsBatchError(message, exc)

Bases: Exception

Error in the batch request

class google.cloud.bigtable.batcher.MutationsBatcher(table, flush_count=100, max_row_bytes=20971520, flush_interval=1, batch_completed_callback=None)

Bases: object

A MutationsBatcher is used in batch cases where the number of mutations is large or unknown. It stores DirectRow objects in memory until one of the size limits is reached or flush() is called explicitly. When a flush event occurs, the DirectRow objects in memory are sent to Cloud Bigtable. Batching mutations is more efficient than sending individual requests.

This class is not suited to systems where each mutation must be guaranteed to be sent, since calling mutate() may result only in an in-memory change. If the system crashes, any DirectRow objects still in memory will not necessarily be sent to the service, even though mutate() has already returned.
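The flush conditions and the in-memory caveat described above can be illustrated with a small conceptual model. This is only a sketch: ToyBatcher and its send callable are hypothetical stand-ins written for illustration, not the library's implementation, and the model flushes synchronously and ignores flush_interval.

```python
class ToyBatcher:
    """Conceptual stand-in for illustration; NOT the library's MutationsBatcher."""

    def __init__(self, send, flush_count=100, max_row_bytes=20 * 1024 * 1024):
        self._send = send  # hypothetical callable that receives a list of rows
        self._flush_count = flush_count
        self._max_row_bytes = max_row_bytes
        self._rows = []
        self._bytes = 0

    def mutate(self, row_key, payload):
        # Adding a row only changes in-memory state ...
        self._rows.append((row_key, payload))
        self._bytes += len(payload)
        # ... unless one of the size limits has now been reached.
        if len(self._rows) >= self._flush_count or self._bytes >= self._max_row_bytes:
            self.flush()

    def flush(self):
        # Send whatever is buffered and start a fresh, empty batch.
        if self._rows:
            self._send(self._rows)
            self._rows, self._bytes = [], 0


sent_batches = []
batcher = ToyBatcher(sent_batches.append, flush_count=2)
batcher.mutate(b"row-1", b"a")
batcher.mutate(b"row-2", b"b")  # row-count limit reached: first batch is sent
batcher.mutate(b"row-3", b"c")  # stays in memory only
batches_before_flush = len(sent_batches)
batcher.flush()  # explicit flush sends the remaining row
```

In this model, the third row would be lost if the process stopped before the final flush() call, which is the situation the paragraph above warns about.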

Note on thread safety: The same MutationsBatcher cannot be shared by multiple end-user threads.

  • Parameters

    • table (class) – google.cloud.bigtable.table.Table.

    • flush_count (int) – (Optional) Maximum number of rows per batch. When the batch reaches this number of rows, finish_batch() is called to mutate the current row batch. Default is FLUSH_COUNT (100 rows).

    • max_row_bytes (int) – (Optional) Maximum total size, in bytes, of the row mutations in a batch. When the batch reaches this size, finish_batch() is called to mutate the current row batch. Default is MAX_ROW_BYTES (20971520 bytes, i.e. 20 MiB).

    • flush_interval (float) – (Optional) The interval (in seconds) between asynchronous flushes. Default is 1 second.

    • batch_completed_callback (Callable[list[google.rpc.status_pb2.Status]] = None) – (Optional) A callable for handling responses after the current batch is sent. The callable expects a list of gRPC Status objects.
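As a rough illustration of the batch_completed_callback parameter, the handler below records every status whose code is not 0 (OK). It is a minimal sketch: the name on_batch_completed and the failed list are hypothetical, and SimpleNamespace objects stand in for google.rpc.status_pb2.Status messages, relying only on their code and message fields.

```python
from types import SimpleNamespace

failed = []  # collects (code, message) pairs for non-OK statuses


def on_batch_completed(statuses):
    """Record every status whose code is not 0 (OK)."""
    for status in statuses:
        if status.code != 0:
            failed.append((status.code, status.message))


# Stand-ins for google.rpc.status_pb2.Status, used only to exercise the handler.
on_batch_completed(
    [
        SimpleNamespace(code=0, message=""),
        SimpleNamespace(code=4, message="deadline exceeded"),
    ]
)
```

A handler like this would be passed as batch_completed_callback=on_batch_completed when constructing the MutationsBatcher.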

__enter__()

Starts the MutationsBatcher as a context manager.

__exit__(exc_type, exc_value, exc_traceback)

Cleans up resources. Flushes and shuts down the ThreadPoolExecutor.
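Because the batcher supports the context-manager protocol, a with block can take care of the final flush. The helper below is a minimal sketch: write_greetings and the b"greeting" column qualifier are hypothetical names, and it assumes that table is a google.cloud.bigtable.table.Table and that __enter__() returns the batcher itself.

```python
def write_greetings(table, column_family_id, greetings):
    """Queue one row per greeting; leaving the with block flushes the batch."""
    with table.mutations_batcher() as batcher:
        for i, text in enumerate(greetings):
            # Row keys and the column qualifier are illustrative choices.
            row = table.row(f"greeting-{i}".encode())
            row.set_cell(column_family_id, b"greeting", text.encode())
            batcher.mutate(row)
    return len(greetings)
```

Any rows still buffered when the block ends are flushed by __exit__(), so no explicit flush() call is needed at the end.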

close()

Cleans up resources. Flushes and shuts down the ThreadPoolExecutor. Any errors will be raised.

  • Raises

    • MutationsBatchError if there’s any error in the mutations.

flush()

Sends the current batch to Cloud Bigtable synchronously. For example:

from google.cloud.bigtable import Client

client = Client(admin=True)
instance = client.instance(INSTANCE_ID)
table = instance.table(TABLE_ID)
# Batcher for max row bytes, max_row_bytes=1024 is optional.
batcher = table.mutations_batcher(max_row_bytes=1024)

# Add a single row
row_key = b"row_key"
row = table.row(row_key)
row.set_cell(COLUMN_FAMILY_ID, COL_NAME1, "value-0")

# In batcher, mutate will flush current batch if it
# reaches the max_row_bytes
batcher.mutate(row)
batcher.flush()

  • Raises

    • MutationsBatchError if there’s any error in the mutations.

mutate(row)

Add a row to the batch. If the current batch meets one of the size limits, the batch is sent asynchronously.

For example:

import datetime

from google.cloud.bigtable import Client

client = Client(admin=True)
instance = client.instance(INSTANCE_ID)
table = instance.table(TABLE_ID)
# Batcher for max row bytes, max_row_bytes=1024 is optional.
batcher = table.mutations_batcher(max_row_bytes=1024)

# Add a single row
row_key = b"row_key_1"
row = table.row(row_key)
row.set_cell(
    COLUMN_FAMILY_ID, COL_NAME1, "value-0", timestamp=datetime.datetime.utcnow()
)

# In batcher, mutate will flush current batch if it
# reaches the max_row_bytes
batcher.mutate(row)
batcher.flush()

  • Parameters

    • row (class) – DirectRow.

  • Raises

    One of the following:

    • _BigtableRetryableError if any row returned a transient error.

    • RuntimeError if the number of responses doesn’t match the number of rows that were retried.

mutate_rows(rows)

Add multiple rows to the batch. If the current batch meets one of the size limits, the batch is sent asynchronously.

For example:

from google.cloud.bigtable import Client

client = Client(admin=True)
instance = client.instance(INSTANCE_ID)
table = instance.table(TABLE_ID)
batcher = table.mutations_batcher()

row1 = table.row(b"row_key_1")
row2 = table.row(b"row_key_2")
row3 = table.row(b"row_key_3")
row4 = table.row(b"row_key_4")

row1.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val1")
row2.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val2")
row3.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val3")
row4.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val4")

batcher.mutate_rows([row1, row2, row3, row4])

# batcher will flush current batch if it
# reaches the max flush_count
# Manually send the current batch to Cloud Bigtable
batcher.flush()

  • Parameters

    • rows (list[google.cloud.bigtable.row.DirectRow]) – List of DirectRow objects to add to the batch.

  • Raises

    One of the following:

    • _BigtableRetryableError if any row returned a transient error.

    • RuntimeError if the number of responses doesn’t match the number of rows that were retried.