Dataflow を使用して Avro レコードを BigQuery にストリーミングする

このチュートリアルでは、テーブル スキーマを自動的に生成し、入力要素を変換することにより、Dataflow を使用して Avro SpecificRecord オブジェクトを BigQuery に格納する方法について説明します。また、Avro で生成されたクラスを使用して、Dataflow パイプラインのワーカー間で中間データを具体化または転送する方法についても説明します。

Apache Avro は、データの構造化にスキーマに依存するシリアル化システムです。Avro データの読み取りまたは書き込み時にスキーマが常に存在するため、シリアル化は高速かつ小規模で実行されます。パフォーマンス上のメリットから、この方法はシステム間でメッセージを転送する際によく利用されています。たとえば、メッセージ ブローカー経由でイベントを分析システムに送信するアプリで使用されています。Avro スキーマは、BigQuery データ ウェアハウス スキーマの管理に使用できます。Avro スキーマを BigQuery テーブル構造に変換するには、このチュートリアルで説明するようにカスタムコードを作成する必要があります。

このチュートリアルは、Avro スキーマを使用して BigQuery データ ウェアハウス スキーマを管理することに関心があるデベロッパーとアーキテクトを対象としています。また、Avro と Java に精通していることを前提としています。

次の図に、このチュートリアルで説明するアーキテクチャの概要を示します。

BigQuery データ ウェアハウス スキーマを管理する Avro スキーマのアーキテクチャ。

このアーキテクチャ パターンを説明するため、このチュートリアルでは次の処理を行うシンプルな注文処理システムを使用します。

  • ユーザーが購入を行うと、オンライン アプリがイベントを生成します。
  • 注文オブジェクトには、一意の識別子、購入された商品のリスト、タイムスタンプが含まれます。
  • Dataflow パイプラインが Pub/Sub トピックから OrderDetails SpecificRecord Avro メッセージを読み取ります。
  • Dataflow パイプラインが Avro ファイルとしてレコードを Cloud Storage に書き込みます。
  • OrderDetails クラスが、対応する BigQuery スキーマを自動的に生成します。
  • 汎用変換関数により、OrderDetails オブジェクトが BigQuery に書き込まれます。

目標

  • Dataflow を使用して Pub/Sub データ ストリームから JSON 文字列を取り込む。
  • JSON オブジェクトを Avro で生成されたクラスのオブジェクトに変換する。
  • Avro スキーマから BigQuery テーブル スキーマを生成する。
  • Avro レコードを Cloud Storage のファイルに書き込む。
  • Avro レコードを BigQuery に書き込む。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  4. BigQuery, Cloud Storage, and Dataflow API を有効にします。

    API を有効にする

  5. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  6. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  7. BigQuery, Cloud Storage, and Dataflow API を有効にします。

    API を有効にする

  8. コンソールで Cloud Shell をアクティブにします。

    Cloud Shell をアクティブにする

    コンソールの下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

環境設定

  1. Cloud Shell で、ソース リポジトリのクローンを作成します。

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. env.sh ファイルを開きます。

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  3. env.sh ファイルには、このチュートリアルで使用できるデフォルト値が設定されています。この値は環境に合わせて変更できます。

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="${GOOGLE_CLOUD_PROJECT}_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="${MY_BUCKET}/out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    BQ_TABLE="orders"
    

    次のように置き換えます。

    • avro-records: Pub/Sub トピックの名前。
    • $GOOGLE_CLOUD_PROJECT"_avro_beam: Cloud プロジェクト ID によって生成された Cloud Storage バケットの名前。
    • $MY_BUCKET/""out/": Avro 出力を含む Cloud Storage バケットのパス。
    • us-central1: Pub/Sub と Dataflow に使用するリージョン。リージョンの詳細については、地域とリージョンをご覧ください。
    • US: BigQuery のリージョン。ロケーションの詳細については、データセットのロケーションをご覧ください。
    • sales: BigQuery データセット名。
    • orders: BigQuery テーブル名。
    • 1: Dataflow ワーカーの最大数。
  4. 環境変数を設定します。

     . ./env.sh
    

