Dataflow を使用して Avro レコードを BigQuery にストリーミングする

このチュートリアルでは、テーブル スキーマを自動的に生成し、入力要素を変換することにより、Dataflow を使用して Avro SpecificRecord オブジェクトを BigQuery に格納する方法について説明します。また、Avro で生成されたクラスを使用して、Dataflow パイプラインのワーカー間で中間データを具体化または転送する方法についても説明します。

Apache Avro は、データの構造化にスキーマに依存するシリアル化システムです。Avro データの読み取りまたは書き込み時にスキーマが常に存在するため、シリアル化は高速かつ小規模で実行されます。パフォーマンス上のメリットから、この方法はシステム間でメッセージを転送する際によく利用されています。たとえば、メッセージ ブローカー経由でイベントを分析システムに送信するアプリで使用されています。Avro スキーマは、BigQuery データ ウェアハウス スキーマの管理に使用できます。Avro スキーマを BigQuery テーブル構造に変換するには、このチュートリアルで説明するようにカスタムコードを作成する必要があります。

このチュートリアルは、Avro スキーマを使用して BigQuery データ ウェアハウス スキーマを管理することに関心があるデベロッパーとアーキテクトを対象としています。また、Avro と Java に精通していることを前提としています。

次の図に、このチュートリアルで説明するアーキテクチャの概要を示します。

BigQuery データ ウェアハウス スキーマを管理する Avro スキーマのアーキテクチャ。

このアーキテクチャ パターンを説明するため、このチュートリアルでは次の処理を行うシンプルな注文処理システムを使用します。

  • ユーザーが購入を行うと、オンライン アプリがイベントを生成します。
  • 注文オブジェクトには、一意の識別子、購入された商品のリスト、タイムスタンプが含まれます。
  • Dataflow パイプラインが Pub/Sub トピックから OrderDetails SpecificRecord Avro メッセージを読み取ります。
  • Dataflow パイプラインが Avro ファイルとしてレコードを Cloud Storage に書き込みます。
  • OrderDetails クラスが、対応する BigQuery スキーマを自動的に生成します。
  • 汎用変換関数により、OrderDetails オブジェクトが BigQuery に書き込まれます。

目標

  • Dataflow を使用して Pub/Sub データ ストリームから JSON 文字列を取り込む。
  • JSON オブジェクトを Avro で生成されたクラスのオブジェクトに変換する。
  • Avro スキーマから BigQuery テーブル スキーマを生成する。
  • Avro レコードを Cloud Storage のファイルに書き込む。
  • Avro レコードを BigQuery に書き込む。

料金

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。新しい Google Cloud ユーザーは無料トライアルをご利用いただけます。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳しくは、クリーンアップをご覧ください。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. BigQuery, Cloud Storage, and Dataflow API を有効にします。

    API を有効にする

  5. Cloud Console で、Cloud Shell をアクティブにします。

    Cloud Shell をアクティブにする

    Cloud Console の下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。gcloud コマンドライン ツールなどの Cloud SDK がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

環境設定

  1. Cloud Shell で、ソース リポジトリのクローンを作成します。

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. Avro クラスを生成します。

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn generate-sources
    

    このコマンドを実行すると、orderdetails.avsc ファイルを使用して OrderDetails クラスと OrderItems クラスが生成されます。OrderDetails クラスには一意の識別子、タイムスタンプ、OrderItems リストがあります。OrderItems クラスには一意の識別子、名前、価格があります。Avro スキーマが BigQuery テーブルに挿入されます。このテーブルの行に、OrderItem タイプのレコードの配列を含む注文が格納されます。詳しくは、ネストされた列と繰り返し列の指定をご覧ください。

  3. env.sh ファイルを開きます。

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  4. env.sh ファイルには、このチュートリアルで使用できるデフォルト値が設定されています。この値は環境に合わせて変更できます。

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="$GOOGLE_CLOUD_PROJECT""_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="$MY_BUCKET/""out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    `BQ_TABLE=`"`orders`"
    
    # Maximum number of Dataflow workers
    NUM_WORKERS=1
    

    次のように置き換えます。

    • avro-records: Pub/Sub トピックの名前。
    • $GOOGLE_CLOUD_PROJECT"_avro_beam: Cloud プロジェクト ID によって生成された Cloud Storage バケットの名前。
    • $MY_BUCKET/""out/": Avro 出力を含む Cloud Storage バケットのパス。
    • us-central1: Pub/Sub と Dataflow に使用するリージョン。リージョンの詳細については、地域とリージョンをご覧ください。
    • US: BigQuery のリージョン。ロケーションの詳細については、データセットのロケーションをご覧ください。
    • sales: BigQuery データセット名。
    • orders: BigQuery テーブル名。
    • 1: Dataflow ワーカーの最大数。
  5. 環境変数を設定します。

     . ./env.sh
    

