Flusso dei record Avro in BigQuery con Dataflow

Questo tutorial descrive l'archiviazione di oggetti Avro SpecificRecord in BigQuery tramite flow di dati generando automaticamente lo schema della tabella e trasformando gli elementi di input. Questo tutorial mostra anche l'utilizzo delle classi generate da Avro per materializzare o trasmettere dati intermedi tra worker nella tua pipeline di Dataflow.

Apache Avro è un sistema di serializzazione che si basa su schemi per strutturare i dati. Poiché lo schema è sempre presente quando i dati Avro vengono letti o scritti, la serializzazione è veloce e ridotta. I vantaggi in termini di prestazioni rendono questa opzione una soluzione ideale per il passaggio di messaggi tra sistemi, ad esempio un'app che invia eventi a un sistema di analisi tramite un broker di messaggi. Puoi utilizzare lo schema Avro per gestire lo schema del data warehouse BigQuery. La conversione dello schema Avro nella struttura della tabella BigQuery richiede codice personalizzato, che viene illustrato in questo tutorial.

Questo tutorial è rivolto a sviluppatori e architetti interessati a utilizzare lo schema Avro per gestire lo schema di data warehouse BigQuery. Questo tutorial presuppone che tu conosca Avro e Java.

Il seguente diagramma illustra l'architettura di alto livello di questo tutorial.

Architettura di uno schema Avro che gestisce lo schema di data warehouse BigQuery.

Questo tutorial utilizza un semplice sistema di elaborazione degli ordini con i seguenti passaggi per dimostrare questo pattern architetturale:

  • Un'app online genera eventi quando il cliente effettua un acquisto.
  • Un oggetto ordine contiene un identificatore univoco, l'elenco degli articoli acquistati e un timestamp.
  • Una pipeline Dataflow legge OrderDetails SpecificRecordi messaggi Avro da un argomento Pub/Sub.
  • La pipeline Dataflow scrive i record in Cloud Storage come file Avro.
  • La classe OrderDetails genera automaticamente lo schema BigQuery corrispondente.
  • Gli oggetti OrderDetails vengono scritti in BigQuery utilizzando una funzione di trasformazione generica.

Obiettivi

  • Importa stringhe JSON da uno stream di dati Pub/Sub utilizzando Dataflow.
  • Trasforma gli oggetti JSON in oggetti delle classi generate da Avro.
  • Genera lo schema della tabella BigQuery dallo schema Avro.
  • Scrivi i record Avro in un file in Cloud Storage.
  • Scrivere i record Avro in BigQuery.

Costi

Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud possono beneficiare di una prova gratuita.

Al termine di questo tutorial, puoi evitare una fatturazione continua eliminando le risorse che hai creato. Per scoprire di più, vedi Pulizia.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Nella pagina del selettore dei progetti in Google Cloud Console, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.

  4. Abilita le API BigQuery, Cloud Storage, and Dataflow.

    Abilita le API

  5. Nella pagina del selettore dei progetti in Google Cloud Console, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  6. Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.

  7. Abilita le API BigQuery, Cloud Storage, and Dataflow.

    Abilita le API

  8. Nella console, attiva Cloud Shell.

    Attiva Cloud Shell

    Nella parte inferiore della console, viene avviata una sessione di Cloud Shell e viene visualizzato un prompt della riga di comando. Cloud Shell è un ambiente shell con Google Cloud CLI già installato e con valori già impostati per il progetto corrente. L'inizializzazione della sessione può richiedere alcuni secondi.

Configurazione dell'ambiente

  1. In Cloud Shell, clona il repository del codice sorgente:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. Apri il file env.sh:

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  3. Il file env.sh contiene valori predefiniti preimpostati che puoi utilizzare per questo tutorial, ma puoi modificare questi file in base al tuo ambiente.

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="${GOOGLE_CLOUD_PROJECT}_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="${MY_BUCKET}/out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    BQ_TABLE="orders"
    

    Sostituisci quanto segue:

    • avro-records: il nome del tuo argomento Pub/Sub.
    • $GOOGLE_CLOUD_PROJECT"_avro_beam: il nome del bucket Cloud Storage generato dall'ID progetto Cloud.
    • $MY_BUCKET/""out/": il percorso del bucket Cloud Storage che contiene l'output Avro.
    • us-central1: l'area geografica che utilizzi per Pub/Sub e Dataflow. Per saperne di più sulle aree geografiche, consulta Area geografica e regioni.
    • US: l'area geografica per BigQuery. Per ulteriori informazioni sulle località, consulta Località dei set di dati.
    • sales: il nome del set di dati BigQuery.
    • orders: il nome della tabella BigQuery.
    • 1: il numero massimo di worker di Dataflow.
  4. Imposta le variabili di ambiente:

     . ./env.sh
    

