Mithilfe von Dataflow ETL von einer relationalen Datenbank in BigQuery ausführen

In dieser Anleitung wird gezeigt, wie Sie mit Dataflow Daten aus einer relationalen OLTP-Datenbank (Datenbank zur Online-Transaktionsverarbeitung) extrahieren und transformieren und anschließend zur Analyse in BigQuery laden können.

Diese Anleitung richtet sich an Datenbankadministratoren, Betriebsexperten und Cloud-Architekten, die die Abfragefunktionen von BigQuery zu Analysezwecken und die Batchverarbeitungsfunktionen von Dataflow nutzen möchten.

OLTP-Datenbanken sind oft relationale Datenbanken, die für E-Commerce-Seiten, SaaS-Anwendungen (Software as a Service) oder Spiele Informationen speichern und Transaktionen verarbeiten. OLTP-Datenbanken sind normalerweise für Transaktionen optimiert, die die ACID-Attribute erfordern: Atomarität, Konsistenz, Isolation und Langlebigkeit (Atomicity, Consistency, Isolation, Durability). Ihre Schemas sind in der Regel stark normalisiert. Data Warehouses tendieren hingegen dazu, für den Datenabruf und die Datenanalyse statt für Transaktionen optimiert zu sein, und haben meist denormalisierte Schemas. Allgemein werden Daten durch die Denormalisierung aus einer OLTP-Datenbank nützlicher für die Analyse in BigQuery.

Ziele

In der Anleitung werden zwei Ansätze dafür gezeigt, wie normalisierte RDBMS-Daten (Relational Database Management System) durch Extrahieren, Transformieren und Laden (ETL) in denormalisierte BigQuery-Daten umgewandelt werden können:

  • BigQuery zum Laden und Transformieren der Daten verwenden: Nutzen Sie diesen Ansatz, um ein einmaliges Laden einer kleinen Datenmenge in BigQuery für die Analyse durchzuführen. Sie können diesen Ansatz auch verwenden, um für Ihr Dataset einen Prototyp zu erstellen, bevor Sie die Automatisierung mit großen oder mehreren Datasets nutzen.
  • Dataflow zum Laden, Transformieren und Bereinigen der Daten verwenden: Nutzen Sie diesen Ansatz, um größere Datenmengen oder Daten aus mehreren Datenquellen zu laden oder Daten schrittweise oder automatisch zu laden.

Kosten

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss dieser Anleitung können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Vorbereitung

  1. Melden Sie sich bei Ihrem Google-Konto an.

    Wenn Sie noch kein Konto haben, melden Sie sich hier für ein neues Konto an.

  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  4. Compute Engine und Dataflow APIs aktivieren.

    Aktivieren Sie die APIs

  5. Installieren und initialisieren Sie das Cloud SDK.

MusicBrainz-Datasets verwenden

Diese Anleitung beruht auf JSON-Snapshots von Tabellen in der MusicBrainz-Datenbank, die auf PostgreSQL basiert und Informationen über die gesamte MusicBrainz-Musik enthält. Zu den Elementen des MusicBrainz-Schemas gehören:

  • Interpreten
  • Versionsgruppen
  • Versionen
  • Aufnahmen
  • Works
  • Labels
  • Viele Beziehungen zwischen diesen Elementen

Das MusicBrainz-Schema umfasst drei relevante Tabellen: artist, recording und artist_credit_name. artist_credit steht für die Zuordnung einer Aufnahme zu einem Interpreten. Die Zeilen artist_credit_name verknüpfen die Aufnahme mit dem entsprechenden Interpreten über den artist_credit-Wert.

Diese Anleitung enthält die PostgreSQL-Tabellen, die bereits in das JSON-Format extrahiert wurden. Sie können den folgenden Beispielcode verwenden, um diesen Schritt selbst auszuführen:

pg_cmd="\\copy (select row_to_json(r) from (select * from artist) r ) to
exported_artist.json"
psql -w -h $host -U $user -d $db -c $pg_cmd
sed -i -e 's/\\\\/\\/g' exported_artist.json # clean up extra '\' characters

Ansatz 1: ETL mit BigQuery

