Dataflow を使用してリレーショナル データベースから BigQuery に ETL を実行する

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

このチュートリアルでは、Dataflow を使用して、オンライン トランザクション処理(OLTP)のリレーショナル データベースから分析を目的として BigQuery にデータを抽出、変換、読み込み(ETL)する方法について説明します。

このチュートリアルは、BigQuery の分析クエリ機能と Dataflow のバッチ処理機能の利用に関心があるデータベース管理者、運用の専門家、クラウド アーキテクトを対象としています。

OLTP データベースは、多くの場合、e コマースサイト、SaaS(Software as a Service)アプリケーション、ゲームなどの情報を格納し、トランザクションを処理するリレーショナル データベースです。OLTP データベースは通常、ACID プロパティ(原子性、整合性、独立性、永続性)を必要とするトランザクション用に最適化されており、そのスキーマは一般的に高度に正規化されています。一方、データ ウェアハウスは、トランザクションよりもデータの取得や分析用に最適化される傾向があり、一般的に、正規化されていないスキーマを特長としています。通常、OLTP データベースのデータを非正規化した方が BigQuery での分析には役立ちます。

目標

このチュートリアルでは、非正規化された BigQuery データに対して正規化された RDBMS データの ETL を行う 2 つの方法を示します。

  • BigQuery を使用してデータの読み込みと変換を行います。この方法では、1 回に少量のデータを BigQuery に読み込み、分析を行います。サイズの大きいデータセットや複数のデータセットの自動処理を行う前に、データセットのプロトタイプを作成する場合にも使用します。
  • Dataflow を使用してデータの読み込み、変換、クレンジングを行います。この方法は、大量のデータを読み込む場合や、複数のデータソースからデータを読み込む場合に使用します。また、データを増分的または自動的に読み込む場合にも使用します。

料金

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  4. Compute Engine と Dataflow API を有効にします。

    API を有効にする

  5. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  6. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  7. Compute Engine と Dataflow API を有効にします。

    API を有効にする

MusicBrainz データセットの使用

このチュートリアルでは、MusicBrainz データベース内のテーブルの JSON スナップショットを使用します。このデータベースは PostgreSQL 上に構築され、MusicBrainz のすべての音楽に関する情報が保存されています。MusicBrainz スキーマの要素には、次のようなものがあります。

  • アーティスト
  • リリース グループ
  • リリース
  • レコーディング
  • 作品
  • レーベル
  • これらのエンティティ間のリレーションシップ

MusicBrainz スキーマには、artistrecordingartist_credit_name という 3 つの関連テーブルが定義されています。artist_credit は、レコーディングでアーティストに与えられたクレジットを表し、artist_credit_name 行は artist_credit 値でレコーディングと対応するアーティストをリンクしています。

このチュートリアルで使用する PostgreSQL テーブルは、すでに改行区切りの JSON 形式で抽出され、Cloud Storage の公開バケットに保存されています。gs://solutions-public-assets/bqetl

この手順をご自身で行う場合は、MusicBrainz データセットを含む PostgreSQL データベースを用意し、次のコマンドを使用して各テーブルをエクスポートする必要があります。

host=POSTGRES_HOST
user=POSTGRES_USER
database=POSTGRES_DATABASE

for table in artist recording artist_credit_name
do
    pg_cmd="\\copy (select row_to_json(r) from (select * from ${table}) r ) to exported_${table}.json"
    psql -w -h ${host} -U ${user} -d ${db} -c $pg_cmd
    # clean up extra '\' characters
    sed -i -e 's/\\\\/\\/g' exported_${table}.json
done

方法 1: BigQuery で ETL を行う

この方法では、1 回に少量のデータを BigQuery に読み込み、分析を行います。サイズの大きいデータセットや複数のデータセットの自動処理を行う前に、データセットのプロトタイプを作成する場合にも使用します。

BigQuery データセットを作成する

BigQuery データセットを作成するには、MusicBrainz のテーブルを BigQuery に個別に読み込み、各行に必要なデータリンクが含まれるように、読み込んだテーブルを結合します。結合結果を新しい BigQuery テーブルに保存します。この処理が完了したら、読み込んだ元のテーブルを削除できます。

  1. Google Cloud コンソールで BigQuery を開きます。

    BigQuery を開く

  2. [エクスプローラ] パネルで、プロジェクト名の横にあるメニュー をクリックし、[データセットを作成] をクリックします。

  3. [データセットを作成] ダイアログで、次の手順を実施します。

    1. [データセット ID] フィールドに「musicbrainz」と入力します。
    2. [データのロケーション] を [us] に設定します。
    3. [データセットを作成] をクリックします。