Creare le risorse

  1. In Cloud Shell, crea un argomento Pub/Sub:

    gcloud pubsub topics create "${MY_TOPIC}"
    
  2. Crea un bucket Cloud Storage:

    gsutil mb -l "${REGION}" -c regional "gs://${MY_BUCKET}"
    

    Il bucket Cloud Storage supporta gli eventi non elaborati generati dall'app. Il bucket può anche servire come origine alternativa per l'analisi e la convalida offline utilizzando i job Spark e Hadoop eseguiti su Dataproc.

  3. Crea un set di dati BigQuery:

    bq --location="${BQ_REGION}" mk --dataset "${GOOGLE_CLOUD_PROJECT}:${BQ_DATASET}"
    

    Un set di dati BigQuery contiene tabelle e visualizzazioni in una singola area geografica o in un'area geografica contenente più aree geografiche. Per ulteriori informazioni, consulta Creazione di set di dati.

Avvio dell'app Beam Dataflow

  1. In Cloud Shell, esegui il deployment ed esegui la pipeline su Runner Dataflow:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    
    mvn clean package
    
    java -cp target/BeamAvro-bundled-1.0-SNAPSHOT.jar \
    com.google.cloud.solutions.beamavro.AvroToBigQuery \
    --runner=DataflowRunner \
    --project="${GOOGLE_CLOUD_PROJECT}" \
    --stagingLocation="gs://${MY_BUCKET}/stage/" \
    --tempLocation="gs://${MY_BUCKET}/temp/" \
    --inputPath="projects/${GOOGLE_CLOUD_PROJECT}/topics/${MY_TOPIC}" \
    --workerMachineType=n1-standard-1 \
    --region="${REGION}" \
    --dataset="${BQ_DATASET}" \
    --bigQueryTable="${BQ_TABLE}" \
    --outputPath="gs://${MY_BUCKET}/out/" \
    --jsonFormat=false \
    --avroSchema="$(<../orderdetails.avsc)"
    

    L'output contiene il tuo ID app. Prendi nota del tuo ID app perché è necessario più avanti nel tutorial.

  2. Nella console, vai a Dataflow.

    Vai a Dataflow

  3. Per visualizzare lo stato della pipeline, fai clic sull'ID app. Lo stato della pipeline viene visualizzato come grafico.

    Grafico dello stato della pipeline.

Rivedi il codice

Nel file AvroToBigQuery.java, le opzioni di pipeline con i parametri obbligatori trasmessi tramite i parametri della riga di comando. Viene attivata anche l'opzione Modalità di streaming. Lo schema della tabella BigQuery viene generato automaticamente dallo schema Avro utilizzando lo schema di Beam di BigQuery IO

Per il formato di input Avro, gli oggetti vengono letti da Pub/Sub. Se il formato di input è JSON, gli eventi vengono letti e trasformati in oggetti Avro.

Schema avroSchema = new Schema.Parser().parse(options.getAvroSchema());

if (options.getJsonFormat()) {
  return input
      .apply("Read Json", PubsubIO.readStrings().fromTopic(options.getInputPath()))
      .apply("Make GenericRecord", MapElements.via(JsonToAvroFn.of(avroSchema)));
} else {
  return input.apply("Read GenericRecord", PubsubIO.readAvroGenericRecords(avroSchema)
      .fromTopic(options.getInputPath()));
}

La pipeline si dirama per eseguire due scritture separate:

BigQueryIO scrive gli oggetti Avro in BigQuery trasformandoli internamente in oggetti TableRow tramite gli schemi Beam. Fai riferimento alla mappatura tra i tipi di dati BigQuery e i tipi di dati Avro.

Visualizza risultati in BigQuery

Per testare la pipeline, avvia lo script gen.py. Questo script simula la generazione di eventi dell'ordine e li trasferisce all'argomento Pub/Sub.

  1. In Cloud Shell, passa alla directory degli script di generazione di eventi di esempio ed esegui lo script:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. Nella console, vai a BigQuery.

    Vai a BigQuery

  3. Per visualizzare lo schema della tabella, fai clic sul set di dati sales e poi seleziona la tabella orders. Se hai modificato le variabili di ambiente predefinite in env.sh, i nomi del set di dati e della tabella potrebbero essere diversi.

    Schema della tabella &quot;ordini&quot;.

  4. Per visualizzare alcuni dati di esempio, esegui una query nell'Editor query:

    SELECT * FROM sales.orders LIMIT 5
    

    Query risultante dei dati di esempio.

    Lo schema della tabella BigQuery viene generato automaticamente dai record Avro e i dati vengono convertiti automaticamente nella struttura della tabella BigQuery.

Esegui la pulizia

Elimina il progetto

  1. Nella console, vai alla pagina Gestisci risorse.

    Vai a Gestisci risorse

  2. Nell'elenco dei progetti, seleziona il progetto da eliminare, quindi fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID del progetto e fai clic su Chiudi per eliminare il progetto.

Eliminare le singole risorse

  1. Segui queste istruzioni per interrompere il job Dataflow.

  2. Elimina il bucket Cloud Storage:

    gsutil rm -r gs://$MY_BUCKET
    

Passaggi successivi