Nutzen Sie diesen Ansatz, um ein einmaliges Laden einer kleinen Datenmenge in BigQuery für die Analyse durchzuführen. Sie können diesen Ansatz auch nutzen, um für Ihr Dataset einen Prototyp zu erstellen, bevor Sie die Automatisierung mit großen oder mehreren Datasets nutzen.

BigQuery-Dataset erstellen

Das folgende Diagramm zeigt die Schritte, die Sie zum Erstellen eines BigQuery-Datasets ausführen:

Schritte zum Erstellen eines BigQuery-Datasets

Sie laden die MusicBrainz-Tabellen einzeln in BigQuery und führen die geladenen Tabellen anschließend zusammen, sodass jede Zeile die von Ihnen gewünschte Datenverknüpfung enthält. Die Join-Ergebnisse speichern Sie in einer neuen BigQuery-Tabelle. Dann können Sie die ursprünglichen Tabellen, die Sie geladen haben, löschen.

  1. Öffnen Sie in der Cloud Console BigQuery.

    BIGQUERY ÖFFNEN

  2. Klicken Sie unter Ressourcen auf den Namen Ihres Projekts.

  3. Klicken Sie im linken Navigationsbereich auf + Daten hinzufügen.

  4. Führen Sie im Dialogfeld Dataset erstellen die folgenden Schritte aus:

    1. Geben Sie im Feld Dataset-ID den Wert musicbrainz ein.
    2. Behalten Sie für Speicherort der Daten den Wert Standard bei.
  5. Klicken Sie auf Dataset erstellen.

MusicBrainz-Tabellen importieren

Führen Sie für jede MusicBrainz-Tabelle die folgenden Schritte aus, um die Tabellen in das von Ihnen erstellte Dataset einzufügen:

  1. Klicken Sie in der Cloud Console auf den Dataset-Namen und dann auf Tabelle erstellen.
  2. Führen Sie im Dialogfeld Tabelle erstellen die folgenden Schritte aus und klicken Sie dann auf Tabelle erstellen:

    1. Wählen Sie unter Quelle in der Drop-down-Liste Tabelle erstellen aus die Option Google Cloud Storage aus.
    2. Geben Sie im Feld Select file from GCS bucket die URL für die Datendatei ein: gs://solutions-public-assets/bqetl/artist.json.
    3. Wählen Sie für Dateiformat den Eintrag JSON (Newline Delimited) aus.
    4. Geben Sie unter Tabellenname den Tabellennamen ein: artist.
    5. Lassen Sie unter Tabellentyp den Wert Native Tabelle ausgewählt.
    6. Klicken Sie unter dem Abschnitt Schema auf Als Text bearbeiten.
    7. Laden Sie die Schemadatei artist herunter.
    8. Ersetzen Sie den Inhalt im Abschnitt Schema durch den Inhalt der Schemadatei, die Sie heruntergeladen haben.

    Dialogfeld "Tabelle erstellen" mit aktualisiertem Schema aus heruntergeladener JSON-Datei

  3. Warten Sie einen Moment, bis der Ladejob beendet ist. Klicken Sie zum Überwachen des Jobs auf Job History (Jobverlauf).

    Nach Abschluss des Ladejobs wird die neue Tabelle im Dataset angezeigt.

  4. Wiederholen Sie die Schritte 1 bis 3 mit den folgenden Änderungen für die Tabelle artist_credit_name:

  5. Wiederholen Sie die Schritte 1 bis 3 mit den folgenden Änderungen für die Tabelle recording:

Daten manuell denormalisieren

