Dataflow を使用してリレーショナル データベースから BigQuery に ETL を実行する

このチュートリアルでは、Dataflow を使用して、オンライン トランザクション処理(OLTP)のリレーショナル データベースから分析を目的として BigQuery にデータを抽出、変換、読み込み(ETL)する方法について説明します。

このチュートリアルは、BigQuery の分析クエリ機能と Dataflow のバッチ処理機能の利用に関心があるデータベース管理者、運用の専門家、クラウド アーキテクトを対象としています。

OLTP データベースは、多くの場合、e コマースサイト、SaaS(Software as a Service)アプリケーション、ゲームなどの情報を格納し、トランザクションを処理するリレーショナル データベースです。OLTP データベースは通常、ACID プロパティ(原子性、整合性、独立性、永続性)を必要とするトランザクション用に最適化されており、そのスキーマは一般的に高度に正規化されています。一方、データ ウェアハウスは、トランザクションよりもデータの取得や分析用に最適化される傾向があり、一般的に、正規化されていないスキーマを特長としています。通常、OLTP データベースのデータを非正規化した方が BigQuery での分析には役立ちます。

目標

このチュートリアルでは、非正規化された BigQuery データに対して正規化された RDBMS データの ETL を行う 2 つの方法を示します。

  • BigQuery を使用してデータの読み込みと変換を行います。この方法では、1 回に少量のデータを BigQuery に読み込み、分析を行います。サイズの大きいデータセットや複数のデータセットの自動処理を行う前に、データセットのプロトタイプを作成する場合にも使用します。
  • Dataflow を使用してデータの読み込み、変換、クレンジングを行います。この方法は、大量のデータを読み込む場合や、複数のデータソースからデータを読み込む場合に使用します。また、データを増分的または自動的に読み込む場合にも使用します。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。新しい Google Cloud ユーザーは無料トライアルをご利用いただけます。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳しくは、クリーンアップをご覧ください。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Cloud Console のプロジェクト セレクタページで、Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Compute Engine と Dataflow API を有効にします。

    API を有効にする

  5. Cloud SDK をインストールして初期化します。

MusicBrainz データセットの使用

このチュートリアルでは、MusicBrainz データベース内のテーブルの JSON スナップショットを使用します。このデータベースは PostgreSQL 上に構築され、MusicBrainz のすべての音楽に関する情報が保存されています。MusicBrainz スキーマの要素には、次のようなものがあります。

  • アーティスト
  • リリース グループ
  • リリース
  • レコーディング
  • 作品
  • ラベル
  • これらのエンティティ間のリレーションシップ

MusicBrainz スキーマには、artistrecordingartist_credit_name という 3 つの関連テーブルが定義されています。artist_credit は、レコーディングでアーティストに与えられたクレジットを表し、artist_credit_name 行は artist_credit 値でレコーディングと対応するアーティストをリンクしています。

このチュートリアルで使用する PostgreSQL テーブルは、すでに JSON 形式で抽出済みです。この操作を自身で行う場合には、次のサンプルコードを使用します。

pg_cmd="\\copy (select row_to_json(r) from (select * from artist) r ) to
exported_artist.json"
psql -w -h $host -U $user -d $db -c $pg_cmd
sed -i -e 's/\\\\/\\/g' exported_artist.json # clean up extra '\' characters

方法 1: BigQuery で ETL を行う

この方法では、1 回に少量のデータを BigQuery に読み込み、分析を行います。サイズの大きいデータセットや複数のデータセットの自動処理を行う前に、データセットのプロトタイプを作成する場合にも使用します。

BigQuery データセットを作成する

次の図は、BigQuery データセットの作成手順を表しています。

BigQuery データセットを作成する手順。

MusicBrainz のテーブルを BigQuery に個別に読み込み、各行に必要なデータリンクが含まれるように、読み込んだテーブルを結合します。結合結果を新しい BigQuery テーブルに保存します。この処理が完了したら、読み込んだ元のテーブルを削除できます。

  1. Cloud Console で、BigQuery を開きます。

    BigQuery を開く

  2. [リソース] の下で、プロジェクトの名前をクリックします。

  3. 左側のナビゲーションで、[+ データを追加] をクリックします。

  4. [データセットを作成] ダイアログで、次の手順に従います。

    1. [データセット ID] フィールドに「musicbrainz」と入力します。
    2. [データのロケーション] を [デフォルト] のままにします。
  5. [データセットを作成] をクリックします。

MusicBrainz テーブルをインポートする