リソースの作成

  1. Cloud Shell で Pub/Sub トピックを作成します。

    gcloud pubsub topics create $MY_TOPIC
    
  2. Cloud Storage バケットを作成します。

    gsutil mb -l $REGION -c regional gs://$MY_BUCKET
    

    Cloud Storage バケットは、アプリによって生成された未加工のイベントをバックアップします。また、Dataproc で実行される Spark と Hadoop ジョブでオフライン分析と検証を行う際に代替ソースとして使用されます。

  3. BigQuery データセットを作成します。

    bq --location=$BQ_REGION mk --dataset $GOOGLE_CLOUD_PROJECT:$BQ_DATASET

    BigQuery のデータセットには、単一リージョンまたは複数のリージョンを含む地域のテーブルとビューが含まれます。詳細については、データセットの作成をご覧ください。

Beam Dataflow アプリの起動

  1. Cloud Shell で、Dataflow ランナーにパイプラインをデプロイして実行します。

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn compile exec:java \
        -Dexec.mainClass=com.google.cloud.solutions.beamavro.AvroToBigQuery \
        -Dexec.cleanupDaemonThreads=false \
        -Dexec.args=" \
        --project=$GOOGLE_CLOUD_PROJECT \
        --runner=DataflowRunner \
        --stagingLocation=gs://$MY_BUCKET/stage/ \
        --tempLocation=gs://$MY_BUCKET/temp/ \
        --inputPath=projects/$GOOGLE_CLOUD_PROJECT/topics/$MY_TOPIC \
        --workerMachineType=n1-standard-1 \
        --maxNumWorkers=$NUM_WORKERS \
        --region=$REGION \
        --dataset=$BQ_DATASET \
        --bqTable=$BQ_TABLE \
        --outputPath=$AVRO_OUT"
    

    出力にアプリ ID が含まれます。チュートリアルの後半で必要になるため、アプリ ID をメモしておきます。

  2. Cloud Console で Dataflow に移動します。

    Dataflow に移動

  3. パイプラインのステータスを表示するには、アプリ ID をクリックします。パイプラインのステータスがグラフで表示されます。

    パイプライン ステータスのグラフ。

コードを確認する

AvroToBigQuery.java ファイルの pipeline オプションと必須パラメータが、コマンドライン パラメータを介して渡されています。ストリーミング モード オプションも有効になっています。BigQuery テーブル スキーマが Avro クラススキーマから生成され、後で BigQuery IO クラスで使用されています。

TableSchema ts = BigQueryAvroUtils.getTableSchema(OrderDetails.SCHEMA$);

AvroUtils クラスが Avro スキーマ オブジェクトのフィールドに対して繰り返し実行され、対応する TableFieldSchema オブジェクトが生成されます。このオブジェクトは TableSchema にラップされて返されます。

Avro 入力形式の場合、オブジェクトは Pub/Sub から読み取られます。入力形式が JSON の場合、イベントは読み取られた後 Avro オブジェクトに変換されます。

