Avro-Datensätze mithilfe von Dataflow in BigQuery streamen

In dieser Anleitung wird beschrieben, wie Avro-Objekte vom Typ SpecificRecord in BigQuery mithilfe von Dataflow gespeichert werden. Dabei wird das Tabellenschema automatisch generiert und die Eingabeelemente transformiert. Diese Anleitung zeigt auch die Verwendung von mit Avro generierten Klassen für das Erfassen oder Übertragen von Zwischendaten zwischen Workern in Ihrer Dataflow-Pipeline.

Apache Avro ist ein Serialisierungssystem, das auf Schemas zurückgreift, um Daten zu strukturieren. Weil das Schema immer vorhanden ist, wenn Avro-Daten gelesen oder geschrieben werden, ist die Serialisierung sowohl schnell als auch klein. Die Leistungsvorteile machen es zu einer beliebten Wahl beim Weiterleiten von Nachrichten zwischen Systemen, z. B. wenn eine Anwendung Ereignisse über einen Message Broker an ein Analysesystem sendet. Sie können das Avro-Schema verwenden, um Ihr BigQuery-Data-Warehouse-Schema zu verwalten. Für das Umwandeln des Avro-Schemas in die BigQuery-Tabellenstruktur ist benutzerdefinierter Code erforderlich, der in dieser Anleitung erläutert wird.

Diese Anleitung richtet sich an Entwickler und Architekten, die das Avro-Schema zur Verwaltung Ihres BigQuery-Data-Warehouse-Schemas verwenden möchten. In dieser Anleitung wird vorausgesetzt, dass Sie mit Avro und Java vertraut sind.

Das folgende Diagramm zeigt die allgemeine Architektur dieser Anleitung.

Architektur eines Avro-Schemas, das Ihr BigQuery-Data-Warehouse-Schema verwaltet

Diese Anleitung verwendet ein einfaches System zur Bestellabwicklung mit den folgenden Schritten, um dieses Architekturmuster zu demonstrieren:

  • Eine Online-Anwendung generiert Ereignisse, wenn Ihr Kunde einen Kauf tätigt.
  • Ein Order-Objekt enthält eine eindeutige Kennzeichnung, die Liste der gekauften Artikel und einen Zeitstempel.
  • Eine Dataflow-Pipeline liest die OrderDetails SpecificRecord-Avro-Nachrichten aus einem Pub/Sub-Thema.
  • Die Dataflow-Pipeline schreibt die Datensätze als Avro-Dateien in Cloud Storage.
  • Die Klasse OrderDetails generiert automatisch das entsprechende BigQuery-Schema.
  • Die OrderDetails-Objekte werden mithilfe einer generischen Transformationsfunktion in BigQuery geschrieben.

Ziele

  • Nehmen Sie JSON-Strings von einem Pub/Sub-Datenstrom mithilfe von Dataflow auf.
  • Transformieren Sie die JSON-Objekte in Objekte von Avro-generierten Klassen.
  • Generieren Sie aus dem Avro-Schema das BigQuery-Tabellenschema.
  • Schreiben Sie die Avro-Datensätze in eine Datei in Cloud Storage.
  • Schreiben Sie die Avro-Datensätze in BigQuery.

Kosten

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss dieser Anleitung können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  4. BigQuery, Cloud Storage, and Dataflow APIs aktivieren.

    Aktivieren Sie die APIs

  5. Aktivieren Sie Cloud Shell in der Cloud Console.

    Cloud Shell aktivieren

    Unten in der Cloud Console wird eine Cloud Shell-Sitzung gestartet und eine Eingabeaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung, in der das Cloud SDK einschließlich des gcloud-Befehlszeilentools vorinstalliert ist. Die Werte sind bereits für Ihr aktuelles Projekt festgelegt. Das Initialisieren der Sitzung kann einige Sekunden dauern.