Führen Sie die Daten zum Denormalisieren in einer neuen BigQuery-Tabelle zusammen, die eine Zeile für jede Aufnahme des Interpreten hat, gemeinsam mit ausgewählten Metadaten, die für die Analyse aufbewahrt werden sollen.

  1. Kopieren Sie in der Cloud Console die folgende Abfrage und fügen Sie sie in den Abfrageeditor ein:

    SELECT artist.id, artist.gid as artist_gid,
           artist.name as artist_name, artist.area,
           recording.name as recording_name, recording.length,
           recording.gid as recording_gid, recording.video
      FROM `[PROJECT_ID].[DATASET].artist` as artist
          INNER JOIN `[PROJECT_ID].[DATASET].artist_credit_name` AS artist_credit_name
               ON artist.id = artist_credit_name.artist
          INNER JOIN `[PROJECT_ID].[DATASET].recording` AS recording
               ON artist_credit_name.artist_credit = recording.artist_credit
    

    Ersetzen Sie [DATASET] durch den Namen des zuvor erstellten Datasets, z. B. musicbrainz, und [PROJECT_ID] durch Ihre Google Cloud-Projekt-ID.

  2. Klicken Sie auf die Drop-down-Liste Mehr und wählen Sie dann Abfrageeinstellungen aus.

  3. Führen Sie auf der Karte Query settings (Abfrageeinstellungen) die folgenden Schritte aus:

    1. Klicken Sie das Kästchen Legen Sie eine Zieltabelle für Abfrageergebnisse fest an.
    2. Geben Sie unter Tabellenname recordings_by_artists_manual. ein.
    3. Klicken Sie für Schreibeinstellung für Zieltabelle auf Tabelle überschreiben.
    4. Klicken Sie auf das Kästchen Allow large results (no size limit) (Große Ergebnisse zulassen (kein Größenlimit)).
    5. Behalten Sie für Job Priority (Priorität des Jobs) die Standardeinstellung Interactive (Interaktiv) bei.
    6. Behalten Sie für SQL dialect (SQL-Dialekt) die Standardeinstellung Standard bei.
    7. Klicken Sie auf Save (Speichern).
  4. Klicken Sie auf Run (Ausführen).

    Wenn die Abfrage abgeschlossen ist, werden die Daten aus dem Abfrageergebnis in der neu erstellten BigQuery-Tabelle bei jedem Interpreten in Lieder organisiert.

    Abfrageeinstellungen für Zieltabelle

Ansatz 2: ETL in BigQuery mit Dataflow

In diesem Abschnitt der Anleitung verwenden Sie anstelle der BigQuery-UI ein Beispielprogramm, um Daten mithilfe einer Dataflow-Pipeline in BigQuery zu laden. Anschließend verwenden Sie das Programmiermodell von Dataflow, um die Daten, die in BigQuery geladen werden sollen, zu denormalisieren und zu bereinigen.

Sehen Sie sich zuerst die Konzepte und den Beispielcode an.

Konzepte

Obwohl die Datenmenge klein ist und mithilfe der BigQuery-UI schnell hochgeladen werden kann, verwenden Sie jetzt zu Lernzwecken Dataflow, um sie zu extrahieren, zu transformieren und zu laden (ETL). Normalerweise bietet es sich eher bei sehr umfassenden Joins an, anstelle der BigQuery-UI Dataflow zum Extrahieren, Transformieren und Laden von Daten in BigQuery zu verwenden, also bei etwa 500 bis 5.000 Spalten mit mehr als 10 TB Daten. Dabei wird Folgendes angestrebt:

  • Sie möchten die Daten bereinigen oder transformieren, während sie in BigQuery geladen werden, statt sie zu speichern und anschließend zusammenzuführen. Bei diesem Ansatz sind daher die Speicheranforderungen geringer, weil die Daten in BigQuery nur in ihrem zusammengeführten, transformierten Zustand gespeichert werden.
  • Sie möchten eine benutzerdefinierte Datenbereinigung vornehmen, was mit SQL allein nicht möglich ist.
  • Sie planen, die Daten während des Ladevorgangs mit Daten außerhalb der OLTP zu kombinieren, wie etwa Logs oder per Fernzugriff gesteuerten Daten.
  • Sie möchten das Testen und Bereitstellen der Logik zum Laden von Daten mit einer kontinuierlichen Integration oder kontinuierlichen Bereitstellung (Continuous Integration/Continuous Deployment, CI/CD) automatisieren.
  • Sie erwarten eine allmähliche Iteration, Optimierung und Verbesserung des ETL-Prozesses.
  • Sie planen, Daten stufenweise hinzuzufügen, statt einen einmaligen ETL-Schritt durchzuführen.

Es folgt ein Diagramm der Datenpipeline, die vom Beispielprogramm erstellt wird:

Datenpipeline mit BigQuery

