Developers & Practitioners

Inventory management with BigQuery and Cloud Run


Many people think of Cloud Run just as a way of hosting websites. Cloud Run is great at that, but there's so much more you can do with it. Here we'll explore how you can use Cloud Run and BigQuery together to create an inventory management system. I'm using a subset of the Iowa Liquor Control Board data set to create a smaller inventory file for my fictional store. 

Bulk import of new inventory 

In my inventory management scenario, a CSV file is dropped into Cloud Storage to bulk load new inventory. You can imagine that we're migrating to this new system from an old inventory management system that can export a CSV file.

BigQuery can import CSV files from Cloud Storage directly, and if you don't need to do any data transformations I recommend you use the built-in CSV loading functionality. For my use case I only need a couple of the columns from the CSV, and I want to do some additional data transformations, so I'll be writing some code.
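If the built-in loader does fit your needs, the whole import can be a single load job. Here's a minimal sketch, assuming a table like the one used later in this post; the helper names gcs_uri and load_inventory_csv are my own, and schema autodetection is just one of several options:

```python
def gcs_uri(bucket_name: str, file_name: str) -> str:
    # Build the gs:// URI that BigQuery's load job reads from.
    return f"gs://{bucket_name}/{file_name}"

def load_inventory_csv(bucket_name: str, file_name: str, table_id: str):
    # Imported here so the sketch reads without the client installed.
    from google.cloud import bigquery

    bq_client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # the first row is a header
        autodetect=True,      # let BigQuery infer the schema
    )
    load_job = bq_client.load_table_from_uri(
        gcs_uri(bucket_name, file_name), table_id, job_config=job_config
    )
    return load_job.result()  # block until the load completes
```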

To make deploying the code easy I'm using the Python Functions Framework as a shell around my script. To access Cloud Storage and BigQuery I used the Python client libraries for each of those products, and I'm following the samples in the sample explorer. I only ran into two issues when writing this up.
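The shell itself is tiny: the Functions Framework routes incoming HTTP requests to a single Python function. A minimal sketch (the entry point name import_inventory is my own, not from this post):

```python
def import_inventory(request):
    # All of the bulk-import code below lives in this function body;
    # the Functions Framework invokes it once per HTTP request.
    return "Success"
```

Running `functions-framework --target=import_inventory` serves the function locally on port 8080, the same port a Cloud Run container is expected to listen on.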

The first issue: I don't want to use the local file system if possible, to ensure my code is really serverless, so I have to work with the contents of the CSV file entirely in memory. The sample for downloading a file from Cloud Storage leaves it as a blob rather than something the csv module can parse, but a few simple commands to translate the blob to bytes and the bytes to a StringIO object solve that problem.

    import csv
    from io import StringIO

    from google.cloud import bigquery, storage

    # Create necessary GCP clients
    storage_client = storage.Client()
    bq_client = bigquery.Client()

    # Retrieve starting inventory file from storage and parse it in memory
    bucket_name = "INSERT BUCKET NAME HERE"
    bucket = storage_client.bucket(bucket_name)

    file_name = "INSERT FILE NAME HERE"
    blob = bucket.blob(file_name)

    # Translate the blob to bytes, then to a StringIO the csv module can read
    bytedata = blob.download_as_bytes()
    data = bytedata.decode("UTF-8")

    csv_file_ish = StringIO(data)

    inventory_reader = csv.DictReader(csv_file_ish)

The second issue: the BigQuery client requires that any rows I insert into a table be JSON objects. Personally, I would have preferred lists, because I think of a CSV file as a 2D array, but streaming rows in as JSON objects does mean I don't have to get the fields in exactly the right order. This was easy to address once I understood why my code was returning 200 copies of the error Invalid JSON payload received. Unknown name "json" at 'rows[1]': Proto field is not repeating, cannot start list.

The data transformations I did are relatively minor, consisting mostly of casting strings to integer types. The price field was the one exception. The CSV file has prices as floats, that is, with a decimal point. Long ago my coding elders drilled into me that prices should always be stored as an integer number of cents, so $3.00 becomes 300. I also want to add a 10% markup to all the prices, which I can do in the same step as the float-to-integer conversion: multiplying the dollar price by 110 yields the marked-up price in cents.

    # Create data to import
    rows_to_insert = []
    for row in inventory_reader:
        new_row = {}
        new_row["item_number"] = int(row["item_number"])
        # Dollars to cents with the 10% markup: $3.50 * 110 = 385 cents
        new_row["price"] = round(float(row["state_bottle_retail"]) * 110)
        new_row["count"] = int(row["bottles_sold"])
        new_row["category"] = row["category_name"]
        new_row["description"] = row["item_description"]

        rows_to_insert.append(new_row)

    # Insert data
    table_id = "crbq-import-spike.crbq_import_spike.inventory"
    errors = bq_client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
    if errors:
        print(f"Encountered errors while inserting rows: {errors}")

Incremental update of inventory 

The second part of my simple inventory management system is a way to update the quantities in BigQuery when an item is purchased or returned. I'm again using the Python Functions Framework for my code. If you are used to working with ORMs you'll be surprised to learn that the BigQuery client for Python doesn't include an update method. Instead the recommended way to update a row is by directly executing an update query on your table.

I extract an item_id and a quantity from the HTTP request that my code receives. I cast both of these to int to ensure the values I received are well formed, and as a way to sanitize my inputs. I can then use string interpolation to inject the values into a query and execute it against BigQuery.

  from google.cloud import bigquery

  # Create necessary GCP clients
  bq_client = bigquery.Client()

  # Extract quantity and item ID from request
  request_json = request.get_json()

  item_id = int(request_json['item_id'])
  quantity = int(request_json['quantity'])

  # Backticks are required because the project ID contains hyphens
  update_query = (
    f"""
    UPDATE `crbq-import-spike.crbq_import_spike.inventory`
    SET count = count - {quantity}
    WHERE item_number = {item_id}"""
    )

  query_job = bq_client.query(update_query)
  query_job.result()  # Wait for the DML statement to complete

  return "Success"
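Casting to int keeps the interpolated values safe here, but as an alternative the BigQuery client also supports query parameters, which avoid string interpolation entirely. A sketch of the same update written with parameters (the function name decrement_inventory is my own):

```python
UPDATE_QUERY = """
    UPDATE `crbq-import-spike.crbq_import_spike.inventory`
    SET count = count - @quantity
    WHERE item_number = @item_id
"""

def decrement_inventory(bq_client, item_id: int, quantity: int):
    # Imported here so the sketch reads without the client installed.
    from google.cloud import bigquery

    # Values are bound server-side instead of pasted into the SQL text.
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("item_id", "INT64", item_id),
            bigquery.ScalarQueryParameter("quantity", "INT64", quantity),
        ]
    )
    return bq_client.query(UPDATE_QUERY, job_config=job_config).result()
```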

Since Cloud Run scales horizontally and automatically, I can process many, many inventory updates simultaneously. And BigQuery handles any race conditions that may occur when two requests modify the same row.

If you'd like to try this yourself you can check out Cloud Run here.