リソースの作成

  1. Cloud Shell で Pub/Sub トピックを作成します。

    gcloud pubsub topics create "${MY_TOPIC}"
    
  2. Cloud Storage バケットを作成します。

    gsutil mb -l "${REGION}" -c regional "gs://${MY_BUCKET}"
    

    Cloud Storage バケットは、アプリによって生成された未加工のイベントをバックアップします。また、Dataproc で実行される Spark と Hadoop ジョブでオフライン分析と検証を行う際に代替ソースとして使用されます。

  3. BigQuery データセットを作成します。

    bq --location="${BQ_REGION}" mk --dataset "${GOOGLE_CLOUD_PROJECT}:${BQ_DATASET}"
    

    BigQuery のデータセットには、単一リージョンまたは複数のリージョンを含む地域のテーブルとビューが含まれます。詳細については、データセットの作成をご覧ください。

Beam Dataflow アプリの起動

  1. Cloud Shell で、Dataflow ランナーにパイプラインをデプロイして実行します。

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    
    mvn clean package
    
    java -cp target/BeamAvro-bundled-1.0-SNAPSHOT.jar \
    com.google.cloud.solutions.beamavro.AvroToBigQuery \
    --runner=DataflowRunner \
    --project="${GOOGLE_CLOUD_PROJECT}" \
    --stagingLocation="gs://${MY_BUCKET}/stage/" \
    --tempLocation="gs://${MY_BUCKET}/temp/" \
    --inputPath="projects/${GOOGLE_CLOUD_PROJECT}/topics/${MY_TOPIC}" \
    --workerMachineType=n1-standard-1 \
    --region="${REGION}" \
    --dataset="${BQ_DATASET}" \
    --bigQueryTable="${BQ_TABLE}" \
    --outputPath="gs://${MY_BUCKET}/out/" \
    --jsonFormat=false \
    --avroSchema="$(<../orderdetails.avsc)"
    

    出力にアプリ ID が含まれます。チュートリアルの後半で必要になるため、アプリ ID をメモしておきます。

  2. コンソールで Dataflow に移動します。

    Dataflow に移動

  3. パイプラインのステータスを表示するには、アプリ ID をクリックします。パイプラインのステータスがグラフで表示されます。

    パイプライン ステータスのグラフ。

コードを確認する

AvroToBigQuery.java ファイルの pipeline オプションと必須パラメータが、コマンドライン パラメータを介して渡されています。ストリーミング モード オプションも有効になっています。BigQuery テーブル スキーマが、BigQuery IOBeam スキーマを使用して Avro スキーマから自動生成されます。

Avro 入力形式の場合、オブジェクトは Pub/Sub から読み取られます。入力形式が JSON の場合、イベントは読み取られた後 Avro オブジェクトに変換されます。

Schema avroSchema = new Schema.Parser().parse(options.getAvroSchema());

if (options.getJsonFormat()) {
  return input
      .apply("Read Json", PubsubIO.readStrings().fromTopic(options.getInputPath()))
      .apply("Make GenericRecord", MapElements.via(JsonToAvroFn.of(avroSchema)));
} else {
  return input.apply("Read GenericRecord", PubsubIO.readAvroGenericRecords(avroSchema)
      .fromTopic(options.getInputPath()));
}

パイプラインは分岐して、2 つの独立した書き込みを実行します。

BigQueryIO が Beam スキーマを使用して Avro オブジェクトを内部で TableRow オブジェクトに変換し、BigQuery に書き込みます。BigQuery のデータ型と Avro のデータ型の間のマッピングをご覧ください。

BigQuery で結果を見る

パイプラインをテストするため、gen.py スクリプトを起動します。このスクリプトは、注文イベントの生成をシミュレートし、イベントを Pub/Sub トピックに push します。

  1. Cloud Shell で、サンプル イベント生成スクリプトのディレクトリに移動し、スクリプトを実行します。

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. コンソールで BigQuery に移動します。

    BigQuery に移動

  3. テーブル スキーマを確認するには、sales データセットをクリックし、orders テーブルを選択します。env.sh のデフォルトの環境変数を変更した場合は、データセット名とテーブル名が異なる可能性があります。

    orders テーブルのテーブル スキーマ。

  4. 一部のサンプルデータを表示するには、クエリエディタでクエリを実行します。

    SELECT * FROM sales.orders LIMIT 5
    

    サンプルデータのクエリ結果。

    BigQuery テーブル スキーマが Avro レコードから自動生成され、データが自動的に BigQuery テーブル構造に変換されます。

クリーンアップ

プロジェクトの削除

  1. コンソールで [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

個々のリソースの削除

  1. 次の手順に沿って Dataflow ジョブを停止します。

  2. Cloud Storage バケットを削除します。

    gsutil rm -r gs://$MY_BUCKET
    

次のステップ