Avro-Datensätze mithilfe von Dataflow in BigQuery streamen

In dieser Anleitung wird beschrieben, wie Avro-Objekte vom Typ SpecificRecord in BigQuery mithilfe von Dataflow gespeichert werden. Dabei wird das Tabellenschema automatisch generiert und die Eingabeelemente transformiert. Diese Anleitung zeigt auch die Verwendung von mit Avro generierten Klassen für das Erfassen oder Übertragen von Zwischendaten zwischen Workern in Ihrer Dataflow-Pipeline.

Apache Avro ist ein Serialisierungssystem, das auf Schemas zurückgreift, um Daten zu strukturieren. Weil das Schema immer vorhanden ist, wenn Avro-Daten gelesen oder geschrieben werden, ist die Serialisierung sowohl schnell als auch klein. Die Leistungsvorteile machen es zu einer beliebten Wahl beim Weiterleiten von Nachrichten zwischen Systemen, z. B. wenn eine Anwendung Ereignisse über einen Message Broker an ein Analysesystem sendet. Sie können das Avro-Schema verwenden, um Ihr BigQuery-Data-Warehouse-Schema zu verwalten. Für das Umwandeln des Avro-Schemas in die BigQuery-Tabellenstruktur ist benutzerdefinierter Code erforderlich, der in dieser Anleitung erläutert wird.

Diese Anleitung richtet sich an Entwickler und Architekten, die das Avro-Schema zur Verwaltung Ihres BigQuery-Data-Warehouse-Schemas verwenden möchten. In dieser Anleitung wird vorausgesetzt, dass Sie mit Avro und Java vertraut sind.

Das folgende Diagramm zeigt die allgemeine Architektur dieser Anleitung.

Architektur eines Avro-Schemas, das Ihr BigQuery-Data-Warehouse-Schema verwaltet

Diese Anleitung verwendet ein einfaches System zur Bestellabwicklung mit den folgenden Schritten, um dieses Architekturmuster zu demonstrieren:

  • Eine Online-Anwendung generiert Ereignisse, wenn Ihr Kunde einen Kauf tätigt.
  • Ein Order-Objekt enthält eine eindeutige Kennzeichnung, die Liste der gekauften Artikel und einen Zeitstempel.
  • Eine Dataflow-Pipeline liest die OrderDetails SpecificRecord-Avro-Nachrichten aus einem Pub/Sub-Thema.
  • Die Dataflow-Pipeline schreibt die Datensätze als Avro-Dateien in Cloud Storage.
  • Die Klasse OrderDetails generiert automatisch das entsprechende BigQuery-Schema.
  • Die OrderDetails-Objekte werden mithilfe einer generischen Transformationsfunktion in BigQuery geschrieben.

Ziele

  • Nehmen Sie JSON-Strings von einem Pub/Sub-Datenstrom mithilfe von Dataflow auf.
  • Transformieren Sie die JSON-Objekte in Objekte von Avro-generierten Klassen.
  • Generieren Sie aus dem Avro-Schema das BigQuery-Tabellenschema.
  • Schreiben Sie die Avro-Datensätze in eine Datei in Cloud Storage.
  • Schreiben Sie die Avro-Datensätze in BigQuery.

Kosten

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss dieser Anleitung können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.

  4. BigQuery, Cloud Storage, and Dataflow APIs aktivieren.

    Aktivieren Sie die APIs

  5. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  6. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.

  7. BigQuery, Cloud Storage, and Dataflow APIs aktivieren.

    Aktivieren Sie die APIs

  8. Aktivieren Sie Cloud Shell in der Console.

    Cloud Shell aktivieren

    Unten in der Console wird eine Cloud Shell-Sitzung gestartet und eine Eingabeaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung, in der das Google Cloud CLI bereits installiert ist und Werte für Ihr aktuelles Projekt bereits festgelegt sind. Das Initialisieren der Sitzung kann einige Sekunden dauern.

Umgebung einrichten

  1. Klonen Sie in Cloud Shell das Quell-Repository:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. Öffnen Sie die Datei env.sh.

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  3. Die Datei env.sh enthält voreingestellte Standardwerte, die Sie für diese Anleitung verwenden können. Sie können diese Dateien jedoch für Ihre Umgebung ändern.

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="${GOOGLE_CLOUD_PROJECT}_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="${MY_BUCKET}/out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    BQ_TABLE="orders"
    

    Dabei gilt:

    • avro-records: Der Name für Ihr Pub/Sub-Thema.
    • $GOOGLE_CLOUD_PROJECT"_avro_beam: Der Name Ihres Cloud Storage-Buckets, der durch Ihre Cloud-Projekt-ID generiert wird.
    • $MY_BUCKET/""out/": Der Pfad für den Cloud Storage-Bucket, der Ihre Avro-Ausgabe enthält.
    • us-central1: Die Region, die Sie für Pub/Sub und Dataflow verwenden. Weitere Informationen zu Regionen finden Sie unter Geografie und Regionen.
    • US: Die Region für BigQuery. Weitere Informationen zu Standorten finden Sie unter Dataset-Standorte.
    • sales: Der Name des BigQuery-Datasets.
    • orders: Der Name der BigQuery-Tabelle.
    • 1: Die maximale Anzahl von Dataflow-Workern.
  4. Legen Sie die Umgebungsvariablen fest:

     . ./env.sh
    