各 MusicBrainz テーブルで次の操作を行って、作成したデータセットにテーブルを追加します。

  1. Cloud Console でデータセット名をクリックし、[+テーブルを作成] をクリックします。
  2. [テーブルの作成] ダイアログで、以下の手順を行い、[テーブルを作成] をクリックします。

    1. [ソース] の [テーブルの作成元] プルダウン リストで、[Google Cloud Storage] を選択します。
    2. [Cloud Storage バケットからファイルを選択] フィールドに、データファイルの URL「gs://solutions-public-assets/bqetl/artist.json」を入力します。
    3. [ファイル形式] で [JSON(改行区切り)] を選択します。
    4. [テーブル名] にテーブル名「artist」を入力します。
    5. [テーブルタイプ] で [ネイティブ テーブル] を選択したままにします。
    6. [スキーマ] セクションの下で、[テキストとして編集] をクリックしてオンにします。
    7. artist スキーマ ファイルをダウンロードします
    8. [スキーマ] セクションの内容をダウンロードしたスキーマ ファイルの内容に置き換えます。

    ダウンロードした JSON ファイルの更新されたスキーマを含む [テーブルの作成] ダイアログ。

  3. 読み込みジョブが完了するまで少し待ちます。ジョブをモニタリングするには、[ジョブ履歴] をクリックします。

    読み込みが完了すると、データセットの下に新しいテーブルが表示されます。

  4. 次の変更を加えて、artist_credit_name テーブルに対してステップ 1~3 を繰り返します。

  5. 次の変更を加えて、recording テーブルに対してステップ 1~3 を繰り返します。

データを手動で非正規化する

データを非正規化するには、データを、分析用に保持する特定のメタデータとともに、アーティストのレコーディングを 1 行とする新しい BigQuery テーブルに結合します。

  1. Cloud Console で、次のクエリをコピーしてクエリエディタに貼り付けます。

    SELECT artist.id, artist.gid as artist_gid,
           artist.name as artist_name, artist.area,
           recording.name as recording_name, recording.length,
           recording.gid as recording_gid, recording.video
      FROM `[PROJECT_ID].[DATASET].artist` as artist
          INNER JOIN `[PROJECT_ID].[DATASET].artist_credit_name` AS artist_credit_name
               ON artist.id = artist_credit_name.artist
          INNER JOIN `[PROJECT_ID].[DATASET].recording` AS recording
               ON artist_credit_name.artist_credit = recording.artist_credit
    

    [DATASET] は以前に作成したデータセットの名前(musicbrainz など)に置き換え、[PROJECT_ID] は Google Cloud プロジェクト ID に置き換えます。

  2. [展開] プルダウン リストをクリックし、[クエリの設定] を選択します。

  3. [クエリの設定] カードで、次の操作を行います。

    1. [クエリ結果の宛先テーブルを設定する] チェックボックスをオンにします。
    2. [テーブル名] に「recordings_by_artists_manual.」と入力します。
    3. [宛先テーブルの書き込み設定] で、[テーブルを上書きする] をクリックします。
    4. [大容量の結果を許可する(サイズ上限なし)] チェックボックスをオンにします。
    5. [ジョブの優先度] をデフォルトの [インタラクティブ] のままにします。
    6. [SQL 言語] をデフォルトの [標準] のままにします。
    7. [保存] をクリックします。
  4. [実行] をクリックします。

    クエリが完了すると、新しく作成した BigQuery テーブルでは、クエリ結果からのデータがアーティストごとの曲に編成されます。

    宛先テーブルのクエリ設定。

方法 2: Dataflow で BigQuery に ETL を行う

チュートリアルのこのセクションでは、BigQuery UI を使用する代わりにサンプル プログラムを使用します。このプログラムでは、Dataflow パイプラインを使用して BigQuery にデータを読み込みます。次に、Dataflow プログラミング モデルを使用してデータの非正規化とクレンジングを行い、BigQuery に読み込みます。

始める前に、コンセプトとサンプルコードを確認してください。

コンセプトを確認する

