Menjalankan ETL dari database relasional ke BigQuery menggunakan Dataflow

Last reviewed 2022-08-21 UTC

Tutorial ini menunjukkan cara menggunakan Dataflow untuk mengekstrak, mentransformasi, dan memuat data (ETL) dari database relasional pemrosesan transaksi online (OLTP) ke BigQuery untuk dianalisis.

Tutorial ini ditujukan bagi admin database, profesional operasi, dan arsitek cloud yang tertarik untuk memanfaatkan kemampuan kueri analisis dari BigQuery dan kemampuan batch processing dari Dataflow.

Database OLTP sering kali merupakan database relasional yang menyimpan informasi dan memproses transaksi untuk situs e-commerce, aplikasi software as a service (SaaS), atau game. Database OLTP biasanya dioptimalkan untuk transaksi yang memerlukan properti ACID: atomicity, consistency, isolation, dan durability, serta biasanya memiliki skema yang sangat dinormalisasi. Sebaliknya, data warehouse cenderung dioptimalkan untuk pengambilan dan analisis data, bukan transaksi, serta biasanya untuk fitur skema yang didenormalisasi. Secara umum, denormalisasi data dari database OLTP akan membuatnya lebih berguna untuk analisis di BigQuery.

Tujuan

Tutorial ini menunjukkan dua pendekatan terhadap data RDBMS yang dinormalisasi ETL ke dalam data BigQuery yang didenormalisasi:

  • Menggunakan BigQuery untuk memuat dan mentransformasi data. Gunakan pendekatan ini untuk melakukan pemuatan satu kali bagi sejumlah kecil data ke BigQuery untuk dianalisis. Anda juga dapat menggunakan pendekatan ini untuk membuat prototipe set data sebelum mengotomatiskan set data yang lebih besar atau banyak.
  • Menggunakan Dataflow untuk memuat, mentransformasi, dan membersihkan data. Gunakan pendekatan ini untuk memuat data dalam jumlah yang lebih besar, memuat data dari beberapa sumber data, atau memuat data secara bertahap maupun secara otomatis.

Biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  3. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  4. Enable the Compute Engine dan Dataflow APIs.

    Enable the APIs

  5. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  6. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  7. Enable the Compute Engine dan Dataflow APIs.

    Enable the APIs

Menggunakan set data MusicBrainz

Tutorial ini mengandalkan snapshot tabel JSON di database MusicBrainz, yang dibangun di PostgreSQL dan berisi informasi tentang semua musik MusicBrainz. Beberapa elemen skema MusicBrainz mencakup:

  • Artis
  • Grup rilis
  • Rilis
  • Perekaman
  • Karya
  • Label
  • Banyak hubungan antara entity ini.

Skema MusicBrainz mencakup tiga tabel yang relevan: artist, recording, dan artist_credit_name. artist_credit mewakili kredit yang diberikan kepada artis untuk rekaman, dan baris artist_credit_name menautkan rekaman dengan artis yang sesuai melalui nilai artist_credit.

Tutorial ini menyediakan tabel PostgreSQL yang telah diekstrak ke dalam format JSON yang dipisahkan newline dan disimpan dalam bucket Cloud Storage publik: gs://solutions-public-assets/bqetl

Jika ingin melakukan langkah ini sendiri, Anda harus memiliki database PostgreSQL yang berisi set data MusicBrainz, dan gunakan perintah berikut untuk mengekspor setiap tabel:

host=POSTGRES_HOST
user=POSTGRES_USER
database=POSTGRES_DATABASE

for table in artist recording artist_credit_name
do
    pg_cmd="\\copy (select row_to_json(r) from (select * from ${table}) r ) to exported_${table}.json"
    psql -w -h ${host} -U ${user} -d ${db} -c $pg_cmd
    # clean up extra '\' characters
    sed -i -e 's/\\\\/\\/g' exported_${table}.json
done

Pendekatan 1: ETL dengan BigQuery

Gunakan pendekatan ini untuk melakukan pemuatan satu kali bagi sejumlah kecil data ke BigQuery untuk dianalisis. Anda juga dapat menggunakan pendekatan ini untuk membuat prototipe set data sebelum menggunakan otomatisasi dengan set data yang lebih besar atau banyak.