Umgebung einrichten

  1. Klonen Sie in Cloud Shell das Quell-Repository:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. Generieren Sie Avro-Klassen:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn generate-sources
    

    Dieser Befehl verwendet die Datei orderdetails.avsc, um die Klassen OrderDetails und OrderItems zu generieren. Die Klasse OrderDetails hat eine eindeutige Kennzeichnung, einen Zeitstempel und eine Liste von OrderItems. Die Klasse OrderItems hat eine eindeutige Kennzeichnung, einen Namen und einen Preis. Das Avro-Schema wird an die BigQuery-Tabelle weitergegeben, in der eine Zeile mit einer Bestellung ein Array von Datensätzen vom Typ OrderItem enthält. Weitere Informationen finden Sie unter Verschachtelte und wiederkehrende Spalten angeben.

  3. Öffnen Sie die Datei env.sh.

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  4. Die Datei env.sh enthält voreingestellte Standardwerte, die Sie für diese Anleitung verwenden können. Sie können diese Dateien jedoch für Ihre Umgebung ändern.

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="$GOOGLE_CLOUD_PROJECT""_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="$MY_BUCKET/""out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    `BQ_TABLE=`"`orders`"
    
    # Maximum number of Dataflow workers
    NUM_WORKERS=1
    

    Dabei gilt:

    • avro-records: Der Name für Ihr Pub/Sub-Thema.
    • $GOOGLE_CLOUD_PROJECT"_avro_beam: Der Name Ihres Cloud Storage-Buckets, der durch Ihre Cloud-Projekt-ID generiert wird.
    • $MY_BUCKET/""out/": Der Pfad für den Cloud Storage-Bucket, der Ihre Avro-Ausgabe enthält.
    • us-central1: Die Region, die Sie für Pub/Sub und Dataflow verwenden. Weitere Informationen zu Regionen finden Sie unter Geografie und Regionen.
    • US: Die Region für BigQuery. Weitere Informationen zu Standorten finden Sie unter Dataset-Standorte.
    • sales: Der Name des BigQuery-Datasets.
    • orders: Der Name der BigQuery-Tabelle.
    • 1: Die maximale Anzahl von Dataflow-Workern.
  5. Legen Sie die Umgebungsvariablen fest:

     . ./env.sh
    

Ressourcen erstellen

  1. Erstellen Sie in Cloud Shell ein Pub/Sub-Thema:

    gcloud pubsub topics create $MY_TOPIC
    
  2. Erstellen Sie einen Cloud Storage-Bucket:

    gsutil mb -l $REGION -c regional gs://$MY_BUCKET
    

    Der Cloud Storage-Bucket sichert die von der Anwendung generierten Rohereignisse. Der Bucket kann auch als alternative Quelle für Offlineanalysen und Validierung dienen. Dabei werden Spark- und Hadoop-Jobs verwendet, die auf Dataproc ausgeführt werden.

  3. Erstellen Sie ein BigQuery-Dataset:

    bq --location=$BQ_REGION mk --dataset $GOOGLE_CLOUD_PROJECT:$BQ_DATASET

    Ein BigQuery-Dataset enthält Tabellen und Ansichten in einer einzelnen Region oder einer Geografie, die mehrere Regionen enthält. Weitere Informationen finden Sie unter Datasets erstellen.

Beam Dataflow-Anwendung starten

  1. Stellen Sie in Cloud Shell die Pipeline auf dem Dataflow-Runner bereit und führen Sie sie aus:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn compile exec:java \
        -Dexec.mainClass=com.google.cloud.solutions.beamavro.AvroToBigQuery \
        -Dexec.cleanupDaemonThreads=false \
        -Dexec.args=" \
        --project=$GOOGLE_CLOUD_PROJECT \
        --runner=DataflowRunner \
        --stagingLocation=gs://$MY_BUCKET/stage/ \
        --tempLocation=gs://$MY_BUCKET/temp/ \
        --inputPath=projects/$GOOGLE_CLOUD_PROJECT/topics/$MY_TOPIC \
        --workerMachineType=n1-standard-1 \
        --maxNumWorkers=$NUM_WORKERS \
        --region=$REGION \
        --dataset=$BQ_DATASET \
        --bqTable=$BQ_TABLE \
        --outputPath=$AVRO_OUT"
    

    Die Ausgabe enthält Ihre Anwendungs-ID. Notieren Sie sich Ihre Anwendungs-ID, da diese später in der Anleitung benötigt wird.

  2. Rufen Sie Dataflow in der Cloud Console auf.

    Zu Dataflow

  3. Klicken Sie auf Ihre Anwendungs-ID, um den Pipelinestatus aufzurufen. Der Pipelinestatus wird als Grafik angezeigt.

    Grafik eines Pipelinestatus.

Code ansehen

In der Datei AvroToBigQuery.java haben die Pipeline-Optionen mit den erforderlichen Parametern die Befehlszeilenparameter durchlaufen. Die Option Streamingmodus ist ebenfalls aktiviert. Das BigQuery-Tabellenschema wird aus dem Avro-Klassenschema generiert und später von der BigQuery IO-Klasse verwendet:

TableSchema ts = BigQueryAvroUtils.getTableSchema(OrderDetails.SCHEMA$);

Die Klasse AvroUtils durchläuft die Felder im Avro-Schemaobjekt und generiert rekursiv entsprechende Objekte des Typs TableFieldSchema. Die Objekte werden dann in ein TableSchema-Objekt eingeschlossen und zurückgegeben.

Für das Avro-Eingabeformat werden Objekte aus Pub/Sub gelesen. Wenn das Eingabeformat JSON ist, werden die Ereignisse gelesen und in Avro-Objekte umgewandelt.