データはサイズが小さく、BigQuery UI を使用して簡単にアップロードできますが、このチュートリアルでは Cloud Dataflow を使用して ETL を行うこともできます。大規模な結合処理(500~5,000 列で、10 TB を超えるデータ)の場合には、次の理由から BigQuery UI ではなく Cloud Dataflow を使用して BigQuery に ETL を行います。

  • データを保存して後で結合するのではなく、データを BigQuery に読み込むときにクレンジングまたは変換する。したがって、データは結合され変換された状態でのみ BigQuery に格納されるため、この方法ではストレージ要件も低くなります。
  • カスタムデータのクレンジングを行う(これは SQL では簡単に実現できません)。
  • 読み込み処理中に、OLTP 以外のデータ(ログやリモートからアクセスするデータなど)とデータを組み合わせる。
  • 継続的インテグレーションまたは継続的デプロイ(CI / CD)でデータの読み込みのテストとデプロイを自動化する。
  • 長い期間における段階的な繰り返し、ETL プロセスの強化、改善を期待する。
  • 1 回限りの ETL を行うのではなく、データを増分的に追加する。

次の図は、サンプル プログラムが作成するデータ パイプラインを表しています。

BigQuery を使用したデータ パイプライン

このサンプルコードでは、多くのパイプライン ステップがグループ化され、便利なメソッドでラッピングされ、わかりやすい名前が付けられ、再利用されています。上の図で、再利用されているステップは破線で囲まれています。

パイプライン コードを確認する