Membuat set data BigQuery

Untuk membuat set data BigQuery, muat tabel MusicBrainz ke BigQuery satu per satu, lalu gabungkan tabel yang telah Anda muat sehingga setiap baris berisi pautan data yang diinginkan. Simpan hasil join dalam tabel BigQuery baru. Kemudian Anda dapat menghapus tabel asli yang telah dimuat.

  1. Di konsol Google Cloud, buka BigQuery.

    BUKA BIGQUERY

  2. Di panel Explorer, klik menu di samping nama project Anda, lalu klik Create data set.

  3. Pada dialog Create data set, selesaikan langkah-langkah berikut:

    1. Di kolom Data set ID, masukkan musicbrainz.
    2. Tetapkan Data Location ke AS.
    3. Klik Create data set.

Impor tabel MusicBrainz

Untuk setiap tabel MusicBrainz, lakukan langkah-langkah berikut untuk menambahkan tabel ke set data yang Anda buat:

  1. Di panel Explorer BigQuery pada konsol Google Cloud, luaskan baris dengan nama project Anda untuk menampilkan set data musicbrainz yang baru dibuat.
  2. Klik menu di samping set data musicbrainz Anda, lalu klik Create Table.
  3. Dalam dialog Create Table, selesaikan langkah-langkah berikut:

    1. Dalam menu drop-down Create table from, pilih Google Cloud Storage.
    2. Di kolom Select file from GCS bucket, masukkan jalur ke file data:

      solutions-public-assets/bqetl/artist.json
      
    3. Untuk File format, pilih JSONL (Newline Delimited JSON).

    4. Pastikan Project berisi nama project Anda.

    5. Pastikan Data set adalah musicbrainz.

    6. Untuk Table, masukkan nama tabel, artist.

    7. Untuk Table type, biarkan Native table dipilih.

    8. Di bawah bagian Schema, klik untuk mengaktifkan Edit as Text.

    9. Download file skema artist dan buka di editor teks atau penampil.

    10. Ganti konten bagian Schema dengan konten file skema yang telah didownload.

    11. Klik Create Table:

  4. Tunggu beberapa saat sampai tugas pemuatan selesai.

  5. Setelah pemuatan selesai, tabel baru akan muncul di bawah set data.

  6. Ulangi langkah 1 - 5 untuk membuat tabel artist_credit_name dengan perubahan berikut:

    • Gunakan jalur berikut untuk file data sumber:

      solutions-public-assets/bqetl/artist_credit_name.json
      
    • Gunakan artist_credit_name sebagai nama Table .

    • Download artist_credit_name file skema dan gunakan konten untuk skema tersebut.

  7. Ulangi langkah 1 - 5 untuk membuat tabel recording dengan perubahan berikut:

    • Gunakan jalur berikut untuk file data sumber:

      solutions-public-assets/bqetl/recording.json
      
    • Gunakan recording sebagai nama Table .

    • Download recording file skema. dan gunakan konten untuk skema tersebut.

Denormalisasi data secara manual