Im Beispielcode sind viele der Pipelineschritte gruppiert oder in praktische Methoden eingebettet. Sie erhalten aussagekräftige Namen und werden wiederverwendet. Im Diagramm werden wiederverwendete Schritte durch gestrichelte Linien dargestellt.

Pipelinecode

Der Code erstellt eine Pipeline, die die folgenden Schritte ausführt:

  1. Lädt jede Tabelle, die Teil des Joins sein soll, in eine PCollection von Strings. Jedes Element umfasst die JSON-Darstellung einer Tabellenzeile.

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. Konvertiert diese JSON-Strings in Objektdarstellungen (MusicBrainzDataObject-Objekte) und organisiert diese Objektdarstellungen nach einem der Spaltenwerte, z. B. einem Primär- oder Fremdschlüssel.

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply("load " + name,
                        MapElements
                          .into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
                          .via( (String input) -> {
                                MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                                Long key = (Long) datum.getColumnValue(namespacedKeyname);
                                return KV.of(key, datum);
                                })
             );
    }
  3. Führt die Listen anhand eines gemeinsamen Interpreten zusammen. artist_credit_name verknüpft eine Interpretenzuordnung mit der Aufnahme und umfasst den Fremdschlüssel des Interpreten. Die Tabelle artist_credit_name wird als Liste der Schlüsselwertobjekte KV geladen. Das K-Mitglied ist der Interpret.

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. Führt die Listen über die Methode MusicBrainzTransforms.innerJoin() zusammen.

    public static PCollection<MusicBrainzDataObject>
                         innerJoin(String name,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table1,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>(){};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>(){};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. Gruppiert die Sammlungen von KV-Objekten nach dem Schlüsselmitglied, für das die Zusammenführung durchgeführt werden soll. Dies führt zu einer PCollection von KV-Objekten mit einem langen Schlüssel (dem Spaltenwert artist.id) und dem daraus resultierenden CoGbkResult (steht für "Combine Group by Key Result"). Das CoGbkResult-Objekt ist ein Tupel von Objektlisten mit dem gemeinsamen Schlüsselwert aus den ersten und zweiten PCollections. Dieses Tupel ist mit dem für jede PCollection formulierten Tupel-Tag vor dem Ausführen des CoGroupByKey-Vorgangs in der group-Methode erreichbar.
    2. Führt jeden Objektabgleich in einem Objekt MusicBrainzDataObject zusammen, das ein Verknüpfungsergebnis darstellt.

          PCollection<List<MusicBrainzDataObject>> mergedResult =
              joinedResult.apply("merge join results",
                           MapElements
                         .into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                         .via( ( KV<Long, CoGbkResult> group ) -> {
                             List<MusicBrainzDataObject> result = new ArrayList<MusicBrainzDataObject>();
                             Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                             Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                             leftObjects.forEach((MusicBrainzDataObject l) -> {
                               rightObjects.forEach((MusicBrainzDataObject r) -> {
                                 result.add(l.duplicate().merge(r));
                               });
                             });
                             return result;
                           }
                         )
      );
    3. Erkennt die Sammlung in einer Liste von KV-Objekten, um mit dem nächsten Join zu beginnen. Hier ist der K-Wert die Spalte artist_credit, die für den Join mit der Tabelle verwendet wird, die die Aufnahmen enthält.

      PCollection<KV<Long,MusicBrainzDataObject>> artistCreditNamesByArtistCredit =  MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. Erfasst die abschließend resultierende Sammlung von MusicBrainzDataObject-Objekten, indem dieses Ergebnis mit der geladenen Sammlung von Aufnahmen zusammengeführt wird, die nach artist_credit.id organisiert sind.

      PCollection<MusicBrainzDataObject> artistRecordings = MusicBrainzTransforms.innerJoin("joined recordings",
         artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. Bildet die resultierenden MusicBrainzDataObjects-Objekte in TableRows ab.

      PCollection<TableRow> tableRows = MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. Schreibt die resultierenden TableRows in BigQuery.

      tableRows.apply(
           "Write to BigQuery",
           BigQueryIO.writeTableRows()
          .to(BQETLOptions.getBigQueryTablename())
          .withSchema(bqTableSchema)
          .withCustomGcsTempLocation(StaticValueProvider.of(BQETLOptions.getTempLocation() ))
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Details zur Funktionsweise der Dataflow-Pipelineprogrammierung finden Sie in den folgenden Themen zum Programmiermodell:

Nachdem Sie sich die Schritte angesehen haben, die der Code durchführt, können Sie die Pipeline ausführen.

Pipelinecode ausführen

  1. Öffnen Sie Cloud Shell in der Cloud Console.

    Zu Cloud Shell

  2. Legen Sie die Umgebungsvariablen für Ihr Projekt fest:

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    

    Ersetzen Sie [PROJECT_ID] durch die Projekt-ID Ihres Google Cloud-Projekts und [CHOOSE_AN_APPROPRIATE_ZONE] durch eine Google Cloud-Zone.

  3. Legen Sie die Umgebungsvariablen fest, die vom Pipelineskript verwendet werden:

    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export DATASET=musicbrainz
    export SERVICE_ACCOUNT=project-owner
    
  4. Achten Sie darauf, dass gcloud das Projekt verwendet, das Sie zu Beginn der Anleitung erstellt oder ausgewählt haben:

    gcloud config set project $PROJECT_ID
    
  5. Erstellen Sie ein Dienstkonto, um die Pipeline auszuführen:

    gcloud iam service-accounts create ${SERVICE_ACCOUNT} \
        --display-name "Project Owner Account"
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member serviceAccount:${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com \
        --role roles/owner
    gcloud iam service-accounts keys create \
        ~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json \
        --iam-account ${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com
    

    Mit diesem Befehl laden Sie eine JSON-Datei herunter, die den Dienstkontoschlüssel enthält. Speichern Sie diese Datei an einem sicheren Ort.

  6. Legen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS auf den Pfad der JSON-Datei fest, die den Dienstkontoschlüssel enthält:

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  7. Klonen Sie das Repository, das den Dataflow-Code enthält:

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  8. Ändern Sie das Verzeichnis in das Beispiel:

    cd bigquery-etl-dataflow-sample
    
  9. Erstellen Sie in Cloud Storage einen Staging-Bucket, da für Dataflow-Jobs ein Cloud Storage-Bucket erforderlich ist, um die Binärdateien bereitzustellen, die zum Ausführen der Pipeline verwendet werden.

    gsutil mb gs://$STAGING_BUCKET
    
  10. Richten Sie den Objektlebenszyklus für [STAGING_BUCKET_NAME] entsprechend jenem in der Datei dataflow-staging-policy.json ein.

    gsutil lifecycle set dataflow-staging-policy.json gs://$STAGING_BUCKET
    
  11. Führen Sie den Dataflow-Job aus:

    ./run.sh simple
    
  12. Wechseln Sie in der Cloud Console zur Seite Dataflow, um sich den Fortschritt der Pipeline anzusehen.

    Zur Seite "Dataflow"

    Der Status der Jobs wird in der Statusspalte angezeigt. Der Status Erfolgreich zeigt an, dass der Job abgeschlossen ist.

  13. Optional: Wenn Sie die Jobgrafik und Details zu den Schritten sehen möchten, klicken Sie auf den Jobnamen, z. B. etl-into-bigquery-bqetlsimple.

  14. Rufen Sie in der Cloud Console die Seite BigQuery auf.

    Zur Seite "BigQuery"

    Achten Sie darauf, dass Ihr Google Cloud-Projekt ausgewählt ist.

  15. Geben Sie im Bereich Abfrageeditor zum Ausführen einer Abfrage in der neuen Tabelle Folgendes ein:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Abfrageeditor mit Abfrage für neue Tabelle

Daten bereinigen

Als Nächstes nehmen Sie eine kleine Änderung an der Dataflow-Pipeline vor, sodass Sie Suchtabellen laden und diese als Nebeneingaben verarbeiten können. Dies ist im folgenden Diagramm zu sehen:

Für Nebeneingaben aktualisierte Dataflow-Pipeline

Wenn Sie die resultierende BigQuery-Tabelle abfragen, ist es schwierig zu erahnen, woher der Interpret stammt, ohne manuell nachzusehen, wofür die numerische Gebiets-ID aus der area-Tabelle in der MusicBrainz-Datenbank steht. Dies macht die Analyse von Abfrageergebnissen schwieriger als nötig.

Ebenso wird auch das Geschlecht des Interpreten als ID angezeigt. Die gesamte MusicBrainz-Tabelle zum Geschlecht besteht jedoch nur aus drei Zeilen. Sie können dies beheben, indem Sie einen Schritt zur Dataflow-Pipeline hinzufügen, um die IDs mithilfe der area- und gender-Tabellen von MusicBrainz ihren entsprechenden Labels zuzuordnen.

Sowohl die Tabelle artist_area als auch die Tabelle artist_gender enthalten eine erheblich geringere Anzahl von Zeilen als die Tabellen für Interpreten und für Aufnahmedaten. Die Anzahl unterschiedlicher Elemente in den erstgenannten Tabellen ist durch die Anzahl der geografischen Gebiete bzw. der Geschlechter eingeschränkt.

Infolgedessen wird beim Suchschritt das Dataflow-Feature Nebeneingabe verwendet.

Nebeneingaben werden als Tabellenexporte im durch Zeilen getrennten JSON-Format geladen und zum Denormalisieren der Tabellendaten in nur einem Schritt verwendet.

Code zum Hinzufügen von Nebeneingaben zur Pipeline

Gehen Sie vor dem Ausführen der Pipeline den Code durch, um die neuen Schritte besser nachvollziehen zu können.

Sehen Sie sich in der Datei BQETLSimple.java die auskommentierten Zeilen an. Für diese werden die Kommentarzeichen in einem folgenden Schritt entfernt.

//PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
//        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
//        MusicBrainzTransforms.lookup("gender","id","name","gender"));

PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");

Dieser Code zeigt die Datenbereinigung bei Nebeneingaben. Die Klasse MusicBrainzTransforms ist für die Nutzung von Nebeneingaben besser geeignet, um Fremdschlüsselwerte den Labels zuzuordnen. Die Bibliothek MusicBrainzTransforms bietet eine Methode zum Erstellen einer internen Suchklasse. Die Suchklasse beschreibt sämtliche Suchtabellen und die Felder, die durch Labels ersetzt werden, sowie Argumente variabler Länge. keyKey ist die Bezeichnung für die Spalte, die den Schlüssel für die Suche enthält, und valueKey ist der Name der Spalte, die das entsprechende Label enthält.

public static LookupDescription lookup(String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

Alle Nebeneingaben werden als einzelne Zuordnungsobjekte geladen, die für die Suche nach dem entsprechenden Label für eine ID verwendet werden.

Zuerst werden die JSON-Daten für die Suchtabelle mit einem leeren Namespace in MusicBrainzDataObjects geladen und in eine Zuordnung vom Key-Spaltenwert zum Value-Spaltenwert konvertiert.

public static PCollectionView<Map<Long, String>> loadMapFromText(PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries = text.apply(
        "sideInput_" + name,
        MapElements
          .into(new TypeDescriptor<KV<Long, String>>() {})
          .via((String input) -> {
                 MusicBrainzDataObject object = JSONReader.readObject(name, input);
                 Long key = (Long) object.getColumnValue(keyKeyName);

                 String value = (String) object.getColumnValue(valueKeyName);
                 return KV.of(key, value);
               })
        );

  return entries.apply(View.<Long, String>asMap());
}

Jedes dieser Map-Objekte wird entsprechend dem Wert seines destinationKey in eine Map verschoben, wobei es sich um den Schlüssel für die Ersetzung durch den Suchwert handelt.

List<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>> mapSideInputs = new ArrayList<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView = loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
                            .map( destinationKey -> name + "_" + destinationKey )
                            .collect(Collectors.toList());

    mapSideInputs.add(new SimpleEntry(destKeyList, mapView));

}

Dann wird der Wert für destinationKey (der mit einer Zahl beginnt) während der Transformation der Interpretenobjekte aus JSON durch das entsprechende Label ersetzt.

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach( ( String key ) -> {
  Long id = (Long) result.getColumnValue(key);
  if (id != null) {
    String label = (String) sideInputMap.get(id);
    if (label == null) {
      label = "" + id;
    }
    result.replace(key, label);

Damit die Datei BQETLSimple.java so geändert wird, dass die Daten der Felder artist_area und artist_gender mithilfe von Suchvorgängen decodiert werden, führen Sie die folgenden Schritte durch:

  1. Ändern Sie den Programmablauf geringfügig:

    1. Entfernen Sie die Kommentarzeichen aus den Zeilen, die die Interpretendaten mithilfe der Suchen laden.
    2. Kommentieren Sie den Aufruf von loadTable aus, durch den Interpretendaten ohne Suchen geladen werden.
    //PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
    //        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
    //        MusicBrainzTransforms.lookup("gender","id","name","gender"));
    
    PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");
  2. Ändern Sie TableFieldSchemas für artist_area und artist_gender in den Datentyp string anstelle von int. Kommentieren Sie dazu die entsprechenden int-Felder aus und entfernen Sie die Kommentarzeichen aus den jeweiligen string-Feldern.

    /*Switch these two lines when using mapping table for artist_area */
    //        .stringField("artist_area")
            .intField("artist_area")
    /*Switch these two lines when using mapping table for artist_gender */
    //        .stringField("artist_gender")
            .intField("artist_gender")
    /*Switch these two lines when using mapping table for artist_begin_area */
            .intField("artist_begin_area")
    //      .stringField("artist_begin_area")
  3. Führen Sie die folgenden Schritte aus, um den Pipelinecode noch einmal auszuführen:

    1. Legen Sie die Umgebungsvariablen für Ihr Projekt fest:

      export PROJECT_ID=[PROJECT_ID]
      export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
      
    2. Prüfen Sie, ob die Umgebung eingerichtet ist:

      export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
      export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
      export DATASET=musicbrainz
      export SERVICE_ACCOUNT=project-owner
      
    3. Legen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS auf den Pfad der JSON-Datei fest, die Ihren Dienstkontoschlüssel enthält.

      export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
      
    4. Führen Sie die Pipeline aus, um Aufnahmezeilen innerhalb von Interpretenzeilen zu verschachteln:

      ./run.sh simple
      
  4. Wiederholen Sie dann die Abfrage, die artist_area und artist_gender umfasst:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    In der Ausgabe werden artist_area und artist_gender nun decodiert:

    Nach &quot;artist_area&quot; und &quot;artist_gender&quot; decodierte Ausgabe

BigQuery-Schema optimieren

Im letzten Teil dieser Anleitung führen Sie eine Pipeline aus, die mithilfe verschachtelter Felder ein besser geeignetes Tabellenschema generiert.

Nehmen Sie sich einen Moment Zeit, um sich den Code anzusehen, mit dem diese optimierte Version der Tabelle generiert wird.

Das folgende Diagramm zeigt eine leicht veränderte Dataflow-Pipeline, die die Aufnahmen des Interpreten in jeder Interpretenzeile verschachtelt, anstatt doppelte Interpretenzeilen zu erstellen.

Dataflow-Pipeline, die die Aufnahmen des Interpreten in jeder Interpretenzeile verschachtelt

Die Daten werden aktuell ziemlich vereinfacht dargestellt. Das heißt, dass die Darstellung eine Zeile pro zugeordneter Aufnahme umfasst, die alle Interpretenmetadaten aus dem BigQuery-Schema und alle Aufnahmen- sowie artist_credit_name-Metadaten enthält. Diese vereinfachte Darstellung hat mindestens zwei Nachteile:

  • Sie wiederholt die artist-Metadaten für jede Aufnahme, die einem Interpret zugeordnet ist, wodurch wiederum mehr Speicher benötigt wird.
  • Wenn Sie die Daten als JSON exportieren, wird ein Array exportiert, das diese Daten anstelle des Interpreten mit den verschachtelten Aufnahmedaten wiederholt – was vermutlich das ist, was Sie möchten.

Ohne Leistungseinbußen und ohne die Nutzung zusätzlichen Speichers können Sie, statt eine Aufnahme pro Zeile zu speichern, Aufnahmen als wiederkehrendes Feld im Interpreteneintrag speichern, indem Sie ein paar Änderungen an der Dataflow-Pipeline vornehmen.

Statt die Aufnahmen mit den Interpreteninformationen über artist_credit_name.artist zusammenzuführen, erstellt diese alternative Pipeline eine verschachtelte Liste mit Aufnahmen innerhalb eines Interpretenobjekts.

public static PCollection<MusicBrainzDataObject> nest(PCollection<KV<Long, MusicBrainzDataObject>> parent,
                                                      PCollection<KV<Long, MusicBrainzDataObject>> child,
                                                      String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>(){};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>(){};

  PCollection<KV<Long, CoGbkResult>> joinedResult = group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply("merge join results " + nestingKey,
                            MapElements
                             .into(new TypeDescriptor<MusicBrainzDataObject>() {})
                             .via((KV<Long, CoGbkResult> group) -> {
                                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                                List<MusicBrainzDataObject> childList = new ArrayList<MusicBrainzDataObject>();
                                children.forEach(childList::add);
                                parentObject = parentObject.duplicate();
                                parentObject.addColumnValue("recordings", childList);
                                return parentObject;
                              })
                         );
}

TableRow hat Größeneinschränkungen in der BigQuery API. Dadurch beschränkt der Code die Anzahl verschachtelter Aufnahmen für eine bestimmte Aufnahme auf 1.000 Elemente. Wenn ein bestimmter Interpret mehr als 1.000 Aufnahmen hat, dupliziert der Code die Zeile, einschließlich der artist-Metadaten, und fährt mit der Verschachtelung der Aufnahmedaten in der duplizierten Zeile fort.

private static List<TableRow> toTableRows(MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<TableRow>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<String, List<MusicBrainzDataObject>>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }

    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach((String key, List<MusicBrainzDataObject> nestedList) -> {
      if (nestedList.size() > 0) {
        if (parentRow.get(key) == null) {
          parentRow.set(key, new ArrayList<TableRow>());
        }
        List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
        childRows.add(toChildRow(nestedList.remove(0), (Map<String, Object>) serializableSchema.get(key)));
      }
    });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

Das folgende Diagramm zeigt die Quellen, Transformationen und Senken der Pipeline:

Optimierte Pipeline mit Quellen, Transformationen und Senken

In den meisten Fällen werden die Schrittnamen im Code als Teil des apply-Methodenaufrufs angegeben.

Führen Sie die folgenden Schritte aus, um die optimierte Pipeline zu erstellen:

  1. Prüfen Sie in Cloud Shell, ob die Umgebung für das Pipelineskript eingerichtet ist:

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export SERVICE_ACCOUNT=project-owner
    
  2. Legen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS auf den Pfad der JSON-Datei fest, die Ihren Dienstkontoschlüssel enthält:

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  3. Führen Sie die Pipeline aus, um Aufnahmezeilen innerhalb von Interpretenzeilen zu verschachteln:

    ./run.sh nested
    
  4. Fragen Sie Felder aus der verschachtelten Tabelle in BigQuery ab:

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Abfrageergebnisse der verschachtelten Tabelle

  5. Führen Sie eine Abfrage aus, um Werte aus dem STRUCT zu extrahieren und zum Filtern der Ergebnisse zu verwenden:

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    Abfrage zum Filtern der Ergebnisse

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

Projekt löschen

  1. Wechseln Sie in der Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Einzelne Ressourcen löschen

Gehen Sie wie unten beschrieben vor, um einzelne Ressourcen statt des ganzen Projekts zu löschen.

Cloud Storage-Bucket löschen

  1. Wechseln Sie in der Cloud Console zur Seite Cloud Storage-Browser.

    Zum Cloud Storage-Browser

  2. Klicken Sie auf das Kästchen neben dem Bucket, der gelöscht werden soll.
  3. Klicken Sie zum Löschen des Buckets auf Löschen .

BigQuery-Datasets löschen

  1. Öffnen Sie die BigQuery-Web-UI.

    BIGQUERY öffnen

  2. Wählen Sie die BigQuery-Datasets aus, die Sie in der Anleitung erstellt haben.

  3. Klicken Sie auf Löschen.

Nächste Schritte