Usar o conector do Spanner com o Spark

Esta página mostra como usar o conector do Spark Spanner para ler dados do Spanner usando o Apache Spark.

Cálculo de custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Antes de executar o tutorial, saiba qual é a versão do conector e obtenha um URI do conector.

Como especificar o URI do arquivo JAR do conector

As versões do conector Spark Spanner estão listadas no repositório do GitHub GoogleCloudDataproc/spark-spanner-connector.

Especifique o arquivo JAR do conector substituindo as informações da versão do conector na seguinte string de URI:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

O conector está disponível para as versões 3.1+ do Spark

Exemplo da CLI gcloud:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Preparar o banco de dados do Spanner

Se você não tiver uma tabela do Spanner, siga o tutorial para criar uma. Depois disso, você terá um ID de instância, um ID de banco de dados e uma tabela Singers.

Criar cluster do Dataproc

Qualquer cluster do Dataproc que use o conector precisa dos escopos spanner ou cloud-platform. Os clusters do Dataproc têm o escopo padrão cloud-platform para a imagem 2.1 ou mais recente. Se você usar uma versão mais antiga, poderá usar o console do Google Cloud, a Google Cloud CLI e a API Dataproc para criar um cluster do Dataproc.

Console

  1. No console do Google Cloud, abra a página Criar um cluster do Dataproc.
  2. Na guia "Gerenciar segurança", clique em "Ativa o escopo da plataforma de nuvem para este cluster" na seção "Acesso ao projeto".
  3. Preencha ou confirme os outros campos de criação de clusters e clique em "Criar".

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

É possível especificar o GceClusterConfig.serviceAccountScopes como parte de uma solicitação clusters.create. Exemplo:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Verifique se a permissão do Spanner correspondente foi atribuída à conta de serviço da VM do Dataproc. Se você usar o Data Boost no tutorial, consulte a permissão do IAM do Data Boost.

Ler dados do Spanner

É possível usar Scala e Python para ler dados do Spanner em um Dataframe do Spark usando a API de origem de dados do Spark.

Scala

  1. Examine o código e substitua os marcadores [projectId], [instanceId], [databaseId] e [table] pelo ID do projeto, da instância, do banco de dados e da tabela que você criou anteriormente. A opção enableDataBoost ativa o recurso Data Boost do Spanner, que tem impacto quase zero na instância principal do Spanner.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
  2. Executar o código no seu cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
      1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
        Página de clusters do Dataproc no Console do Cloud.
      2. Na página >Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster
        Página de detalhes do cluster do Dataproc no console do Cloud

        Uma janela do navegador é aberta no diretório principal do nó mestre.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie singers.scala com o editor de texto vi, vim ou nano pré-instalado e cole o código da lista de códigos Scala
      nano singers.scala
        
    3. Inicie o REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Execute singers.scala com o comando :load singers.scala para criar a tabela Singers do Spanner. A listagem de saída mostra exemplos da saída de Singers.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Examine o código e substitua os marcadores [projectId], [instanceId], [databaseId] e [table] pelo ID do projeto, da instância, do banco de dados e da tabela que você criou anteriormente. A opção enableDataBoost ativa o recurso Data Boost do Spanner, que tem impacto quase zero na instância principal do Spanner.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
  2. Execute o código no cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
      1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
        Página "Clusters" no console do Cloud.
      2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster
        Selecione SSH na linha do nome do cluster na página "Detalhes do cluster" no Console do Cloud.

        Uma janela do navegador é aberta no diretório principal do nó principal.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie singers.py com o editor de texto vi, vim ou nano pré-instalado e cole o código PySpark da lista de códigos PySpark
      nano singers.py
      
    3. Execute singers.py com spark-submit para criar a tabela Singers do Spanner.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      O resultado é:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Limpeza

Para limpar e evitar cobranças contínuas na sua conta do Google Cloud pelos recursos criados neste tutorial, siga estas etapas.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Para mais informações