MusicBrainz テーブルをインポートする

各 MusicBrainz テーブルで次の操作を行って、作成したデータセットにテーブルを追加します。

  1. Google Cloud コンソールの BigQuery [エクスプローラ] パネルで、プロジェクト名を含む行を展開し、新しく作成された musicbrainz データセットを表示します。
  2. musicbrainz データセットの横にあるメニュー をクリックし、[テーブルを作成] をクリックします。
  3. [テーブルを作成] ダイアログで、次の手順を実施します。

    1. [テーブルの作成元] プルダウン リストから [Google Cloud Storage] を選択します。
    2. [GCS バケットからファイルを選択] フィールドに、データファイルのパスを入力します。

      solutions-public-assets/bqetl/artist.json
      
    3. [ファイル形式] で [JSONL(改行区切り JSON)] を選択します。

    4. プロジェクトにプロジェクト名が含まれていることを確認します。

    5. データセットmusicbrainz であることを確認します。

    6. [テーブル] にテーブル名「artist」を入力します。

    7. [テーブルタイプ] で [ネイティブ テーブル] を選択したままにします。

    8. [スキーマ] セクションの下で、[テキストとして編集] をクリックしてオンにします。

    9. artist スキーマ ファイルをダウンロードし、テキスト エディタまたはビューアで開きます。

    10. [スキーマ] セクションの内容をダウンロードしたスキーマ ファイルの内容に置き換えます。

    11. [テーブルを作成] をクリックします。

  4. 読み込みジョブが完了するまで少し待ちます。

  5. 読み込みが完了すると、データセットの下に新しいテーブルが表示されます。

  6. 手順 1 ~ 5 を繰り返して、次の変更を加えて artist_credit_name テーブルを作成します。

    • ソース データファイルには次のパスを使用します。

      solutions-public-assets/bqetl/artist_credit_name.json
      
    • テーブル名として artist_credit_name を使用します。

    • artist_credit_name スキーマ ファイルをダウンロードし、スキーマの内容を使用します。

  7. 手順 1 ~ 5 を繰り返して、次の変更を加えて recording テーブルを作成します。

    • ソース データファイルには次のパスを使用します。

      solutions-public-assets/bqetl/recording.json
      
    • テーブル名として recording を使用します。

    • recording スキーマ ファイルをダウンロードし、スキーマの内容を使用します。

データを手動で非正規化する

データを非正規化するには、データを、分析用に保持する特定のメタデータとともに、アーティストのレコーディングを 1 行とする新しい BigQuery テーブルに結合します。

  1. Google Cloud コンソールで BigQuery クエリエディタが開いていない場合は、 [クエリを新規作成] をクリックします。
  2. 次のクエリをコピーして、クエリエディタに貼り付けます。

    SELECT
        artist.id,
        artist.gid AS artist_gid,
        artist.name AS artist_name,
        artist.area,
        recording.name AS recording_name,
        recording.length,
        recording.gid AS recording_gid,
        recording.video
    FROM
        `musicbrainz.artist` AS artist
    INNER JOIN
        `musicbrainz.artist_credit_name` AS artist_credit_name
    ON
        artist.id = artist_credit_name.artist
    INNER JOIN
        `musicbrainz.recording` AS recording
    ON
        artist_credit_name.artist_credit = recording.artist_credit
    
  3. [その他] プルダウン リストをクリックし、[クエリの設定] を選択します。

  4. [クエリの設定] ダイアログで、次の手順を行います。

    1. [クエリ結果の宛先テーブルを設定する] を選択して、
    2. [Dataset] に「musicbrainz」と入力し、プロジェクト内のデータセットを選択します。
    3. [テーブル ID] に「recordings_by_artists_manual」と入力します。
    4. [宛先テーブルの書き込み設定] で、[テーブルを上書きする] をクリックします。
    5. [大容量の結果を許可する(サイズ上限なし)] チェックボックスをオンにします。
    6. [保存] をクリックします。
  5. [実行] をクリックします。

    クエリが完了すると、クエリ結果からのデータが、新しく作成された BigQuery テーブルでアーティストごとの曲に編成され、結果のサンプルがクエリ結果ペインに表示されます。次に例を示します。

    id artist_gid artist_name area recording_name length recording_gid video
    1 97546 125ec42a... unknown 240 Horo Gun Toireamaid Hùgan Fhathast Air 174106 c8bbe048... FALSE
    2 266317 2e7119b5... Capella Istropolitana 189 Concerto Grosso in D minor, op. 2 no. 3: II. Adagio 134000 af0f294d... FALSE
    3 628060 34cd3689... Conspirare 5196 Liturgy, op. 42: 9. Praise the Lord from the Heavens 126933 8bab920d... FALSE
    4 423877 54401795... Boys Air Choir 1178 Nunc Dimittis 190000 111611eb... FALSE
    5 394456 9914f9f9... L’Orchestre de la Suisse Romande 23036 Concert Waltz no. 2, op. 51 509960 b16742d1... FALSE

