このチュートリアルでは、テーブル スキーマを自動的に生成し、入力要素を変換することにより、Dataflow を使用して Avro SpecificRecord
オブジェクトを BigQuery に格納する方法について説明します。また、Avro で生成されたクラスを使用して、Dataflow パイプラインのワーカー間で中間データを具体化または転送する方法についても説明します。
Apache Avro は、データの構造化にスキーマに依存するシリアル化システムです。Avro データの読み取りまたは書き込み時にスキーマが常に存在するため、シリアル化は高速かつ小規模で実行されます。パフォーマンス上のメリットから、この方法はシステム間でメッセージを転送する際によく利用されています。たとえば、メッセージ ブローカー経由でイベントを分析システムに送信するアプリで使用されています。Avro スキーマは、BigQuery データ ウェアハウス スキーマの管理に使用できます。Avro スキーマを BigQuery テーブル構造に変換するには、このチュートリアルで説明するようにカスタムコードを作成する必要があります。
このチュートリアルは、Avro スキーマを使用して BigQuery データ ウェアハウス スキーマを管理することに関心があるデベロッパーとアーキテクトを対象としています。また、Avro と Java に精通していることを前提としています。
次の図に、このチュートリアルで説明するアーキテクチャの概要を示します。
このアーキテクチャ パターンを説明するため、このチュートリアルでは次の処理を行うシンプルな注文処理システムを使用します。
- ユーザーが購入を行うと、オンライン アプリがイベントを生成します。
- 注文オブジェクトには、一意の識別子、購入された商品のリスト、タイムスタンプが含まれます。
- Dataflow パイプラインが Pub/Sub トピックから
OrderDetails SpecificRecord
Avro メッセージを読み取ります。 - Dataflow パイプラインが Avro ファイルとしてレコードを Cloud Storage に書き込みます。
OrderDetails
クラスが、対応する BigQuery スキーマを自動的に生成します。- 汎用変換関数により、
OrderDetails
オブジェクトが BigQuery に書き込まれます。
目標
- Dataflow を使用して Pub/Sub データ ストリームから JSON 文字列を取り込む。
- JSON オブジェクトを Avro で生成されたクラスのオブジェクトに変換する。
- Avro スキーマから BigQuery テーブル スキーマを生成する。
- Avro レコードを Cloud Storage のファイルに書き込む。
- Avro レコードを BigQuery に書き込む。
費用
このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
BigQuery, Cloud Storage, and Dataflow API を有効にします。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
BigQuery, Cloud Storage, and Dataflow API を有効にします。
-
コンソールで Cloud Shell をアクティブにします。
コンソールの下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。
環境設定
Cloud Shell で、ソース リポジトリのクローンを作成します。
cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
env.sh
ファイルを開きます。cd $HOME/bigquery-ingest-avro-dataflow-sample nano env.sh
env.sh
ファイルには、このチュートリアルで使用できるデフォルト値が設定されています。この値は環境に合わせて変更できます。# pubsub topic MY_TOPIC="avro-records" # Cloud Storage Bucket MY_BUCKET="${GOOGLE_CLOUD_PROJECT}_avro_beam" # Avro file Cloud Storage output path AVRO_OUT="${MY_BUCKET}/out/" # Region for Cloud Pub/Sub and Cloud Dataflow REGION="us-central1" # Region for BigQuery BQ_REGION="US" # BigQuery dataset name BQ_DATASET="sales" # BigQuery table name BQ_TABLE="orders"
次のように置き換えます。
avro-records
: Pub/Sub トピックの名前。$GOOGLE_CLOUD_PROJECT"_avro_beam
: Cloud プロジェクト ID によって生成された Cloud Storage バケットの名前。$MY_BUCKET/""out/"
: Avro 出力を含む Cloud Storage バケットのパス。us-central1
: Pub/Sub と Dataflow に使用するリージョン。リージョンの詳細については、地域とリージョンをご覧ください。US
: BigQuery のリージョン。ロケーションの詳細については、データセットのロケーションをご覧ください。sales
: BigQuery データセット名。orders
: BigQuery テーブル名。1
: Dataflow ワーカーの最大数。
環境変数を設定します。
. ./env.sh
リソースの作成
Cloud Shell で Pub/Sub トピックを作成します。
gcloud pubsub topics create "${MY_TOPIC}"
Cloud Storage バケットを作成します。
gsutil mb -l "${REGION}" -c regional "gs://${MY_BUCKET}"
Cloud Storage バケットは、アプリによって生成された未加工のイベントをバックアップします。また、Dataproc で実行される Spark と Hadoop ジョブでオフライン分析と検証を行う際に代替ソースとして使用されます。
BigQuery データセットを作成します。
bq --location="${BQ_REGION}" mk --dataset "${GOOGLE_CLOUD_PROJECT}:${BQ_DATASET}"
BigQuery のデータセットには、単一リージョンまたは複数のリージョンを含む地域のテーブルとビューが含まれます。詳細については、データセットの作成をご覧ください。
Beam Dataflow アプリの起動
Cloud Shell で、Dataflow ランナーにパイプラインをデプロイして実行します。
cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro mvn clean package java -cp target/BeamAvro-bundled-1.0-SNAPSHOT.jar \ com.google.cloud.solutions.beamavro.AvroToBigQuery \ --runner=DataflowRunner \ --project="${GOOGLE_CLOUD_PROJECT}" \ --stagingLocation="gs://${MY_BUCKET}/stage/" \ --tempLocation="gs://${MY_BUCKET}/temp/" \ --inputPath="projects/${GOOGLE_CLOUD_PROJECT}/topics/${MY_TOPIC}" \ --workerMachineType=n1-standard-1 \ --region="${REGION}" \ --dataset="${BQ_DATASET}" \ --bigQueryTable="${BQ_TABLE}" \ --outputPath="gs://${MY_BUCKET}/out/" \ --jsonFormat=false \ --avroSchema="$(<../orderdetails.avsc)"
出力にアプリ ID が含まれます。チュートリアルの後半で必要になるため、アプリ ID をメモしておきます。
コンソールで Dataflow に移動します。
パイプラインのステータスを表示するには、アプリ ID をクリックします。パイプラインのステータスがグラフで表示されます。
コードを確認する
AvroToBigQuery.java
ファイルの pipeline オプションと必須パラメータが、コマンドライン パラメータを介して渡されています。ストリーミング モード オプションも有効になっています。BigQuery テーブル スキーマが、BigQuery IO の Beam スキーマを使用して Avro スキーマから自動生成されます。
Avro 入力形式の場合、オブジェクトは Pub/Sub から読み取られます。入力形式が JSON の場合、イベントは読み取られた後 Avro オブジェクトに変換されます。
パイプラインは分岐して、2 つの独立した書き込みを実行します。
Write AVRO file
変換は、AvroIO を使用してデータを 10 秒のチャンクに分割し、AVRO レコードを Cloud Storage に書き込みます。Write to BigQuery
変換は、Beam スキーマを使用して BigQuery テーブルにレコードを書き込みます。
BigQueryIO
が Beam スキーマを使用して Avro オブジェクトを内部で TableRow
オブジェクトに変換し、BigQuery に書き込みます。BigQuery のデータ型と Avro のデータ型の間のマッピングをご覧ください。
BigQuery で結果を見る
パイプラインをテストするため、gen.py
スクリプトを起動します。このスクリプトは、注文イベントの生成をシミュレートし、イベントを Pub/Sub トピックに push します。
Cloud Shell で、サンプル イベント生成スクリプトのディレクトリに移動し、スクリプトを実行します。
cd $HOME/bigquery-ingest-avro-dataflow-sample/generator python3 -m venv env . ./env/bin/activate pip install -r requirements.txt python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
コンソールで BigQuery に移動します。
テーブル スキーマを確認するには、
sales
データセットをクリックし、orders
テーブルを選択します。env.sh
のデフォルトの環境変数を変更した場合は、データセット名とテーブル名が異なる可能性があります。一部のサンプルデータを表示するには、クエリエディタでクエリを実行します。
SELECT * FROM sales.orders LIMIT 5
BigQuery テーブル スキーマが Avro レコードから自動生成され、データが自動的に BigQuery テーブル構造に変換されます。
クリーンアップ
プロジェクトの削除
- コンソールで [リソースの管理] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
個々のリソースの削除
次の手順に沿って Dataflow ジョブを停止します。
Cloud Storage バケットを削除します。
gsutil rm -r gs://$MY_BUCKET
次のステップ
- 一般的な Dataflow のユースケース パターンの詳細を確認する。
- Dataflow と Google Cloud の外部にホストされている Apache Kafka クラスタを使用する方法を学習する。
- Google Cloud に関するリファレンス アーキテクチャ、図、チュートリアル、ベスト プラクティスを確認する。Cloud Architecture Center を確認します。