Utilizzare il connettore Bigtable Spark

Il connettore Spark di Bigtable consente di leggere e scrivere dati da e verso Bigtable. Puoi leggere i dati dall'applicazione Spark utilizzando Spark SQL e DataFrames. Il connettore Bigtable Spark supporta le seguenti operazioni di Bigtable:

  • Scrittura di dati
  • Lettura di dati
  • crea una nuova tabella

Questo documento mostra come convertire una tabella DataFrames Spark SQL in una tabella Bigtable, quindi compila e crea un file JAR per inviare un job Spark.

Stato del supporto di Spark e Scala

Il connettore Spark di Bigtable supporta solo la versione Scala 2.12 e le seguenti versioni di Spark:

Il connettore Spark di Bigtable supporta le seguenti versioni di Dataproc:

Calcolare i costi

Se decidi di utilizzare uno dei seguenti componenti fatturabili di Google Cloud, ti verranno addebitate le risorse che utilizzi:

  • Bigtable (non ti viene addebitato alcun costo per l'uso dell'emulatore Bigtable)
  • Dataproc
  • Cloud Storage

I prezzi di Dataproc si applicano all'utilizzo di Dataproc sui cluster Compute Engine. Dataproc serverless si applica ai carichi di lavoro e alle sessioni eseguite su Dataproc Serverless per Spark.

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi.

Prima di iniziare

Completa i seguenti prerequisiti prima di utilizzare il connettore Bigtable Spark.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per utilizzare il connettore Bigtable Spark, chiedi all'amministratore di concederti i seguenti ruoli IAM sul tuo progetto:

  • Bigtable Administrator (roles/bigtable.admin)(Facoltativo): ti consente di leggere o scrivere dati e creare una nuova tabella.
  • Utente Bigtable (roles/bigtable.user): consente di leggere o scrivere dati, ma non di creare una nuova tabella.

Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite la ruoli o altri ruoli predefiniti ruoli.

Se utilizzi Dataproc o Cloud Storage, potrebbero essere necessarie autorizzazioni aggiuntive. Per ulteriori informazioni, consulta le autorizzazioni Dataproc e Cloud Storage.

Configura Spark

Oltre a creare un'istanza Bigtable, devi configurare l'istanza Spark. Puoi farlo in locale o selezionare una delle seguenti opzioni per utilizzare Spark con Dataproc:

  • Cluster Dataproc
  • Dataproc Serverless

Per ulteriori informazioni sulla scelta tra un cluster Dataproc o un'opzione serverless, consulta il documento Dataproc Serverless per Spark rispetto a Dataproc su Compute Engine .

Scarica il file JAR del connettore

Puoi trovare il codice sorgente del connettore Bigtable Spark con esempi nel repository GitHub del connettore Spark di Bigtable.

In base alla configurazione di Spark, puoi accedere al file JAR nel seguente modo:

  • Se esegui PySpark localmente, devi scaricare il file JAR del connettore dalla posizione gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar Cloud Storage.

    Sostituisci SCALA_VERSION con la versione Scala, impostata su 2.12 come unica versione supportata, e CONNECTOR_VERSION con la versione del connettore che vuoi utilizzare.

  • Per l'opzione serverless o cluster Dataproc, utilizza il file JAR più recente come artefatto che può essere aggiunto nelle applicazioni Scala o Java Spark. Per saperne di più sull'utilizzo del file JAR come artefatto, consulta Gestire le dipendenze.

  • Se invii il tuo job PySpark a Dataproc, utilizza il flag gcloud dataproc jobs submit pyspark --jars per impostare l'URI sulla posizione del file JAR in Cloud Storage, ad esempio gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.

Determina il tipo di calcolo

Per i job di sola lettura, puoi utilizzare il serverless computing Data Boost (anteprima), consente di evitare di influire sui cluster di distribuzione delle applicazioni. La tua scintilla deve usare la versione 1.1.0 o successiva del connettore Spark per usare Data Boost.