方法 2: Dataflow で BigQuery に ETL を行う

チュートリアルのこのセクションでは、BigQuery UI を使用する代わりにサンプル プログラムを使用します。このプログラムでは、Dataflow パイプラインを使用して BigQuery にデータを読み込みます。次に、Beam プログラミング モデルを使用してデータの非正規化とクレンジングを行い、BigQuery に読み込みます。

始める前に、コンセプトとサンプルコードを確認してください。

コンセプトを確認する

データはサイズが小さく、BigQuery UI を使用して簡単にアップロードできますが、このチュートリアルでは Cloud Dataflow を使用して ETL を行うこともできます。大規模な結合処理(500~5,000 列で、10 TB を超えるデータ)の場合には、次の理由から BigQuery UI ではなく Cloud Dataflow を使用して BigQuery に ETL を行います。

  • データを保存して後で結合するのではなく、データを BigQuery に読み込むときにクレンジングまたは変換する。したがって、データは結合され変換された状態でのみ BigQuery に格納されるため、この方法ではストレージ要件も低くなります。
  • カスタムデータのクレンジングを行う(これは SQL では簡単に実現できません)。
  • 読み込み処理中に、OLTP 以外のデータ(ログやリモートからアクセスするデータなど)とデータを組み合わせる。
  • 継続的インテグレーションまたは継続的デプロイ(CI / CD)でデータの読み込みのテストとデプロイを自動化する。
  • 長い期間における段階的な繰り返し、ETL プロセスの強化、改善を期待する。
  • 1 回限りの ETL を行うのではなく、データを増分的に追加する。

次の図は、サンプル プログラムが作成するデータ パイプラインを表しています。

BigQuery を使用したデータ パイプライン

このサンプルコードでは、多くのパイプライン ステップがグループ化され、便利なメソッドでラッピングされ、わかりやすい名前が付けられ、再利用されています。上の図で、再利用されているステップは破線で囲まれています。

パイプライン コードを確認する

