アプリケーション開発

BigQuery と Cloud Run による在庫管理

Containers and Kubernetics static hero

※この投稿は米国時間 2021 年 3 月 4 日に、Google Cloud blog に投稿されたものの抄訳です。

Cloud Run はウェブサイトをホストする方法にすぎないと考えているユーザーは少なくありません。Cloud Run はその点に関して優れていますが、利用方法は他にも多くあります。今回は、Cloud Run と BigQuery を併用して在庫管理システムを作成する方法についてご紹介します。Iowa Liquor Control Board データセットのサブセットを使用し、架空の店舗用に小さい在庫ファイルを作成してみます。

新しい在庫の一括インポート

この在庫管理シナリオでは、新しい在庫を一括読み込みするために CSV ファイルを Cloud Storage にドロップします。従来の在庫管理システムから CSV ファイルをエクスポートし、新しいシステムに移行可能なことは想像できるでしょう。

BigQuery は Cloud Storage から CSV ファイルを直接インポートできます。データ変換が不要な場合は、組み込みの CSV 読み込み機能を使用することをおすすめします。このユースケースでは、CSV に数個の列があれば十分であり、追加のデータ変換をいくつか実行するためにコードを記述します。

コードのデプロイを容易にするために、スクリプトのシェルとして Python Functions Framework を使用します。Cloud Storage と BigQuery にアクセスするために、これらの各プロダクトに Python クライアント ライブラリを使用し、サンプル エクスプローラのサンプルを実行します。なお、このコードを書いている最中に直面した問題は 2 つだけでした。

第 1 に、コードが実際にサーバーレスであることを確認するために、できればローカル ファイル システムを使用したくないので、CSV ファイルの内容を完全にメモリ内で処理する必要があります。第 2 に、Cloud Storage からファイルをダウンロードするためのサンプルでは、ファイルが blob のままになっており、解析できる CSV ではありません。この問題を解決するには、簡単なコマンドをいくつか実行して、blob をバイトに変換し、バイトを StringIO オブジェクトに変換します。

  # Create necessary GCP Clients
    storage_client = storage.Client()
    bq_client = bigquery.Client()
    
    # Retrieve starting inventory file from storage and parse
    bucket_name = "INSERT BUCKET NAME HERE"
    bucket = storage_client.bucket(bucket_name)

    file_name = "INSERT FILE NAME HERE" 
    blob = bucket.blob(file_name)

    bytedata = blob.download_as_bytes()
    data = bytedata.decode("UTF-8")

    csv_file_ish = StringIO(data)

    inventory_reader = csv.DictReader(csv_file_ish)

次に、BigQuery クライアントでは、テーブルに挿入する行が JSON オブジェクトである必要があります。個人的には CSV ファイルを 2D 配列と認識しているため、リストの方が良かったのですが、行を JSON オブジェクトとしてストリーミングすると、フィールドをきっちり正確な順序で取得する必要がありません。コードから次のエラーが 200 回も返ってきましたが、理由さえわかれば簡単なことでした。

  Invalid JSON payload received. Unknown name "json" at 'rows[1]': Proto field is not repeating, cannot start list.

私が行ったデータ変換は比較的軽微で、文字列を整数型にキャストすることが大半を占めていましたが、価格フィールドだけは例外でした。この CSV ファイルには浮動小数点数、つまり小数の価格があります。昔、価格は常に整数のセント単位で格納しなければならないと先輩プログラマーから教え込まれたので、$3.00 は 300 として格納することになります。また、浮動小数点数から整数に変更すると同時に、すべての価格に 10% の利幅も追加してみます。

  # Create data to import 
    rows_to_insert = []
    for row in inventory_reader:
        new_row = {}
        new_row["item_number"] = int(row["item_number"])
        new_row["price"] = int(float(row["state_bottle_retail"])) * 110
        new_row["count"] = int(row["bottles_sold"])
        new_row["category"] = row["category_name"]
        new_row["description"] = row["item_description"]

        rows_to_insert.append(new_row)

    table_id = "crbq-import-spike.crbq_import_spike.inventory"
 # Insert data
    errors = bq_client.insert_rows_json(table_id, rows_to_insert) # Make an API request.

在庫の増分更新

この簡単な在庫管理システムでは、商品アイテムの購入時や返品時に BigQuery で数量を更新する方法についても取り上げます。ここでは、もう一度コードに Python Functions Framework を使用します。ORM の操作に慣れている方は、Python 用の BigQuery クライアントに更新メソッドが含まれていないことに驚かれるでしょう。代わりに、行を更新するためにテーブルに対して更新クエリを直接実行することをおすすめします。

コードが受信する http リクエストから item_id と quantity を抽出します。これらの両方を int にキャストし、受信した値が適切にフォーマットされるようにして、入力をサニタイズします。次に、文字列補間を使用して値をクエリに挿入し、BigQuery に対してクエリを実行できます。

  # Create necessary GCP Clients
  bq_client = bigquery.Client()

  # Extract quantity and item ID from request
  request_json = request.get_json()
  
  item_id = int(request_json['item_id'])
  quantity = int(request_json['quantity'])

  update_query = (
    f"""
    UPDATE crbq-import-spike.crbq_import_spike.inventory
    SET count = count - {quantity} 
    WHERE item_number = {item_id}"""
    )

  query_job = bq_client.query(update_query)
  results = query_job.result()

  for row in results:
    print("{} : {} views".format(row.url, row.view_count))

  return "Success"

Cloud Run は自動的な水平スケーリングが可能なため、多数の在庫更新を同時に処理できます。また、BigQuery は、2 つのリクエストが同じ行を変更することで発生する可能性のある競合状態に対処します。

ご自分でお試しになる場合は、こちらで Cloud Run をご確認ください。

-デベロッパー アドボケイト Aja Hammerly