private static PCollection<OrderDetails> getInputCollection(
    Pipeline pipeline, String inputPath, FORMAT format) {
  if (format == FORMAT.JSON) {
    // Transform JSON to Avro
    return pipeline
        .apply("Read JSON from PubSub", PubsubIO.readStrings().fromTopic(inputPath))
        .apply("To binary", ParDo.of(new JSONToAvro()));
  } else {
    // Read Avro
    return pipeline.apply(
        "Read Avro from PubSub", PubsubIO.readAvros(OrderDetails.class).fromTopic(inputPath));
  }
}

パイプラインが分岐します。Write to Cloud Storage 変換は複合変換で、10 秒間レコードを収集した後、AvroIO ライターを使用して、これらのレコードを Cloud Storage の Avro ファイルに書き込みます。

ods.apply(
    "Write to GCS",
    new AvroWriter()
        .withOutputPath(options.getOutputPath())
        .withRecordType(OrderDetails.class));

Write to BigQuery 変換が BigQuery テーブルにレコードを書き込みます。

ods.apply(
    "Write to BigQuery",
    BigQueryIO.write()
        .to(bqStr)
        .withSchema(ts)
        .withWriteDisposition(WRITE_APPEND)
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withFormatFunction(TABLE_ROW_PARSER));

BigQueryIO 変換が TABLE_ROW_PARSER メソッドを使用して Avro オブジェクトを TableRow に変換し、変換後のオブジェクトを BigQuery に書き込みます。パーサーが BigQueryAvroUtils クラスの convertSpecificRecordToTableRow メソッドを呼び出します。これは Apache Beam プロジェクトの test クラス上に構築されています。method が Avro フィールドを繰り返し解析し、TableRow オブジェクトに追加します。

private static TableRow convertSpecificRecordToTableRow(
    SpecificRecord record, List<TableFieldSchema> fields) {
  TableRow row = new TableRow();
  for (TableFieldSchema subSchema : fields) {
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
    // is required, so it may not be null.
    Field field = record.getSchema().getField(subSchema.getName());
    if (field == null || field.name() == null) {
      continue;
    }
    Object convertedValue = getTypedCellValue(field.schema(), subSchema, record.get(field.pos()));
    if (convertedValue != null) {
      // To match the JSON files exported by BigQuery, do not include null values in the output.
      row.set(field.name(), convertedValue);
    }
  }

  return row;
}

以下の表に、BigQuery のデータ型と Avro のデータ型の対応を示します。DateTimestamp の型には注意してください。これらの型は、フィールドの論理型で見分けることができます。

BigQuery Avro
STRING STRING
GEOGRAPHY STRING
BYTES BYTES
INTEGER INT
FLOAT FLOAT
FLOAT64 DOUBLE
NUMERIC BYTES
BOOLEAN BOOLEAN
INT64 LONG
TIMESTAMP LONG
DATE INT
DATETIME STRING
TIME LONG
STRUCT RECORD
REPEATED FIELD ARRAY

BigQuery で結果を見る

パイプラインをテストするため、gen.py スクリプトを起動します。このスクリプトは、注文イベントの生成をシミュレートし、イベントを Pub/Sub トピックに push します。

  1. Cloud Shell で、サンプル イベント生成スクリプトのディレクトリに移動し、スクリプトを実行します。

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. Cloud Console で BigQuery に移動します。

    BigQuery に移動

  3. テーブル スキーマを確認するには、sales データセットをクリックし、orders テーブルを選択します。env.sh のデフォルトの環境変数を変更した場合は、データセット名とテーブル名が異なる可能性があります。

    orders テーブルのテーブル スキーマ。

  4. 一部のサンプルデータを表示するには、クエリエディタでクエリを実行します。

    SELECT * FROM sales.orders LIMIT 5
    

    サンプルデータのクエリ結果。

    BigQuery テーブル スキーマが Avro レコードから自動生成され、データが自動的に BigQuery テーブル構造に変換されます。

クリーンアップ

プロジェクトの削除

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

個々のリソースの削除

  1. 次の手順に沿って Dataflow ジョブを停止します。

  2. Cloud Storage バケットを削除します。

    gsutil rm -r gs://$MY_BUCKET
    

次のステップ