Per utilizzare Data Boost, devi creare un profilo app Data Boost e poi fornire l'ID profilo app per l'opzione spark.bigtable.app_profile.idSpark quando aggiungi la configurazione Bigtable all'applicazione Spark. Se hai già creato un'app per i tuoi job di lettura Spark e vuoi continuare a utilizzarlo senza modificare il codice dell'applicazione, puoi convertire il profilo dell'app in un Profilo app Data Boost. Per ulteriori informazioni, vedi Convertire un'app profilo.

Per ulteriori informazioni, consulta Bigtable Data Boost Panoramica.

Per i job che prevedono letture e scritture, puoi utilizzare il cluster della tua istanza nodi per il computing specificando nella richiesta un profilo dell'app standard.

Identifica o crea un profilo dell'app da utilizzare

Se non specifichi un ID profilo dell'app, il connettore utilizza l'app predefinita profilo.

Ti consigliamo di utilizzare un profilo app univoco per ogni applicazione inclusa l'applicazione Spark. Per ulteriori informazioni sul profilo dell'app tipi e impostazioni, consulta la sezione Profili app Panoramica. Per le istruzioni, vedi Creare e gestire configurare i profili delle app.

Aggiungi la configurazione Bigtable all'applicazione Spark

Nell'applicazione Spark, aggiungi le opzioni Spark che consentono di interagire con Bigtable.

Opzioni Spark supportate

Utilizza le opzioni Spark disponibili come parte del pacchetto com.google.cloud.spark.bigtable.

Nome opzione Obbligatorio Valore predefinito Significato
spark.bigtable.project.id N/D Imposta l'ID progetto Bigtable.
spark.bigtable.instance.id N/D Imposta l'ID istanza Bigtable.
catalog N/D Imposta il formato JSON che specifica il formato di conversione compreso tra lo schema di tipo SQL del DataFrame e quello della tabella Bigtable.

Per saperne di più, consulta Creare metadati delle tabelle in formato JSON.
spark.bigtable.app_profile.id No default Imposta l'ID profilo dell'app Bigtable.
spark.bigtable.write.timestamp.milliseconds No Ora di sistema attuale Imposta il timestamp in millisecondi da utilizzare durante la scrittura di un DataFrame in Bigtable.