Untuk melakukan denormalisasi data, gabungkan data tersebut ke tabel BigQuery baru yang memiliki satu baris untuk setiap rekaman artis, beserta metadata pilihan yang ingin Anda simpan untuk analisis.

  1. Jika editor kueri BigQuery tidak terbuka di konsol Google Cloud, klik Compose New Query.
  2. Salin kueri berikut dan tempel ke Query Editor:

    SELECT
        artist.id,
        artist.gid AS artist_gid,
        artist.name AS artist_name,
        artist.area,
        recording.name AS recording_name,
        recording.length,
        recording.gid AS recording_gid,
        recording.video
    FROM
        `musicbrainz.artist` AS artist
    INNER JOIN
        `musicbrainz.artist_credit_name` AS artist_credit_name
    ON
        artist.id = artist_credit_name.artist
    INNER JOIN
        `musicbrainz.recording` AS recording
    ON
        artist_credit_name.artist_credit = recording.artist_credit
    
  3. Klik menu drop-down More, lalu pilih Query settings.

  4. Dalam dialog Query settings, selesaikan langkah-langkah berikut:

    1. Pilih Set a destination table for query results.
    2. Di Dataset, masukkan musicbrainz lalu pilih set data di project Anda.
    3. Di Table id masukkan recordings_by_artists_manual.
    4. Untuk Destination table write preference, klik Overwrite table.
    5. Pilih kotak centang Allow Large Results (no size limit).
    6. Klik Simpan.
  5. Klik Run.

    Setelah kueri selesai, data dari hasil kueri akan disusun ke dalam lagu untuk setiap artis dalam tabel BigQuery yang baru dibuat. Berikut contoh hasil yang ditampilkan di panel Query Results, misalnya:

    Baris id artist_gid artist_name area recording_name durasi recording_gid video
    1 97546 125ec42a... unknown 240 Horo Gun Toireamaid Hùgan Fhathast Air 174106 c8bbe048... FALSE
    2 266317 2e7119b5... Capella Istropolitana 189 Concerto Grosso in D minor, op. 2 no. 3: II. Adagio 134000 af0f294d... FALSE
    3 628060 34cd3689... Conspirare 5196 Liturgy, op. 42: 9. Praise the Lord from the Heavens 126933 8bab920d... FALSE
    4 423877 54401795... Boys Air Choir 1178 Nunc Dimittis 190000 111611eb... FALSE
    5 394456 9914f9f9... L’Orchestre de la Suisse Romande 23036 Concert Waltz no. 2, op. 51 509960 b16742d1... FALSE

Pendekatan 2: ETL ke BigQuery dengan Dataflow

Di bagian tutorial ini, Anda akan menggunakan program contoh untuk memuat data ke BigQuery menggunakan pipeline Dataflow, bukan UI BigQuery. Kemudian, gunakan model pemrograman Beam untuk melakukan denormalisasi dan pembersihan data yang akan dimuat ke BigQuery.

Sebelum memulai, tinjau konsep dan kode contohnya.

Meninjau konsep

Meskipun datanya kecil dan dapat diupload dengan cepat menggunakan UI BigQuery, untuk keperluan tutorial ini Anda juga dapat menggunakan Dataflow untuk ETL. Gunakan Dataflow untuk ETL ke BigQuery, bukan UI BigQuery saat Anda melakukan penggabungan yang masif, yaitu dari sekitar 500-5000 kolom yang berisi lebih dari 10 TB data, dengan sasaran berikut:

  • Anda ingin membersihkan atau mentransformasi data saat dimuat ke BigQuery, bukan menyimpannya dan menggabungkan setelahnya. Akibatnya, pendekatan ini juga memiliki persyaratan penyimpanan yang lebih rendah karena data hanya disimpan di BigQuery dalam status digabungkan dan ditransformasi.
  • Anda berencana melakukan pembersihan data kustom (yang tidak bisa dicapai dengan SQL).
  • Anda berencana untuk menggabungkan data dengan data di luar OLTP, seperti log atau data yang diakses dari jarak jauh selama proses pemuatan.
  • Anda berencana untuk mengotomatiskan pengujian dan deployment logika pemuatan data menggunakan continuous integration atau continuous deployment (CI/CD).
  • Anda mengantisipasi iterasi, peningkatan kualitas, dan peningkatan proses ETL secara bertahap dari waktu ke waktu.
  • Anda berencana untuk menambahkan data secara bertahap, bukan melakukan ETL satu kali.

Berikut adalah diagram pipeline data yang dibuat oleh program contoh:

Pipeline data menggunakan BigQuery.

Dalam kode contoh, banyak langkah pipeline dikelompokkan atau digabungkan dalam metode yang praktis, disertai dengan nama deskriptif, dan digunakan kembali. Dalam diagram, langkah-langkah yang digunakan kembali ditunjukkan dengan batas putus-putus.

Meninjau kode pipeline