このコードでは、次の操作を実行するパイプラインを作成します。

  1. 結合に含める各テーブルを Cloud Storage の公開バケットから文字列の PCollection に読み込みます。各要素は、テーブル行の JSON 表現で構成されています。

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. これらの JSON 文字列をオブジェクト表現(MusicBrainzDataObject オブジェクト)に変換し、列の値の 1 つ(主キーまたは外部キー)で編成します。

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(
        PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply(
          "load " + name,
          MapElements.into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
              .via(
                  (String input) -> {
                    MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                    Long key = (Long) datum.getColumnValue(namespacedKeyname);
                    return KV.of(key, datum);
                  }));
    }
  3. 共通のアーティストでリストを結合します。artist_credit_name がアーティストのクレジットとレコーディングをリンクし、アーティストの外部キーが設定されます。artist_credit_name テーブルがキー値 KV オブジェクトのリストとして読み込まれます。K のメンバーがアーティストです。

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. MusicBrainzTransforms.innerJoin() メソッドを使用してリストを結合します。

    public static PCollection<MusicBrainzDataObject> innerJoin(
        String name,
        PCollection<KV<Long, MusicBrainzDataObject>> table1,
        PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>() {};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>() {};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. 結合するキーメンバーで KV オブジェクトのコレクションをグループ化します。KV オブジェクトの PCollection に長整数型キー(artist.id 列の値)が設定され、CoGbkResult が生成されます(キーの結果でグループが結合されていることを表します)。CoGbkResult オブジェクトは、最初と 2 番目の PCollections に共通のキー値を持つオブジェクト リストのタプルです。このタプルは、group メソッドで CoGroupByKey 操作を実行する前に各 PCollection に対して構成されるタプルタグでアドレス指定されます。
    2. オブジェクトの一致を MusicBrainzDataObject オブジェクトにマージし、結合結果を表します。

      PCollection<List<MusicBrainzDataObject>> mergedResult =
          joinedResult.apply(
              "merge join results",
              MapElements.into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                  .via(
                      (KV<Long, CoGbkResult> group) -> {
                        List<MusicBrainzDataObject> result = new ArrayList<>();
                        Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                        Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                        leftObjects.forEach(
                            (MusicBrainzDataObject l) ->
                                rightObjects.forEach(
                                    (MusicBrainzDataObject r) -> result.add(l.duplicate().merge(r))));
                        return result;
                      }));
    3. コレクションを KV オブジェクトのリストに再編成し、次の結合を開始します。ここで、K 値は artist_credit 列で、recording テーブルとの結合に使用されます。

      PCollection<KV<Long, MusicBrainzDataObject>> artistCreditNamesByArtistCredit =
          MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. MusicBrainzDataObject オブジェクトの最終的なコレクションを取得します。この結果を artist_credit.id で編成された recordings のコレクションと結合します。

      PCollection<MusicBrainzDataObject> artistRecordings =
          MusicBrainzTransforms.innerJoin(
              "joined recordings", artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. 結果の MusicBrainzDataObjects オブジェクトを TableRows にマッピングします。

      PCollection<TableRow> tableRows =
          MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. 結果の TableRows を BigQuery に書き込みます。

      tableRows.apply(
          "Write to BigQuery",
          BigQueryIO.writeTableRows()
              .to(options.getBigQueryTablename())
              .withSchema(bqTableSchema)
              .withCustomGcsTempLocation(StaticValueProvider.of(options.getTempLocation()))
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Beam パイプライン プログラミングの仕組みの詳細については、プログラミング モデルに関する次のトピックをご覧ください。

コードによって行われるステップを確認したら、パイプラインを実行できます。

Cloud Storage バケットを作成する

パイプライン コードを実行する

  1. Google Cloud Console で Cloud Shell を開きます。

    Cloud Shell を開く

  2. プロジェクトとパイプライン スクリプトの環境変数を設定します。

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export DATASET=musicbrainz
    

    PROJECT_ID は、Google Cloud プロジェクトの プロジェクト ID に置き換えます。

  3. gcloud が、チュートリアルの開始時に作成または選択したプロジェクトを使用していることを確認します。

    gcloud config set project $PROJECT_ID
    
  4. 最小権限のセキュリティ原則に沿って、Dataflow パイプライン用のサービス アカウントを作成し、必要な権限のみを付与します。roles/dataflow.workerroles/bigquery.jobUser および musicbrainzデータセットのdataEditorロール:

    gcloud iam service-accounts create musicbrainz-dataflow
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member=serviceAccount:${SERVICE_ACCOUNT} \
        --role=roles/dataflow.worker
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member=serviceAccount:${SERVICE_ACCOUNT} \
        --role=roles/bigquery.jobUser
    bq query  --use_legacy_sql=false \
        "GRANT \`roles/bigquery.dataEditor\` ON SCHEMA musicbrainz
         TO 'serviceAccount:${SERVICE_ACCOUNT}'"
    
  5. 一時ファイルに使用する Dataflow パイプライン用のバケットを作成し、musicbrainz-dataflow サービス アカウントに Owner 権限を付与します。

    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    gsutil mb -l us ${DATAFLOW_TEMP_BUCKET}
    gsutil acl ch -u ${SERVICE_ACCOUNT}:O ${DATAFLOW_TEMP_BUCKET}
    
  6. Dataflow コードを含むリポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  7. サンプルのあるディレクトリに移動します。

    cd bigquery-etl-dataflow-sample
    
  8. Dataflow ジョブをコンパイルして実行します。

    ./run.sh simple
    

    ジョブの実行には約 10 分かかります。

  9. パイプラインの進行状況を確認するには、Google Cloud コンソールで [Dataflow] ページに移動します。

    Dataflow に移動

    ジョブのステータスはステータス列に表示されます。[Succeeded] というステータスは、ジョブが完了したことを示します。

  10. (省略可)ジョブグラフとステップの詳細を表示するには、ジョブ名(etl-into-bigquery-bqetlsimple など)をクリックします。

  11. ジョブが完了したら、BigQuery ページに移動します。

    BigQuery に移動

  12. 新しいテーブルでクエリを実行するには、[クエリエディタ] ペインに次のように入力します。

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
          AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    結果ペインに、次のような結果セットが表示されます。

    artist_name artist_gender artist_area recording_name recording_length
    1 mirin 2 107 Sylphia 264000
    2 mirin 2 107 Dependence 208000
    3 Gaudiburschen 1 81 Die Hände zum Himmel 210000
    4 Sa4 1 331 Ein Tag aus meiner Sicht 221000
    5 Dpat 1 7326 Cutthroat 249000
    6 Dpat 1 7326 Deloused 178000

    結果は順序付けされていないため、実際の出力は異なる場合があります。

データをクレンジングする

次に、Dataflow パイプラインを少し変更します。次の図のように、ルックアップ テーブルを読み込み、副入力として処理します。

副入力用に更新された Dataflow パイプライン。

結果の BigQuery テーブルにクエリを実行する場合、MusicBrainz データベースの area テーブルから地域の数値 ID を手動で検索することなくアーティストの取得元を推測するのは困難です。これによって、クエリ結果の分析は想定されるほど容易ではなくなります。

同様に、アーティストの性別が ID で表されていますが、MusicBrainz の gender テーブル全体に存在する行は 3 行のみです。この問題を解決するため、MusicBrainz の area テーブルと gender テーブルを使用して ID を適切なラベルにマッピングするステップを Dataflow パイプラインに追加します。

artist_area テーブルと artist_gender テーブルは、どちらもアーティストやレコーディングのデータテーブルよりも含まれる行数が大幅に少なくなります。後者のテーブルの要素数は、それぞれ地理的なエリアの数や性別によって制限されます。

このため、ルックアップ ステップでは副入力と呼ばれる Dataflow 機能が使用されます。

副入力は、musicbrainz データセットを含む公開 Cloud Storage バケットの行区切りの JSON ファイルのテーブル エクスポートとして読み込まれ、テーブルデータを 1 つのステップで非正規化するために使用されます。

副入力をパイプラインに追加するコードを確認する

パイプラインを実行する前に、新しいステップについて詳しく理解するためにコードを確認します。

このコードは、副入力を使用してデータのクレンジングを行います。MusicBrainzTransforms クラスにより、副入力を使用してより簡単に外部キーの値をラベルにマッピングできます。MusicBrainzTransforms ライブラリのメソッドを使用すると、内部ルックアップ クラスを作成できます。ルックアップ クラスは、各ルックアップ テーブルと、ラベルと変数の長さ引数で置換されるフィールドを記述します。keyKey はルックアップ キーを含む列の名前です。valueKey は対応するラベルを含む列の名前です。

public static LookupDescription lookup(
    String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

それぞれの副入力は、1 つのマップ オブジェクトとして読み込まれ、ID に対応するラベルの検索に使用されます。

まず、ルックアップ テーブルの JSON が最初に空の名前空間を持つ MusicBrainzDataObjects に読み込まれ、Key 列の値から Value 列の値へのマップに変換されます。

public static PCollectionView<Map<Long, String>> loadMapFromText(
    PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries =
      text.apply(
          "sideInput_" + name,
          MapElements.into(new TypeDescriptor<KV<Long, String>>() {})
              .via(
                  (String input) -> {
                    MusicBrainzDataObject object = JSONReader.readObject(name, input);
                    Long key = (Long) object.getColumnValue(keyKeyName);

                    String value = (String) object.getColumnValue(valueKeyName);
                    return KV.of(key, value);
                  }));

  return entries.apply(View.asMap());
}

これらの各 Map オブジェクトが、destinationKey の値で Map に読み込まれます。このキーは、検索された値で置き換わります。

List<SimpleEntry<List<String>, PCollectionView<Map<Long, String>>>> mapSideInputs =
    new ArrayList<>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView =
      loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
          .map(destinationKey -> name + "_" + destinationKey)
          .collect(Collectors.toList());

  mapSideInputs.add(new SimpleEntry<>(destKeyList, mapView));
}

JSON からアーティスト オブジェクトを変換するときに、destinationKey の値(数字で始まります)がラベルで置き換わります。

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach(
    (String key) -> {
      Long id = (Long) result.getColumnValue(key);
      if (id != null) {
        String label = sideInputMap.get(id);
        if (label == null) {
          label = "" + id;
        }
        result.replace(key, label);

artist_area フィールドと artist_gender フィールドのデコードを追加するには、次のようにします。

  1. Cloud Shell で、パイプライン スクリプト用に環境が設定されていることを確認します。

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
    export DATASET=musicbrainz
    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    

    PROJECT_ID は、Google Cloud プロジェクトの プロジェクト ID に置き換えます。

  2. パイプラインを実行して、デコードされた領域とアーティストの性別でテーブルを作成します。

    ./run.sh simple-with-lookups
    
  3. 前と同様に、パイプラインの進行状況を確認するには [Dataflow] ページに移動します。

    Dataflow に移動

    パイプラインが完了するまでに約 10 分かかります。

  4. ジョブが完了したら、BigQuery ページに移動します。

    BigQuery に移動

  5. artist_areaartist_gender を含む同じクエリを実行します。

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
      FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
     WHERE artist_area is NOT NULL
       AND artist_gender IS NOT NULL
     LIMIT 1000;
    

    出力で、artist_areaartist_gender がデコードされました。

    artist_name artist_gender artist_area recording_name recording_length
    1 mirin Female Japan Sylphia 264000
    2 mirin Female Japan Dependence 208000
    3 Gaudiburschen Male Germany Die Hände zum Himmel 210000
    4 Sa4 Male Hamburg Ein Tag aus meiner Sicht 221000
    5 Dpat Male Houston Cutthroat 249000
    6 Dpat Male Houston Deloused 178000

    結果は順序付けされていないため、実際の出力は異なる場合があります。

BigQuery スキーマを最適化する

このチュートリアルの最後の部分では、ネストしたフィールドを使用して、より最適なテーブル スキーマを生成するパイプラインを実行します。

少し時間を取って、テーブルのこの最適化されたバージョンを生成するために使用するコードを確認してください。

次の図は、若干異なる Dataflow パイプラインを表しています。ここでは、重複するアーティスト行を作成するのではなく、アーティストのレコーディングをアーティスト行にネストします。

アーティストのレコーディングをアーティスト行にネストする Dataflow パイプライン。

現在のデータ表現はかなりフラットです。クレジットのあるレコーディングごとに 1 つの行が存在し、BigQuery スキーマから取得したアーティストのすべてのメタデータを含みます。また、すべてのレコーディングと artist_credit_name メタデータも含まれています。このフラットな表現には、少なくとも 2 つの欠点があります。

  • アーティストのレコーディングごとに artist メタデータを繰り返すため、必要なストレージが増加します。
  • データを JSON としてエクスポートすると、レコーディング データがネストされたアーティストではなく、このデータを繰り返す配列がエクスポートされます。おそらく、前者が必要なものです。

1 行に 1 つのレコーディングを保存するのではなく、Dataflow パイプラインに変更を行うことで、アーティスト レコードの繰り返しフィールドとしてレコーディングを保存できます。パフォーマンス上の問題はなく、追加のストレージも必要ありません。

アーティスト情報とレコーディングを artist_credit_name.artist で結合せずに、この代替パイプラインがレコーディングのリストを作成し、アーティスト オブジェクト内にネストします。

public static PCollection<MusicBrainzDataObject> nest(
    PCollection<KV<Long, MusicBrainzDataObject>> parent,
    PCollection<KV<Long, MusicBrainzDataObject>> child,
    String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>() {};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>() {};

  PCollection<KV<Long, CoGbkResult>> joinedResult =
      group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply(
      "merge join results " + nestingKey,
      MapElements.into(new TypeDescriptor<MusicBrainzDataObject>() {})
          .via(
              (KV<Long, CoGbkResult> group) -> {
                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                List<MusicBrainzDataObject> childList = new ArrayList<>();
                children.forEach(childList::add);
                parentObject = parentObject.duplicate();
                parentObject.addColumnValue("recordings", childList);
                return parentObject;
              }));
}

BigQuery API では、一括挿入を行うときの最大行サイズの上限が 100 MB(ストリーミング挿入の場合は 10 MB)であるため、特定のレコードに対するネストされた録音の数はコードによって制限されます。1,000 要素まで割り当てて、この上限を超えないようにしてください。特定のアーティストに 1,000 を超えるレコーディングが存在すると、コードが artist メタデータを含む行を複製し、複製した行にレコーディング データのネストを行います。

private static List<TableRow> toTableRows(
    MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        @SuppressWarnings("unchecked")
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }
    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting
   * limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach(
        (String key, List<MusicBrainzDataObject> nestedList) -> {
          if (nestedList.size() > 0) {
            if (parentRow.get(key) == null) {
              parentRow.set(key, new ArrayList<TableRow>());
            }
            @SuppressWarnings("unchecked")
            List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
            @SuppressWarnings("unchecked")
            Map<String, Object> map = (Map<String, Object>) serializableSchema.get(key);
            childRows.add(toChildRow(nestedList.remove(0), map));
          }
        });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

この図は、パイプラインのソース、変換、シンクを示しています。

ソース、変換、シンクを含む最適化されたパイプライン。

ほとんどの場合、ステップ名は apply メソッド呼び出しの一部としてコードで指定されます。

この最適化されたパイプラインを作成する手順は、次のとおりです。

  1. Cloud Shell で、パイプライン スクリプト用に環境が設定されていることを確認します。

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    
  2. パイプラインを実行して、アーティスト行内にレコーディング行をネストします。

    ./run.sh nested
    
  3. 前と同様に、パイプラインの進行状況を確認するには [Dataflow] ページに移動します。

    Dataflow に移動

    パイプラインが完了するまでに約 10 分かかります。

  4. ジョブが完了したら、BigQuery ページに移動します。

    BigQuery に移動

  5. BigQuery のネストされたテーブルからフィールドに対してクエリを実行します。

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
          AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    出力では、artist_recordings が展開可能なネストされた行として表示されます。

    artist_name artist_gender artist_area artist_recordings
    1 mirin Female Japan (5 rows)
    3 Gaudiburschen Male Germany (1 row)
    4 Sa4 Male Hamburg (10 rows)
    6 Dpat Male Houston (9 rows)

    結果は順序付けされていないため、実際の出力は異なる場合があります。

  6. クエリを実行して STRUCT から値を抽出し、それらの値を使用して結果をフィルタします。たとえば、「Justin」という単語を含むレコーディングがあるアーティストなどです。

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    出力では、artist_credit_name_namerecording_name はネストされた行として表示され、展開できます。次に例を示します。

    artist_name artist_gender artist_area artist_credit_name_name recording_name
    1 Damonkenutz null null (1 row) 1 Yellowpants (Justin Martin remix)
    3 Fabian Male Germany (10+ rows) 1 Heatwave
    . 2 Starlight Love
    . 3 Dreams To Wishes
    . 4 Last Flight (Justin Faust remix)
    . ...
    4 Digital Punk Boys null null (6 rows) 1 Come True
    . 2 We Are... (Punkgirlz remix by Justin Famous)
    . 3 Chaos (short cut)
    . ...

    結果は順序付けされていないため、実際の出力は異なる場合があります。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトの削除

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

リソースを個別に削除する

プロジェクト全体ではなく個々のリソースを削除する場合は、以下の手順に従ってください。

Cloud Storage バケットを削除する

  1. Google Cloud コンソールで、Cloud Storage の [ブラウザ] ページに移動します。

    [ブラウザ] に移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。

BigQuery データセットを削除する

  1. BigQuery ウェブ UI を開きます。

    BigQuery を開く

  2. チュートリアルで作成した BigQuery データセットを選択します。

  3. [削除] をクリックします。

次のステップ

  • BigQuery でのクエリの作成方法を学習する。同期または非同期のクエリの実行方法やユーザー定義関数(UDF)の作成方法などの詳細については、データクエリの説明をご覧ください。
  • BigQuery の構文を確認する。BigQuery は SQL に似た構文を使用しています。詳細については、クエリ リファレンス(レガシー SQL)をご覧ください。
  • Google Cloud に関するリファレンス アーキテクチャ、図、チュートリアル、ベスト プラクティスを確認する。Cloud Architecture Center を確認します。