このコードでは、次の操作を実行するパイプラインを作成します。

  1. 結合に含める各テーブルを文字列の PCollection に読み込みます。各要素は、テーブル行の JSON 表現で構成されています。

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. これらの JSON 文字列をオブジェクト表現(MusicBrainzDataObject オブジェクト)に変換し、列の値の 1 つ(主キーまたは外部キー)で編成します。

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply("load " + name,
                        MapElements
                          .into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
                          .via( (String input) -> {
                                MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                                Long key = (Long) datum.getColumnValue(namespacedKeyname);
                                return KV.of(key, datum);
                                })
             );
    }
  3. 共通のアーティストでリストを結合します。artist_credit_name がアーティストのクレジットとレコーディングをリンクし、アーティストの外部キーが設定されます。artist_credit_name テーブルがキー値 KV オブジェクトのリストとして読み込まれます。K のメンバーがアーティストです。

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. MusicBrainzTransforms.innerJoin() メソッドを使用してリストを結合します。

    public static PCollection<MusicBrainzDataObject>
                         innerJoin(String name,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table1,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>(){};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>(){};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. 結合するキーメンバーで KV オブジェクトのコレクションをグループ化します。KV オブジェクトの PCollection に長整数型キー(artist.id 列の値)が設定され、CoGbkResult が生成されます(キーの結果でグループが結合されていることを表します)。CoGbkResult オブジェクトは、最初と 2 番目の PCollections に共通のキー値を持つオブジェクト リストのタプルです。このタプルは、group メソッドで CoGroupByKey 操作を実行する前に各 PCollection に対して構成されるタプルタグでアドレス指定されます。
    2. オブジェクトの一致を MusicBrainzDataObject オブジェクトにマージし、結合結果を表します。

          PCollection<List<MusicBrainzDataObject>> mergedResult =
              joinedResult.apply("merge join results",
                           MapElements
                         .into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                         .via( ( KV<Long, CoGbkResult> group ) -> {
                             List<MusicBrainzDataObject> result = new ArrayList<MusicBrainzDataObject>();
                             Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                             Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                             leftObjects.forEach((MusicBrainzDataObject l) -> {
                               rightObjects.forEach((MusicBrainzDataObject r) -> {
                                 result.add(l.duplicate().merge(r));
                               });
                             });
                             return result;
                           }
                         )
      );
    3. コレクションを KV オブジェクトのリストに再編成し、次の結合を開始します。ここで、K 値は artist_credit 列で、recording テーブルとの結合に使用されます。

      PCollection<KV<Long,MusicBrainzDataObject>> artistCreditNamesByArtistCredit =  MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. MusicBrainzDataObject オブジェクトの最終的なコレクションを取得します。この結果を artist_credit.id で編成された recordings のコレクションと結合します。

      PCollection<MusicBrainzDataObject> artistRecordings = MusicBrainzTransforms.innerJoin("joined recordings",
         artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. 結果の MusicBrainzDataObjects オブジェクトを TableRows にマッピングします。

      PCollection<TableRow> tableRows = MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. 結果の TableRows を BigQuery に書き込みます。

      tableRows.apply(
           "Write to BigQuery",
           BigQueryIO.writeTableRows()
          .to(BQETLOptions.getBigQueryTablename())
          .withSchema(bqTableSchema)
          .withCustomGcsTempLocation(StaticValueProvider.of(BQETLOptions.getTempLocation() ))
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Dataflow パイプライン プログラミングの詳細については、プログラミング モデルに関する次のトピックをご覧ください。

コードによって行われるステップを確認したら、パイプラインを実行できます。

パイプライン コードを実行する

  1. Cloud Console で Cloud Shell を開きます。

    Cloud Shell を開く

  2. プロジェクトの環境変数を設定します。

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    

    [PROJECT_ID] は実際の Google Cloud プロジェクトのプロジェクト ID に置き換えます。[CHOOSE_AN_APPROPRIATE_ZONE]Google Cloud ゾーンに置き換えてください。

  3. パイプライン スクリプトで使用される環境変数を設定します。

    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export DATASET=musicbrainz
    export SERVICE_ACCOUNT=project-owner
    
  4. gcloud が、チュートリアルの開始時に作成または選択したプロジェクトを使用していることを確認します。

    gcloud config set project $PROJECT_ID
    
  5. パイプラインを実行するサービス アカウントを作成します。

    gcloud iam service-accounts create ${SERVICE_ACCOUNT} \
        --display-name "Project Owner Account"
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member serviceAccount:${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com \
        --role roles/owner
    gcloud iam service-accounts keys create \
        ~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json \
        --iam-account ${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com
    

    このコマンドは、サービス アカウント キーが格納されている JSON ファイルをダウンロードします。このファイルは安全な場所に保存してください。

  6. 環境変数 GOOGLE_APPLICATION_CREDENTIALS をサービス アカウント キーが含まれる JSON ファイルのファイルパスに設定します。

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  7. Dataflow コードを含むリポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  8. サンプルのあるディレクトリに移動します。

    cd bigquery-etl-dataflow-sample
    
  9. Dataflow ジョブでは、パイプラインの実行に使用するバイナリ ファイルをステージングするために Cloud Storage にバケットが必要なため、Cloud Storage にステージング バケットを作成します。

    gsutil mb gs://$STAGING_BUCKET
    
  10. [STAGING_BUCKET_NAME] のオブジェクト ライフサイクルに dataflow-staging-policy.json ファイルの値を設定します。

    gsutil lifecycle set dataflow-staging-policy.json gs://$STAGING_BUCKET
    
  11. Dataflow ジョブを実行します。

    ./run.sh simple
    
  12. パイプラインの進行状況を確認するには、Cloud Console で [Dataflow] ページに移動します。

    Dataflow ページに移動

    ジョブのステータスはステータス列に表示されます。[Succeeded] というステータスは、ジョブが完了したことを示します。

  13. (省略可)ジョブグラフとステップの詳細を表示するには、ジョブ名(etl-into-bigquery-bqetlsimple など)をクリックします。

  14. Cloud Console で、[BigQuery] ページに移動します。

    BigQuery ページに移動

    Google Cloud プロジェクトが選択されていることを確認します。

  15. 新しいテーブルでクエリを実行するには、[クエリエディタ] ペインに次のように入力します。

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    新しいテーブルに対するクエリで更新されたクエリエディタ。

データをクレンジングする

次に、Dataflow パイプラインを少し変更します。次の図のように、ルックアップ テーブルを読み込み、副入力として処理します。

副入力用に更新された Dataflow パイプライン。

結果の BigQuery テーブルにクエリを実行する場合、MusicBrainz データベースの area テーブルから地域の数値 ID を手動で検索することなくアーティストの取得元を推測するのは困難です。これによって、クエリ結果の分析は想定されるほど容易ではなくなります。

同様に、アーティストの性別が ID で表されていますが、MusicBrainz の gender テーブル全体に存在する行は 3 行のみです。この問題を解決するため、MusicBrainz の area テーブルと gender テーブルを使用して ID を適切なラベルにマッピングするステップを Dataflow パイプラインに追加します。

artist_area テーブルと artist_gender テーブルは、どちらもアーティストやレコーディングのデータテーブルよりも含まれる行数が大幅に少なくなります。後者のテーブルの要素数は、それぞれ地理的なエリアの数や性別によって制限されます。

このため、ルックアップ ステップでは副入力と呼ばれる Dataflow 機能が使用されます。

副入力は、行で区切られた JSON 形式のテーブル エクスポートとして読み込まれ、テーブルデータを 1 つのステップで非正規化するために使用されます。

副入力をパイプラインに追加するコードを確認する

パイプラインを実行する前に、新しいステップについて詳しく理解するためにコードを確認します。

BQETLSimple.java ファイルで、コメントアウトされた行を確認します。これらは、次のステップではコメント化解除されています。

//PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
//        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
//        MusicBrainzTransforms.lookup("gender","id","name","gender"));

PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");

このコードは、副入力を使用してデータのクレンジングを行います。MusicBrainzTransforms クラスにより、副入力を使用してより簡単に外部キーの値をラベルにマッピングできます。MusicBrainzTransforms ライブラリのメソッドを使用すると、内部ルックアップ クラスを作成できます。ルックアップ クラスは、各ルックアップ テーブルと、ラベルと変数の長さ引数で置換されるフィールドを記述します。keyKey はルックアップ キーを含む列の名前です。valueKey は対応するラベルを含む列の名前です。

public static LookupDescription lookup(String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

それぞれの副入力は、1 つのマップ オブジェクトとして読み込まれ、ID に対応するラベルの検索に使用されます。

まず、ルックアップ テーブルの JSON が最初に空の名前空間を持つ MusicBrainzDataObjects に読み込まれ、Key 列の値から Value 列の値へのマップに変換されます。

public static PCollectionView<Map<Long, String>> loadMapFromText(PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries = text.apply(
        "sideInput_" + name,
        MapElements
          .into(new TypeDescriptor<KV<Long, String>>() {})
          .via((String input) -> {
                 MusicBrainzDataObject object = JSONReader.readObject(name, input);
                 Long key = (Long) object.getColumnValue(keyKeyName);

                 String value = (String) object.getColumnValue(valueKeyName);
                 return KV.of(key, value);
               })
        );

  return entries.apply(View.<Long, String>asMap());
}

これらの各 Map オブジェクトが、destinationKey の値で Map に読み込まれます。このキーは、検索された値で置き換わります。

List<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>> mapSideInputs = new ArrayList<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView = loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
                            .map( destinationKey -> name + "_" + destinationKey )
                            .collect(Collectors.toList());

    mapSideInputs.add(new SimpleEntry(destKeyList, mapView));

}

JSON からアーティスト オブジェクトを変換するときに、destinationKey の値(数字で始まります)がラベルで置き換わります。

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach( ( String key ) -> {
  Long id = (Long) result.getColumnValue(key);
  if (id != null) {
    String label = (String) sideInputMap.get(id);
    if (label == null) {
      label = "" + id;
    }
    result.replace(key, label);

BQETLSimple.java を変更し、ルックアップを使用して artist_areaartist_gender のフィールドのデータをデコードする手順は、次のとおりです。

  1. プログラム フローを少し変更します。

    1. ルックアップを使用して、アーティストのデータを読み込む行をコメント化解除します。
    2. ルックアップを使用せずに、アーティストのデータを読み込む loadTable の呼び出しをコメントにします。
    //PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
    //        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
    //        MusicBrainzTransforms.lookup("gender","id","name","gender"));
    
    PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");
  2. 対応する int フィールドをコメントにし、対応する string フィールドのコメントを解除して、artist_areaartist_genderTableFieldSchemasint から string に変更します。

    /*Switch these two lines when using mapping table for artist_area */
    //        .stringField("artist_area")
            .intField("artist_area")
    /*Switch these two lines when using mapping table for artist_gender */
    //        .stringField("artist_gender")
            .intField("artist_gender")
    /*Switch these two lines when using mapping table for artist_begin_area */
            .intField("artist_begin_area")
    //      .stringField("artist_begin_area")
  3. パイプライン コードを再度実行する手順は、次のとおりです。

    1. プロジェクトの環境変数を設定します。

      export PROJECT_ID=[PROJECT_ID]
      export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
      
    2. 環境が設定されていることを確認します。

      export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
      export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
      export DATASET=musicbrainz
      export SERVICE_ACCOUNT=project-owner
      
    3. 環境変数 GOOGLE_APPLICATION_CREDENTIALS をサービス アカウント キーが格納された JSON ファイルのファイルパスに設定します。

      export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
      
    4. パイプラインを実行して、アーティスト行内にレコーディング行をネストします。

      ./run.sh simple
      
  4. artist_areaartist_gender を含む同じクエリを実行します。

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    出力で、artist_areaartist_gender がデコードされました。

    「artist_area」と「artist_gender」によってデコードされた出力。

BigQuery スキーマを最適化する

このチュートリアルの最後の部分では、ネストしたフィールドを使用して、より最適なテーブル スキーマを生成するパイプラインを実行します。

少し時間を取って、テーブルのこの最適化されたバージョンを生成するために使用するコードを確認してください。

次の図は、若干異なる Dataflow パイプラインを表しています。ここでは、重複するアーティスト行を作成するのではなく、アーティストのレコーディングをアーティスト行にネストします。

アーティストのレコーディングをアーティスト行にネストする Dataflow パイプライン。

現在のデータ表現はかなりフラットです。クレジットのあるレコーディングごとに 1 つの行が存在し、BigQuery スキーマから取得したアーティストのすべてのメタデータを含みます。また、すべてのレコーディングと artist_credit_name メタデータも含まれています。このフラットな表現には、少なくとも 2 つの欠点があります。

  • アーティストのレコーディングごとに artist メタデータを繰り返すため、必要なストレージが増加します。
  • データを JSON としてエクスポートすると、レコーディング データがネストされたアーティストではなく、このデータを繰り返す配列がエクスポートされます。おそらく、前者が必要なものです。

1 行に 1 つのレコーディングを保存するのではなく、Dataflow パイプラインに変更を行うことで、アーティスト レコードの繰り返しフィールドとしてレコーディングを保存できます。パフォーマンス上の問題はなく、追加のストレージも必要ありません。

アーティスト情報とレコーディングを artist_credit_name.artist で結合せずに、この代替パイプラインがレコーディングのリストを作成し、アーティスト オブジェクト内にネストします。

public static PCollection<MusicBrainzDataObject> nest(PCollection<KV<Long, MusicBrainzDataObject>> parent,
                                                      PCollection<KV<Long, MusicBrainzDataObject>> child,
                                                      String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>(){};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>(){};

  PCollection<KV<Long, CoGbkResult>> joinedResult = group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply("merge join results " + nestingKey,
                            MapElements
                             .into(new TypeDescriptor<MusicBrainzDataObject>() {})
                             .via((KV<Long, CoGbkResult> group) -> {
                                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                                List<MusicBrainzDataObject> childList = new ArrayList<MusicBrainzDataObject>();
                                children.forEach(childList::add);
                                parentObject = parentObject.duplicate();
                                parentObject.addColumnValue("recordings", childList);
                                return parentObject;
                              })
                         );
}

TableRow は、BigQuery API でサイズ制限があります。1 つのレコードにネストできるレコーディングは 1,000 要素に制限されています。特定のアーティストに 1,000 を超えるレコーディングが存在すると、コードが artist メタデータを含む行を複製し、複製した行にレコーディング データのネストを行います。

private static List<TableRow> toTableRows(MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<TableRow>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<String, List<MusicBrainzDataObject>>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }

    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach((String key, List<MusicBrainzDataObject> nestedList) -> {
      if (nestedList.size() > 0) {
        if (parentRow.get(key) == null) {
          parentRow.set(key, new ArrayList<TableRow>());
        }
        List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
        childRows.add(toChildRow(nestedList.remove(0), (Map<String, Object>) serializableSchema.get(key)));
      }
    });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

この図は、パイプラインのソース、変換、シンクを示しています。

ソース、変換、シンクを含む最適化されたパイプライン。

ほとんどの場合、ステップ名は apply メソッド呼び出しの一部としてコードで指定されます。

この最適化されたパイプラインを作成する手順は、次のとおりです。

  1. Cloud Shell で、パイプライン スクリプト用に環境が設定されていることを確認します。

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export SERVICE_ACCOUNT=project-owner
    
  2. 環境変数 GOOGLE_APPLICATION_CREDENTIALS をサービス アカウント キーが格納された JSON ファイルのファイルパスに設定します。

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  3. パイプラインを実行して、アーティスト行内にレコーディング行をネストします。

    ./run.sh nested
    
  4. BigQuery のネストされたテーブルからフィールドに対してクエリを実行します。

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    ネストされたテーブルのクエリ結果

  5. クエリを実行して STRUCT から値を抽出し、それらの値を使用して結果をフィルタします。

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    結果をフィルタするクエリ。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。

プロジェクトの削除

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] ページに移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

リソースを個別に削除する

プロジェクト全体ではなく個々のリソースを削除する場合は、以下の手順に従ってください。

Cloud Storage バケットを削除する

  1. Cloud Console で、[Cloud Storage ブラウザ] ページに移動します。

    Cloud Storage ブラウザページに移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. [削除] をクリックして、バケットを削除します。

BigQuery データセットを削除する

  1. BigQuery ウェブ UI を開きます。

    BigQuery を開く

  2. チュートリアルで作成した BigQuery データセットを選択します。

  3. [削除] をクリックします。

次のステップ