Tieni presente che, poiché tutte le righe nel DataFrame utilizzano lo stesso timestamp, le righe con la stessa colonna di chiave di riga nel DataFrame vengono mantenute come una singola versione in Bigtable poiché condividono lo stesso timestamp.
spark.bigtable.create.new.table No false Imposta true per creare una nuova tabella prima di scrivere in Bigtable.
spark.bigtable.read.timerange.start.milliseconds o spark.bigtable.read.timerange.end.milliseconds No N/D Imposta timestamp (in millisecondi dal momento dell'epoca) per filtrare le celle con una data di inizio e una data di fine specifiche, rispettivamente.
spark.bigtable.push.down.row.key.filters No true Imposta true per consentire l'applicazione di filtri a chiave di riga semplici sul lato server. Il filtro in base alle chiavi di riga composte è implementato lato client.

Per ulteriori informazioni, consulta Leggere una riga specifica del DataFrame utilizzando un filtro.
spark.bigtable.read.rows.attempt.timeout.milliseconds No 30 min Imposta la durata del timeout per un tentativo di lettura delle righe corrispondente a una partizione DataFrame nel client Bigtable per Java.
spark.bigtable.read.rows.total.timeout.milliseconds No 12 ore Imposta la durata del timeout totale per un tentativo di lettura di righe corrispondente a una partizione DataFrame nel client Bigtable per Java.
spark.bigtable.mutate.rows.attempt.timeout.milliseconds No 1 min Imposta la durata del timeout per un tentativo di riga con modifica corrispondente a una partizione DataFrame nel client Bigtable per Java.
spark.bigtable.mutate.rows.total.timeout.milliseconds No 10 min Imposta la durata del timeout totale per un tentativo di modifica di righe corrispondente a una partizione DataFrame nel client Bigtable per Java.
spark.bigtable.batch.mutate.size No 100 Impostato sul numero di mutazioni in ogni batch. Il valore massimo che puoi impostare è 100000.
spark.bigtable.enable.batch_mutate.flow_control No false Imposta su true per abilitare il controllo del flusso per le mutazioni batch.

Creare i metadati delle tabelle in formato JSON

Il formato della tabella DataFrames Spark SQL deve essere convertito in una tabella Bigtable utilizzando una stringa in formato JSON. Questo formato JSON di stringa rende il formato dei dati compatibile con Bigtable. Puoi passare il formato JSON nel codice dell'applicazione utilizzando l'opzione .option("catalog", catalog_json_string).

Considera ad esempio la seguente tabella DataFrame e la tabella Bigtable corrispondente.

In questo esempio, le colonne name e birthYear nel DataFrame vengono raggruppate nella famiglia di colonne info e rinominate rispettivamente in name e birth_year. Analogamente, la colonna address viene archiviata nella famiglia di colonne location con lo stesso nome. La colonna id del DataFrame viene convertita nella chiave di riga di Bigtable.

Le chiavi di riga non hanno un nome di colonna dedicato in Bigtable e in questo esempio, id_rowkey viene utilizzato solo per indicare al connettore che questa è la colonna della chiave di riga. Puoi utilizzare qualsiasi nome per la colonna della chiave di riga e assicurarti di utilizzare lo stesso nome quando dichiari il campo "rowkey":"column_name" in formato JSON.

DataFrame Tabella Bigtable = t1
Colonne Chiave di riga Famiglie di colonne
informazioni località
Colonne Colonne
id name birthYear address id_rowkey name birth_year address

Il formato JSON per il catalogo è il seguente:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

Le chiavi e i valori utilizzati nel formato JSON sono i seguenti:

Chiave catalogo Valore del catalogo Formato JSON
tabella Nome della tabella Bigtable. "table":{"name":"t1"}

Se la tabella non esiste, utilizza .option("spark.bigtable.create.new.table", "true") per crearne una.
riga Nome della colonna che verrà utilizzata come chiave di riga di Bigtable. Assicurati che il nome della colonna DataFrame sia utilizzato come chiave di riga, ad esempio id_rowkey.

Le chiavi composte sono accettate anche come chiavi di riga. Ad esempio: "rowkey":"name:address". Questo approccio potrebbe comportare chiavi di riga che richiedono una scansione completa della tabella per tutte le richieste di lettura.
"rowkey":"id_rowkey",
colonne Mappatura di ogni colonna del DataFrame alla famiglia di colonne ("cf") e al nome della colonna ("col") Bigtable corrispondenti. Il nome della colonna può essere diverso da quello della colonna nella tabella del DataFrame. I tipi di dati supportati includono string, long e binary. "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

In questo esempio, id_rowkey è la chiave di riga e info e location sono le famiglie di colonne.

Tipi di dati supportati

Il connettore supporta l'utilizzo dei tipi string, long e binary (array di byte) nel catalogo. Fino a quando non verrà aggiunto il supporto di altri tipi, ad esempio int e float, puoi convertire manualmente questi tipi di dati in array di byte (il comando BinaryType) prima di utilizzare il connettore per scriverli Bigtable.

Inoltre, puoi usare Avro per serializzare di testo, ad esempio ArrayType. Per ulteriori informazioni, consulta Serializzare dati complessi. utilizzando Apache Avro.

Scrivi in Bigtable

Utilizza la funzione .write() e le opzioni supportate per scrivere dati in Bigtable.

Java

Il seguente codice del repository GitHub utilizza Java e Maven per scrivere in Bigtable.

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");



  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

Il seguente codice del repository GitHub utilizza Python per scrivere in Bigtable.

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  

Lettura da Bigtable

Utilizza la funzione .read() per verificare se la tabella è stata importata correttamente in Bigtable.

Java

  
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

Compila il progetto

Genera il file JAR utilizzato per eseguire un job in un cluster Dataproc, Dataproc serverless o un'istanza Spark locale. Puoi compilare il file JAR localmente e quindi utilizzarlo per inviare un job. Il percorso del JAR compilato viene impostato come variabile di ambiente PATH_TO_COMPILED_JAR quando invii un job.

Questo passaggio non riguarda le applicazioni PySpark.

Gestisci le dipendenze

Il connettore Spark di Bigtable supporta i seguenti strumenti di gestione delle dipendenze:

Compila il file JAR

Maven

  1. Aggiungi la dipendenza spark-bigtable al file pom.xml.

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Aggiungi il plug-in Maven Shade al tuo file pom.xml per creare un JAR uber:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. Esegui il comando mvn clean install per generare un file JAR.

sbt

  1. Aggiungi la dipendenza spark-bigtable al tuo file build.sbt:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
  2. Aggiungi il plug-in sbt-assembly al file project/plugins.sbt o project/assembly.sbt per creare un file JAR Uber.

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
  3. Esegui il comando sbt clean assembly per generare il file JAR.

Gradle

  1. Aggiungi la dipendenza spark-bigtable al file build.gradle.

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
  2. Aggiungi il plug-in Shadow nel file build.gradle per creare un file JAR Uber:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
  3. Consulta la documentazione del plug-in Shadow per ulteriori informazioni sulla configurazione e sulla compilazione JAR.

di Gemini Advanced.

Invia un job

Invia un job Spark utilizzando Dataproc, Dataproc Serverless o un'istanza Spark locale per avviare l'applicazione.

Imposta ambiente di runtime

Imposta le seguenti variabili di ambiente.

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

Sostituisci quanto segue:

  • PROJECT_ID: l'identificatore permanente per il progetto Bigtable.
  • INSTANCE_ID: l'identificatore permanente dell'istanza Bigtable.
  • TABLE_NAME: l'identificatore permanente della tabella.
  • DATAPROC_CLUSTER: l'identificatore permanente per il cluster Dataproc.
  • DATAPROC_REGION: la regione Dataproc che contiene uno dei cluster nella tua istanza Dataproc, ad esempio northamerica-northeast2.
  • DATAPROC_ZONE: la zona in cui viene eseguito il cluster Dataproc.
  • SUBNET: il percorso completo della risorsa della subnet.
  • GCS_BUCKET_NAME: il bucket Cloud Storage in cui caricare le dipendenze dei carichi di lavoro Spark.
  • PATH_TO_COMPILED_JAR: il percorso completo o relativo del file JAR compilato, ad esempio /path/to/project/root/target/<compiled_JAR_name> per Maven.
  • GCS_PATH_TO_CONNECTOR_JAR: il bucket Cloud Storage gs://spark-lib/bigtable in cui si trova il file spark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar.
  • PATH_TO_PYTHON_FILE: per le applicazioni PySpark, il percorso del file Python che verrà utilizzato per scrivere e leggere i dati da Bigtable.
  • LOCAL_PATH_TO_CONNECTOR_JAR: per le applicazioni PySpark, percorso del file JAR del connettore Bigtable Spark scaricato.

Invia un job Spark

Per le istanze Dataproc o la configurazione locale di Spark, esegui un job Spark per caricare i dati in Bigtable.

Cluster Dataproc

Utilizza il file JAR compilato e crea un job del cluster Dataproc che legga e scriva dati da e verso Bigtable.

  1. Creare un cluster Dataproc. L'esempio seguente mostra un comando per creare un cluster Dataproc v2.0 con Debian 10, due nodi worker e configurazioni predefinite.

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. Invia un job.

    Scala/Java

    L'esempio seguente mostra la classe spark.bigtable.example.WordCount che include la logica per creare una tabella di test in DataFrame, scrivere la tabella su Bigtable e quindi contare il numero di parole nella tabella.

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc Serverless

Utilizza il file JAR compilato e crea un job Dataproc che legge e scrive dati da e in Bigtable con un'istanza serverless Dataproc.

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Spark locale

Utilizza il file JAR scaricato e crea un job Spark che legga e scriva dati da e verso Bigtable con un'istanza Spark locale. Puoi anche utilizzare l'emulatore Bigtable per inviare il job Spark.

Usa l'emulatore Bigtable

Se decidi di utilizzare l'emulatore Bigtable, segui questi passaggi:

  1. Esegui questo comando per avviare l'emulatore:

    gcloud beta emulators bigtable start
    

    Per impostazione predefinita, l'emulatore sceglie localhost:8086.

  2. Imposta la variabile di ambiente BIGTABLE_EMULATOR_HOST:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. Invia il job Spark.

Per ulteriori informazioni sull'utilizzo dell'emulatore Bigtable, consulta Eseguire test con l'emulatore.

Invia un job Spark

Usa il comando spark-submit per inviare un job Spark a prescindere dal fatto che tu stia utilizzando un emulatore Bigtable locale.

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Verificare i dati della tabella

Esegui questo comando Interfaccia a riga di comando cbt per verificare che i dati siano scritti in Bigtable. La Interfaccia a riga di comando cbt è un componente di Google Cloud CLI. Per ulteriori informazioni, consulta interfaccia a riga di comando cbt panoramica.

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

Soluzioni aggiuntive

Utilizza il connettore Bigtable Spark per soluzioni specifiche, come la serializzazione di tipi Spark SQL complessi, la lettura di righe specifiche e la generazione di metriche lato client.

Leggere una riga DataFrame specifica utilizzando un filtro

Quando utilizzi DataFrame per leggere da Bigtable, puoi specificare un filtro per leggere solo righe specifiche. Semplici filtri come ==, <= e startsWith nella colonna della chiave di riga vengono applicati sul lato server per evitare una scansione completa della tabella. I filtri su chiavi di riga composte o filtri complessi come il filtro LIKE nella colonna della chiave di riga vengono applicati sul lato client.

Se stai leggendo tabelle di grandi dimensioni, ti consigliamo di utilizzare filtri semplici per le chiavi di riga per evitare di eseguire una scansione completa della tabella. L'istruzione di esempio seguente mostra come leggere utilizzando un semplice filtro. Assicurati di utilizzare nel filtro Spark il nome della colonna DataFrame convertita nella chiave di riga:

    dataframe.filter("id == 'some_id'").show()
  

Quando applichi un filtro, utilizza il nome della colonna DataFrame anziché il nome della colonna della tabella Bigtable.

Serializza tipi di dati complessi utilizzando Apache Avro

Il connettore Spark di Bigtable fornisce supporto per l'utilizzo di Apache Avro per serializzare tipi SQL Spark complessi, come ArrayType, MapType o StructType. Apache Avro fornisce la serializzazione dei dati per registrare i dati, che vengono comunemente utilizzati per l'elaborazione e l'archiviazione di strutture di dati complesse.

Utilizza una sintassi come "avro":"avroSchema" per specificare che una colonna in Bigtable deve essere codificata utilizzando Avro. Puoi quindi utilizzare .option("avroSchema", avroSchemaString) durante la lettura o la scrittura su Bigtable per specificare lo schema Avro corrispondente a quella colonna in formato stringa. Puoi utilizzare nomi di opzioni diversi, ad esempio "anotherAvroSchema" per colonne diverse e trasmettere schemi Avro per più colonne.

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

Utilizza le metriche lato client

Poiché il connettore Spark di Bigtable si basa sul client Bigtable per Java, le metriche lato client sono abilitate all'interno del connettore per impostazione predefinita. Per ulteriori dettagli su come accedere a queste metriche e interpretarle, consulta la documentazione relativa alle metriche lato client.

Utilizzare il client Bigtable per Java con funzioni RDD di basso livello

Poiché il connettore Spark di Bigtable si basa sul client Bigtable per Java, puoi utilizzare direttamente il client nelle applicazioni Spark ed eseguire richieste di lettura o scrittura distribuite all'interno delle funzioni RDD di basso livello, come mapPartitions e foreachPartition.

Per utilizzare il client Bigtable per le classi Java, aggiungi il prefisso com.google.cloud.spark.bigtable.repackaged ai nomi dei pacchetti. Ad esempio, invece di utilizzare il nome della classe com.google.cloud.bigtable.data.v2.BigtableDataClient, usa com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient.

Per saperne di più sul client Bigtable per Java, consulta la pagina Client Bigtable per Java.

Passaggi successivi