Use o conetor do Spark Spanner

Esta página mostra-lhe como criar um cluster do Dataproc que usa o conetor do Spark Spanner para ler dados do Spanner através do Apache Spark

O conetor do Spanner funciona com o Spark para ler dados da base de dados do Spanner através da biblioteca Java do Spanner. O conetor do Spanner suporta a leitura de tabelas e gráficos do Spanner em DataFrames do Spark e GraphFrames.

Custos

Neste documento, usa os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Para gerar uma estimativa de custos com base na sua utilização projetada, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação gratuita.

Antes de começar

Antes de usar o conetor do Spanner neste tutorial, configure um cluster do Dataproc e uma instância e base de dados do Spanner.

Configure um cluster do Dataproc

Crie um cluster do Dataproc ou use um cluster do Dataproc existente com as seguintes definições:

Configure uma instância do Spanner com uma tabela de base de dados Singers

Crie uma instância do Spanner com uma base de dados que contenha uma tabela Singers. Anote o ID da instância e o ID da base de dados do Spanner.

Use o conetor do Spanner com o Spark

O conetor do Spanner está disponível para as versões do Spark 3.1+. Especifica a versão do conetor como parte da especificação do ficheiro JAR do conetor do Cloud Storage quando envia uma tarefa para um cluster do Dataproc.

Exemplo: envio de tarefas do Spark da CLI gcloud com o conetor do Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Substitua o seguinte:

CONNECTOR_VERSION: versão do conetor do Spanner. Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub GoogleCloudDataproc/spark-spanner-connector.

Ler tabelas do Spanner

Pode usar Python ou Scala para ler dados de tabelas do Spanner num Dataframe do Spark através da API de origem de dados do Spark.

PySpark

Pode executar o código PySpark de exemplo nesta secção no seu cluster enviando a tarefa para o serviço Dataproc ou executando a tarefa a partir do spark-submitREPL no nó principal do cluster.

Tarefa do Dataproc

  1. Crie um ficheiro singers.py com um editor de texto local ou no Cloud Shell com o editor de texto vi, vim ou nano pré-instalado.
    1. Depois de preencher as variáveis de marcador de posição, cole o seguinte código no ficheiro singers.py. Tenha em atenção que a funcionalidade Data Boost do Spanner está ativada, o que tem um impacto quase nulo na instância principal do Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Substitua o seguinte:

      1. PROJECT_ID: o ID do seu Google Cloud projeto. Os IDs dos projetos estão listados na secção Informações do projeto no Google Cloud painel de controlo da consola.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configure uma instância do Spanner com a Singerstabela da base de dados.
    2. Guarde o ficheiro singers.py.
  2. Envie a tarefa para o serviço Dataproc através da Google Cloud consola, da CLI gcloud ou da API Dataproc.

    Exemplo: envio de tarefas da CLI gcloud com o conetor do Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    Substitua o seguinte:

    1. CLUSTER_NAME: o nome do novo cluster.
    2. REGION: uma região do Compute Engine disponível para executar a carga de trabalho.
    3. CONNECTOR_VERSION: versão do conetor do Spanner. Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub GoogleCloudDataproc/spark-spanner-connector.

Tarefa spark-submit

  1. Ligue-se ao nó principal do cluster do Dataproc através de SSH.
    1. Aceda à página Clusters do Dataproc na Google Cloud consola e, de seguida, clique no nome do cluster.
    2. Na página Detalhes do cluster, selecione o separador Instâncias de VM. Em seguida, clique em SSH à direita do nome do nó principal do cluster.
      Captura de ecrã da página de detalhes do cluster do Dataproc na consola Google Cloud , que mostra o botão SSH usado para estabelecer ligação ao nó principal do cluster.

      É aberta uma janela do navegador no diretório base do nó principal.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crie um ficheiro singers.py no nó principal através do editor de texto vi, vim ou nano pré-instalado.
    1. Cole o seguinte código no ficheiro singers.py. Tenha em atenção que a funcionalidade Data Boost do Spanner está ativada, o que tem um impacto quase nulo na instância principal do Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Substitua o seguinte:

      1. PROJECT_ID: o ID do seu Google Cloud projeto. Os IDs dos projetos estão listados na secção Informações do projeto no Google Cloud painel de controlo da consola.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configure uma instância do Spanner com a Singerstabela da base de dados.
    2. Guarde o ficheiro singers.py.
  3. Execute singers.py com spark-submit para criar a tabela Singers do Spanner.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Substitua o seguinte:

    1. CONNECTOR_VERSION: versão do conetor do Spanner. Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub GoogleCloudDataproc/spark-spanner-connector.

    O resultado é:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Para executar o exemplo de código Scala no cluster, conclua os seguintes passos:

  1. Ligue-se ao nó principal do cluster do Dataproc através de SSH.
    1. Aceda à página do Dataproc Clusters na Google Cloud consola e, de seguida, clique no nome do cluster.
    2. Na página Detalhes do cluster, selecione o separador Instâncias de VM. Em seguida, clique em SSH à direita do nome do nó principal do cluster. Página de detalhes do cluster do Dataproc na consola Google Cloud .

      É aberta uma janela do navegador no diretório base do nó principal.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crie um ficheiro singers.scala no nó principal através do editor de texto vi, vim ou nano pré-instalado.
    1. Cole o seguinte código no ficheiro singers.scala. Tenha em atenção que a funcionalidade Data Boost está ativada, o que tem um impacto quase nulo na instância principal do Spanner.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Substitua o seguinte:

      1. PROJECT_ID: o ID do seu Google Cloud projeto. Os IDs dos projetos estão listados na secção Informações do projeto no Google Cloud painel de controlo da consola.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configure uma instância do Spanner com a Singerstabela da base de dados.
    2. Guarde o ficheiro singers.scala.
  3. Inicie o spark-shell REPL.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Substitua o seguinte:

    CONNECTOR_VERSION: versão do conetor do Spanner. Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Execute singers.scala com o comando :load singers.scala para criar a tabela Singers do Spanner. A saída da lista apresenta exemplos da saída Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Leia gráficos do Spanner

O conector do Spanner suporta a exportação do gráfico para DataFrames de nós e arestas separados, bem como a exportação diretamente para o GraphFrames.

O exemplo seguinte exporta um Spanner para um GraphFrame. Usa a SpannerGraphConnectorclasse do Python, incluída no JAR do conetor do Spanner, para ler o gráfico do Spanner.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

Substitua o seguinte:

  • CONNECTOR_VERSION: versão do conetor do Spanner. Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub GoogleCloudDataproc/spark-spanner-connector.
  • PROJECT_ID: o ID do seu Google Cloud projeto. Os IDs dos projetos estão listados na secção Informações do projeto no Google Cloud painel de controloda consola.
  • INSTANCE_ID, DATABASE_ID e TABLE_NAME inseriram os IDs da instância, da base de dados e do gráfico.

Para exportar nós e arestas DataFrames em vez de GraphFrames, use load_dfs:

df_vertices, df_edges, df_id_map = connector.load_dfs()

Limpar

Para evitar incorrer em cobranças contínuas na sua Google Cloud conta, pode parar ou eliminar o cluster do Dataproc e eliminar a instância do Spanner.

O que se segue?