Ressourcen erstellen

  1. Erstellen Sie in Cloud Shell ein Pub/Sub-Thema:

    gcloud pubsub topics create "${MY_TOPIC}"
    
  2. Erstellen Sie einen Cloud Storage-Bucket:

    gsutil mb -l "${REGION}" -c regional "gs://${MY_BUCKET}"
    

    Der Cloud Storage-Bucket sichert die von der Anwendung generierten Rohereignisse. Der Bucket kann auch als alternative Quelle für Offlineanalysen und Validierung dienen. Dabei werden Spark- und Hadoop-Jobs verwendet, die auf Dataproc ausgeführt werden.

  3. Erstellen Sie ein BigQuery-Dataset:

    bq --location="${BQ_REGION}" mk --dataset "${GOOGLE_CLOUD_PROJECT}:${BQ_DATASET}"
    

    Ein BigQuery-Dataset enthält Tabellen und Ansichten in einer einzelnen Region oder einer Geografie, die mehrere Regionen enthält. Weitere Informationen finden Sie unter Datasets erstellen.

Beam Dataflow-Anwendung starten

  1. Stellen Sie in Cloud Shell die Pipeline auf dem Dataflow-Runner bereit und führen Sie sie aus:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    
    mvn clean package
    
    java -cp target/BeamAvro-bundled-1.0-SNAPSHOT.jar \
    com.google.cloud.solutions.beamavro.AvroToBigQuery \
    --runner=DataflowRunner \
    --project="${GOOGLE_CLOUD_PROJECT}" \
    --stagingLocation="gs://${MY_BUCKET}/stage/" \
    --tempLocation="gs://${MY_BUCKET}/temp/" \
    --inputPath="projects/${GOOGLE_CLOUD_PROJECT}/topics/${MY_TOPIC}" \
    --workerMachineType=n1-standard-1 \
    --region="${REGION}" \
    --dataset="${BQ_DATASET}" \
    --bigQueryTable="${BQ_TABLE}" \
    --outputPath="gs://${MY_BUCKET}/out/" \
    --jsonFormat=false \
    --avroSchema="$(<../orderdetails.avsc)"
    

    Die Ausgabe enthält Ihre Anwendungs-ID. Notieren Sie sich Ihre Anwendungs-ID, da diese später in der Anleitung benötigt wird.

  2. Wechseln Sie in der Console zu Dataflow.

    Zu Dataflow

  3. Klicken Sie auf Ihre Anwendungs-ID, um den Pipelinestatus aufzurufen. Der Pipelinestatus wird als Grafik angezeigt.

    Grafik eines Pipelinestatus.

Code ansehen

In der Datei AvroToBigQuery.java haben die Pipeline-Optionen mit den erforderlichen Parametern die Befehlszeilenparameter durchlaufen. Die Option Streamingmodus ist ebenfalls aktiviert. Das BigQuery-Tabellenschema wird automatisch aus dem Avro-Schema mit Beam-Schema von BigQuery IO generiert.

Für das Avro-Eingabeformat werden Objekte aus Pub/Sub gelesen. Wenn das Eingabeformat JSON ist, werden die Ereignisse gelesen und in Avro-Objekte umgewandelt.

Schema avroSchema = new Schema.Parser().parse(options.getAvroSchema());

if (options.getJsonFormat()) {
  return input
      .apply("Read Json", PubsubIO.readStrings().fromTopic(options.getInputPath()))
      .apply("Make GenericRecord", MapElements.via(JsonToAvroFn.of(avroSchema)));
} else {
  return input.apply("Read GenericRecord", PubsubIO.readAvroGenericRecords(avroSchema)
      .fromTopic(options.getInputPath()));
}

Die Pipeline erzeugt Verzweigungen, um zwei separate Schreibvorgänge durchzuführen:

Die BigQueryIO schreibt die Avro-Objekte in BigQuery, indem sie sie intern in TableRow-Objekte mit Beam-Schemas umwandeln. Weitere Informationen finden Sie unter Zuordnung zwischen BigQuery-Datentypen und Avro-Datentypen.

Ergebnisse in BigQuery anzeigen

Starten Sie zum Testen der Pipeline das Skript gen.py. Dieses Skript simuliert die Generierung von Bestellereignissen und überträgt sie zum Pub/Sub-Thema.

  1. Wechseln Sie in Cloud Shell zum Skriptverzeichnis des Generators der Beispielereignisse und führen Sie das Skript aus:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. Wechseln Sie in der Console zu BigQuery.

    BigQuery aufrufen

  3. Wenn Sie das Tabellenschema ansehen möchten, klicken Sie auf das sales-Dataset und wählen Sie dann die Tabelle orders aus. Wenn Sie die Standardumgebungsvariablen in env.sh geändert haben, können die Namen des Datasets und der Tabelle möglicherweise abweichen.

    Tabellenschema der Tabelle &quot;orders&quot;.

  4. Wenn Sie Beispieldaten aufrufen möchten, führen Sie eine Abfrage im Abfrageeditor aus:

    SELECT * FROM sales.orders LIMIT 5
    

    Abfrageergebnis von Beispieldaten.

    Das BigQuery-Tabellenschema wird automatisch aus den Avro-Datensätzen generiert und die Daten werden automatisch in die BigQuery-Tabellenstruktur umgewandelt.

Bereinigen

Projekt löschen

  1. Wechseln Sie in der Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Einzelne Ressourcen löschen

  1. Folgen Sie dieser Anleitung, um den Dataflow-Job zu beenden.

  2. Löschen Sie den Cloud Storage-Bucket:

    gsutil rm -r gs://$MY_BUCKET
    

Nächste Schritte