private static PCollection<OrderDetails> getInputCollection(
    Pipeline pipeline, String inputPath, FORMAT format) {
  if (format == FORMAT.JSON) {
    // Transform JSON to Avro
    return pipeline
        .apply("Read JSON from PubSub", PubsubIO.readStrings().fromTopic(inputPath))
        .apply("To binary", ParDo.of(new JSONToAvro()));
  } else {
    // Read Avro
    return pipeline.apply(
        "Read Avro from PubSub", PubsubIO.readAvros(OrderDetails.class).fromTopic(inputPath));
  }
}

Die Pipeline zweigt ab. Die Write to Cloud Storage-Transformation ist eine zusammengesetzte Transformation, die die Datensätze für 10 Sekunden im Fenster sammelt und sie dann mithilfe des AvroIO-Writers in eine Avro-Datei in Cloud Storage schreibt:

ods.apply(
    "Write to GCS",
    new AvroWriter()
        .withOutputPath(options.getOutputPath())
        .withRecordType(OrderDetails.class));

Die Write to BigQuery-Transformation schreibt die Datensätze in die BigQuery-Tabelle:

ods.apply(
    "Write to BigQuery",
    BigQueryIO.write()
        .to(bqStr)
        .withSchema(ts)
        .withWriteDisposition(WRITE_APPEND)
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withFormatFunction(TABLE_ROW_PARSER));

Die BigQueryIO-Transformation schreibt die Avro-Objekte in BigQuery. Dafür wandelt sie mithilfe der Methode TABLE_ROW_PARSER in TableRow-Objekte um. Der Parser ruft die Methode convertSpecificRecordToTableRow in der Klasse BigQueryAvroUtils auf, die auf einer Testklasse im Apache Beam-Projekt basiert. method parst die Avro-Felder rekursiv und fügt sie einem TableRow-Objekt hinzu.

private static TableRow convertSpecificRecordToTableRow(
    SpecificRecord record, List<TableFieldSchema> fields) {
  TableRow row = new TableRow();
  for (TableFieldSchema subSchema : fields) {
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
    // is required, so it may not be null.
    Field field = record.getSchema().getField(subSchema.getName());
    if (field == null || field.name() == null) {
      continue;
    }
    Object convertedValue = getTypedCellValue(field.schema(), subSchema, record.get(field.pos()));
    if (convertedValue != null) {
      // To match the JSON files exported by BigQuery, do not include null values in the output.
      row.set(field.name(), convertedValue);
    }
  }

  return row;
}

Die folgende Tabelle zeigt die Zuordnung zwischen BigQuery-Datentypen und Avro-Datentypen. Achten Sie auf Typen wie Date und Timestamp, die durch den logischen Typ des Felds identifiziert werden.

BigQuery Avro
STRING STRING
GEOGRAPHY STRING
BYTES BYTES
INTEGER INT
FLOAT FLOAT
FLOAT64 DOUBLE
NUMERIC BYTES
BOOLEAN BOOLEAN
INT64 LONG
TIMESTAMP LONG
DATE INT
DATETIME STRING
TIME LONG
STRUCT RECORD
REPEATED FIELD ARRAY

Ergebnisse in BigQuery anzeigen

Starten Sie zum Testen der Pipeline das Skript gen.py. Dieses Skript simuliert die Generierung von Bestellereignissen und überträgt sie zum Pub/Sub-Thema.

  1. Wechseln Sie in Cloud Shell zum Skriptverzeichnis des Generators der Beispielereignisse und führen Sie das Skript aus:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. Wechseln Sie in der Cloud Console zu BigQuery.

    BigQuery aufrufen

  3. Wenn Sie das Tabellenschema ansehen möchten, klicken Sie auf das sales-Dataset und wählen Sie dann die Tabelle orders aus. Wenn Sie die Standardumgebungsvariablen in env.sh geändert haben, können die Namen des Datasets und der Tabelle möglicherweise abweichen.

    Tabellenschema der Tabelle &quot;orders&quot;.

  4. Wenn Sie Beispieldaten aufrufen möchten, führen Sie eine Abfrage im Abfrageeditor aus:

    SELECT * FROM sales.orders LIMIT 5
    

    Abfrageergebnis von Beispieldaten.

    Das BigQuery-Tabellenschema wird automatisch aus den Avro-Datensätzen generiert und die Daten werden automatisch in die BigQuery-Tabellenstruktur umgewandelt.

Bereinigen

Projekt löschen

  1. Wechseln Sie in der Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Einzelne Ressourcen löschen

  1. Folgen Sie dieser Anleitung, um den Dataflow-Job zu beenden.

  2. Löschen Sie den Cloud Storage-Bucket:

    gsutil rm -r gs://$MY_BUCKET
    

Nächste Schritte