Kode ini membuat pipeline yang melakukan langkah-langkah berikut:

  1. Memuat setiap tabel yang ingin Anda jadikan bagian dalam gabungan dari bucket Cloud Storage publik ke dalam string PCollection. Setiap elemen terdiri dari representasi JSON dari baris tabel.

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. Melakukan konversi string JSON tersebut menjadi representasi objek, MusicBrainzDataObject objek, lalu mengatur representasi objek berdasarkan salah satu nilai kolom, seperti kunci utama atau kunci asing.

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(
        PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply(
          "load " + name,
          MapElements.into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
              .via(
                  (String input) -> {
                    MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                    Long key = (Long) datum.getColumnValue(namespacedKeyname);
                    return KV.of(key, datum);
                  }));
    }
  3. Bergabung dalam daftar berdasarkan artis umum. artist_credit_name menautkan kredit artis dengan rekamannya dan menyertakan kunci asing artis. Tabel artist_credit_name dimuat sebagai daftar objek KV nilai kunci. Anggota K adalah artisnya.

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. Bergabung dalam daftar menggunakan metode MusicBrainzTransforms.innerJoin().

    public static PCollection<MusicBrainzDataObject> innerJoin(
        String name,
        PCollection<KV<Long, MusicBrainzDataObject>> table1,
        PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>() {};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>() {};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. Kelompokkan koleksi objek KV berdasarkan anggota utama tempat Anda ingin bergabung. Cara ini akan menghasilkan PCollection objek KV dengan kunci panjang (nilai kolom artist.id) dan menghasilkan CoGbkResult (singkatan dari grup gabungan berdasarkan hasil kunci). Objek CoGbkResult adalah tuple daftar objek dengan nilai kunci yang sama dari pertama dan kedua PCollections. Tuple ini dapat ditangani menggunakan tag tuple yang dirumuskan untuk setiap PCollection sebelum menjalankan operasi CoGroupByKey dalam metode group.
    2. Menggabungkan setiap kecocokan objek menjadi objek MusicBrainzDataObject yang mewakili hasil penggabungan.

      PCollection<List<MusicBrainzDataObject>> mergedResult =
          joinedResult.apply(
              "merge join results",
              MapElements.into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                  .via(
                      (KV<Long, CoGbkResult> group) -> {
                        List<MusicBrainzDataObject> result = new ArrayList<>();
                        Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                        Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                        leftObjects.forEach(
                            (MusicBrainzDataObject l) ->
                                rightObjects.forEach(
                                    (MusicBrainzDataObject r) -> result.add(l.duplicate().merge(r))));
                        return result;
                      }));
    3. Mengatur ulang koleksi menjadi daftar objek KV untuk memulai penggabungan berikutnya. Kali ini, nilai K adalah kolom artist_credit yang digunakan untuk menggabungkan dengan tabel rekaman.

      PCollection<KV<Long, MusicBrainzDataObject>> artistCreditNamesByArtistCredit =
          MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. Memperoleh kumpulan objek akhir yang dihasilkan MusicBrainzDataObject dengan menggabungkan hasil tersebut dengan kumpulan rekaman yang dimuat dan diatur oleh artist_credit.id.

      PCollection<MusicBrainzDataObject> artistRecordings =
          MusicBrainzTransforms.innerJoin(
              "joined recordings", artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. Memetakan objek MusicBrainzDataObjects yang dihasilkan ke dalam TableRows.

      PCollection<TableRow> tableRows =
          MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. Menulis TableRows yang dihasilkan ke BigQuery.

      tableRows.apply(
          "Write to BigQuery",
          BigQueryIO.writeTableRows()
              .to(options.getBigQueryTablename())
              .withSchema(bqTableSchema)
              .withCustomGcsTempLocation(StaticValueProvider.of(options.getTempLocation()))
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Untuk detail tentang mekanisme pemrograman pipeline Beam, tinjau topik berikut tentang model pemrograman:

Setelah meninjau langkah-langkah yang dijalankan oleh kode, Anda dapat menjalankan pipeline.

Membuat bucket Cloud Storage

Menjalankan kode pipeline

  1. Di konsol Google Cloud, buka Cloud Shell.

    Buka Cloud Shell

  2. Menetapkan variabel lingkungan untuk skrip project dan pipeline Anda

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export DATASET=musicbrainz
    

    Ganti PROJECT_ID dengan ID project dari project Google Cloud Anda.

  3. Pastikan gcloud menggunakan project yang Anda buat atau pilih di awal tutorial:

    gcloud config set project $PROJECT_ID
    
  4. Dengan mengikuti prinsip keamanan hak istimewa terendah, buat akun layanan untuk pipeline Dataflow dan berikan hanya hak istimewa yang diperlukan: roles/dataflow.worker, roles/bigquery.jobUser, dan peran dataEditor pada set data musicbrainz:

    gcloud iam service-accounts create musicbrainz-dataflow
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member=serviceAccount:${SERVICE_ACCOUNT} \
        --role=roles/dataflow.worker
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member=serviceAccount:${SERVICE_ACCOUNT} \
        --role=roles/bigquery.jobUser
    bq query  --use_legacy_sql=false \
        "GRANT \`roles/bigquery.dataEditor\` ON SCHEMA musicbrainz
         TO 'serviceAccount:${SERVICE_ACCOUNT}'"
    
  5. Buat bucket pipeline Dataflow yang akan digunakan untuk file sementara, dan berikan akun layanan musicbrainz-dataflow hak istimewa Owner ke dalamnya:

    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    gsutil mb -l us ${DATAFLOW_TEMP_BUCKET}
    gsutil acl ch -u ${SERVICE_ACCOUNT}:O ${DATAFLOW_TEMP_BUCKET}
    
  6. Buat clone repositori yang berisi kode Dataflow:

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  7. Ubah direktori menjadi sampel:

    cd bigquery-etl-dataflow-sample
    
  8. Kompilasi dan jalankan tugas Dataflow:

    ./run.sh simple
    

    Tugas ini perlu waktu sekitar 10 menit untuk dijalankan.

  9. Untuk melihat progres pipeline, di konsol Google Cloud, buka halaman Dataflow.

    Buka Dataflow

    Status tugas ditampilkan di kolom status. Status Berhasil menunjukkan bahwa tugas telah selesai.

  10. (Opsional) Untuk melihat grafik tugas dan detail tentang langkah-langkahnya, klik nama tugas, misalnya, etl-into-bigquery-bqetlsimple.

  11. Setelah tugas selesai, buka halaman BigQuery.

    Buka BigQuery

  12. Untuk menjalankan kueri di tabel baru, di panel Editor kueri, masukkan hal berikut:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
          AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Panel hasil akan menampilkan serangkaian hasil yang mirip dengan berikut ini:

    Baris artist_name artist_gender artist_area recording_name recording_length
    1 mirin 2 107 Sylphia 264000
    2 mirin 2 107 Dependence 208000
    3 Gaudiburschen 1 81 Die Hände zum Himmel 210000
    4 Sa4 1 331 Ein Tag aus meiner Sicht 221000
    5 Dpat 1 7326 Cutthroat 249000
    6 Dpat 1 7326 Deloused 178000

    Output yang sebenarnya mungkin berbeda karena hasilnya tidak diurutkan.

Membersihkan data

Selanjutnya, buatlah sedikit perubahan pada pipeline Dataflow sehingga Anda dapat memuat tabel pemeta dan memprosesnya sebagai input samping, seperti yang ditunjukkan pada diagram berikut.

Pipeline Dataflow telah diupdate untuk input samping

Saat membuat kueri tabel BigQuery yang dihasilkan, sulit untuk menentukan dari mana artis tersebut berasal tanpa mencari secara manual ID numerik area dari tabel area di database MusicBrainz. Hal ini menjadikan menganalisis hasil kueri menjadi sulit daripada yang seharusnya.

Demikian pula, gender artis ditampilkan sebagai ID, tetapi seluruh tabel gender MusicBrainz hanya terdiri dari tiga baris. Untuk memperbaikinya, Anda dapat menambahkan langkah di pipeline Dataflow untuk menggunakan tabel area dan gender MusicBrainz guna memetakan ID ke label yang tepat.

Tabel artist_area dan artist_gender berisi jumlah baris yang secara signifikan lebih sedikit daripada artis atau tabel data rekaman. Jumlah elemen dalam tabel berikutnya dibatasi oleh jumlah area geografis atau gender.

Hasilnya, langkah pencarian menggunakan fitur Dataflow yang disebut input samping.

Input samping dimuat sebagai ekspor tabel file JSON yang dipisahkan line di bucket Cloud Storage publik yang berisi set data musicbrainz, dan digunakan untuk denormalisasi data tabel dalam satu langkah.

Meninjau kode yang menambahkan input samping ke pipeline

Sebelum menjalankan pipeline, tinjau kode untuk mendapatkan pemahaman yang lebih baik tentang langkah-langkah baru.

Kode ini menunjukkan pembersihan data dengan input samping. Kelas MusicBrainzTransforms memberikan kemudahan tambahan saat menggunakan input samping untuk memetakan nilai kunci asing ke label. Library MusicBrainzTransforms menyediakan metode yang membuat kelas pencarian internal. Kelas pencarian menjelaskan setiap tabel pemeta dan kolom yang diganti dengan label dan argumen dengan panjang variabel. keyKey adalah nama kolom yang berisi kunci untuk pencarian dan valueKey adalah nama kolom yang berisi label yang sesuai.

public static LookupDescription lookup(
    String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

Setiap input samping dimuat sebagai objek peta tunggal, yang digunakan untuk mencari label yang sesuai untuk ID.

Pertama, JSON untuk tabel pemeta awalnya dimuat ke MusicBrainzDataObjects dengan namespace kosong dan diubah menjadi peta dari nilai kolom Key ke nilai kolom Value.

public static PCollectionView<Map<Long, String>> loadMapFromText(
    PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries =
      text.apply(
          "sideInput_" + name,
          MapElements.into(new TypeDescriptor<KV<Long, String>>() {})
              .via(
                  (String input) -> {
                    MusicBrainzDataObject object = JSONReader.readObject(name, input);
                    Long key = (Long) object.getColumnValue(keyKeyName);

                    String value = (String) object.getColumnValue(valueKeyName);
                    return KV.of(key, value);
                  }));

  return entries.apply(View.asMap());
}

Setiap objek Map ini dimasukkan ke dalam Map berdasarkan nilai destinationKey-nya, yang merupakan kunci untuk mengganti dengan nilai yang dicari.

List<SimpleEntry<List<String>, PCollectionView<Map<Long, String>>>> mapSideInputs =
    new ArrayList<>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView =
      loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
          .map(destinationKey -> name + "_" + destinationKey)
          .collect(Collectors.toList());

  mapSideInputs.add(new SimpleEntry<>(destKeyList, mapView));
}

Kemudian, saat mentransformasi objek artis dari JSON, nilai destinationKey (yang dimulai sebagai angka) akan diganti dengan labelnya.

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach(
    (String key) -> {
      Long id = (Long) result.getColumnValue(key);
      if (id != null) {
        String label = sideInputMap.get(id);
        if (label == null) {
          label = "" + id;
        }
        result.replace(key, label);

Untuk menambahkan decoding kolom artist_area dan artist_gender, selesaikan langkah-langkah berikut:

  1. Di Cloud Shell, pastikan lingkungan telah disiapkan untuk skrip pipeline:

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
    export DATASET=musicbrainz
    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    

    Ganti PROJECT_ID dengan ID project dari project Google Cloud Anda.

  2. Jalankan pipeline untuk membuat tabel dengan area yang didekode dan gender artis:

    ./run.sh simple-with-lookups
    
  3. Seperti sebelumnya, untuk melihat progres pipeline, buka halaman Dataflow.

    Buka Dataflow

    Pipeline memerlukan waktu sekitar 10 menit untuk diselesaikan.

  4. Setelah tugas selesai, buka halaman BigQuery.

    Buka BigQuery

  5. Jalankan kueri yang sama yang menyertakan artist_area dan artist_gender:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
      FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
     WHERE artist_area is NOT NULL
       AND artist_gender IS NOT NULL
     LIMIT 1000;
    

    Dalam output, artist_area dan artist_gender kini didekode:

    Baris artist_name artist_gender artist_area recording_name recording_length
    1 mirin Female Japan Sylphia 264000
    2 mirin Female Japan Dependence 208000
    3 Gaudiburschen Male Germany Die Hände zum Himmel 210000
    4 Sa4 Male Hamburg Ein Tag aus meiner Sicht 221000
    5 Dpat Male Houston Cutthroat 249000
    6 Dpat Male Houston Deloused 178000

    Output yang sebenarnya mungkin berbeda, karena hasilnya tidak diurutkan.

Mengoptimalkan skema BigQuery

Di bagian akhir tutorial ini, Anda akan menjalankan pipeline yang menghasilkan skema tabel yang lebih optimal menggunakan kolom bertingkat.

Luangkan waktu untuk meninjau kode yang digunakan untuk membuat versi tabel yang dioptimalkan ini.

Diagram berikut menunjukkan pipeline Dataflow yang sedikit berbeda yang menempatkan rekaman artis dalam setiap baris artis, bukan membuat baris artis duplikat.

Pipeline Dataflow yang menempatkan rekaman artis dalam setiap baris artis.

Representasi data saat ini cukup datar. Artinya, opsi ini mencakup satu baris per rekaman yang dikreditkan yang mencakup semua metadata artis dari skema BigQuery, serta semua rekaman dan metadata artist_credit_name. Representasi datar ini memiliki setidaknya dua kelemahan:

  • Tindakan ini mengulangi metadata artist untuk setiap rekaman yang dikreditkan ke artis. Pada akhirnya, tindakan ini meningkatkan penyimpanan yang diperlukan.
  • Saat Anda mengekspor data sebagai JSON, JSON akan mengekspor array yang mengulangi data tersebut, bukan artis dengan data perekaman bertingkat — yang mungkin Anda inginkan.

Tanpa adanya penalti performa dan tanpa menggunakan penyimpanan tambahan, alih-alih menyimpan satu rekaman per baris, Anda dapat menyimpan rekaman sebagai kolom berulang di setiap kumpulan data artis dengan membuat beberapa perubahan pada pipeline Dataflow.

Daripada menggabungkan rekaman dengan informasi artisnya melalui artist_credit_name.artist, pipeline alternatif ini akan membuat daftar rekaman bertingkat dalam objek artis.

public static PCollection<MusicBrainzDataObject> nest(
    PCollection<KV<Long, MusicBrainzDataObject>> parent,
    PCollection<KV<Long, MusicBrainzDataObject>> child,
    String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>() {};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>() {};

  PCollection<KV<Long, CoGbkResult>> joinedResult =
      group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply(
      "merge join results " + nestingKey,
      MapElements.into(new TypeDescriptor<MusicBrainzDataObject>() {})
          .via(
              (KV<Long, CoGbkResult> group) -> {
                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                List<MusicBrainzDataObject> childList = new ArrayList<>();
                children.forEach(childList::add);
                parentObject = parentObject.duplicate();
                parentObject.addColumnValue("recordings", childList);
                return parentObject;
              }));
}

BigQuery API memiliki batas ukuran baris maksimum sebesar 100 MB saat melakukan penyisipan massal (10 MB untuk streaming inserts), sehingga kode ini membatasi jumlah rekaman bertingkat untuk data tertentu hingga 1000 elemen untuk memastikan bahwa batas ini tidak tercapai. Jika artis tertentu memiliki lebih dari 1000 rekaman, kode tersebut akan menduplikasi baris, termasuk metadata artist, dan terus menyusun data rekaman di baris duplikat.

private static List<TableRow> toTableRows(
    MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        @SuppressWarnings("unchecked")
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }
    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting
   * limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach(
        (String key, List<MusicBrainzDataObject> nestedList) -> {
          if (nestedList.size() > 0) {
            if (parentRow.get(key) == null) {
              parentRow.set(key, new ArrayList<TableRow>());
            }
            @SuppressWarnings("unchecked")
            List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
            @SuppressWarnings("unchecked")
            Map<String, Object> map = (Map<String, Object>) serializableSchema.get(key);
            childRows.add(toChildRow(nestedList.remove(0), map));
          }
        });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

Diagram ini menunjukkan sumber, transformasi, dan sink dari pipeline.

Pipeline yang dioptimalkan dengan sumber, transformasi, dan sink.

Pada umumnya, nama langkah disediakan dalam kode sebagai bagian dari panggilan metode apply.

Untuk membuat pipeline yang dioptimalkan ini, selesaikan langkah-langkah berikut:

  1. Di Cloud Shell, pastikan lingkungan telah disiapkan untuk skrip pipeline:

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export DATAFLOW_TEMP_BUCKET=gs://temp-bucket-${PROJECT_ID}
    export SERVICE_ACCOUNT=musicbrainz-dataflow@${PROJECT_ID}.iam.gserviceaccount.com
    
  2. Jalankan pipeline untuk menyusun baris rekaman di dalam baris artis:

    ./run.sh nested
    
  3. Seperti sebelumnya, untuk melihat progres pipeline, buka halaman Dataflow.

    Buka Dataflow

    Pipeline memerlukan waktu sekitar 10 menit untuk diselesaikan.

  4. Setelah tugas selesai, buka halaman BigQuery.

    Buka BigQuery

  5. Kolom kueri dari tabel bertingkat di BigQuery:

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
          AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Dalam output, artist_recordings ditampilkan sebagai baris bertingkat yang dapat diperluas:

    Baris artist_name artist_gender artist_area artist_recordings
    1 mirin Female Japan (5 rows)
    3 Gaudiburschen Male Germany (1 row)
    4 Sa4 Male Hamburg (10 rows)
    6 Dpat Male Houston (9 rows)

    Output yang sebenarnya mungkin berbeda karena hasilnya tidak diurutkan.

  6. Jalankan kueri untuk mengekstrak nilai dari STRUCT dan gunakan nilai tersebut untuk memfilter hasilnya, misalnya untuk artis yang memiliki rekaman berisi kata "Justin":

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    Dalam output, artist_credit_name_name dan recording_name ditampilkan sebagai baris bertingkat yang dapat diperluas, misalnya:

    Baris artist_name artist_gender artist_area artist_credit_name_name recording_name
    1 Damonkenutz null null (1 row) 1 Yellowpants (Justin Martin remix)
    3 Fabian Male Germany (10+ rows) 1 Heatwave
    . 2 Starlight Love
    . 3 Dreams To Wishes
    . 4 Last Flight (Justin Faust remix)
    . ...
    4 Digital Punk Boys null null (6 rows) 1 Come True
    . 2 We Are... (Punkgirlz remix by Justin Famous)
    . 3 Chaos (short cut)
    . ...

    Output yang sebenarnya mungkin berbeda karena hasilnya tidak diurutkan.

Pembersihan

Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.

Menghapus project

  1. Di konsol Google Cloud, buka halaman Manage resource.

    Buka Manage resource

  2. Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Menghapus resource individual

Ikuti langkah-langkah berikut untuk menghapus resource individual, bukan menghapus seluruh project.

Menghapus bucket Cloud Storage

  1. Di konsol Google Cloud, buka halaman Buckets Cloud Storage.

    Buka Buckets

  2. Klik kotak centang untuk bucket yang ingin Anda dihapus.
  3. Untuk menghapus bucket, klik Hapus, lalu ikuti petunjuk.

Menghapus set data BigQuery

  1. Buka UI web BigQuery.

    Buka BIGQUERY

  2. Pilih set data BigQuery yang Anda buat selama tutorial.

  3. Klik Delete.

Langkah selanjutnya

  • Pelajari lebih lanjut cara menulis kueri untuk BigQuery. Pembuatan kueri data menjelaskan cara menjalankan kueri sinkron dan asinkron, membuat fungsi yang ditentukan pengguna (UDFs), dan lainnya.
  • Jelajahi sintaksis BigQuery. BigQuery menggunakan sintaksis mirip SQL yang dijelaskan dalam Referensi kueri (legacy SQL).
  • Pelajari arsitektur referensi, diagram, dan praktik terbaik tentang Google Cloud. Lihat Cloud Architecture Center kami.