Usar o conector do Spanner com o Spark

Nesta página, mostramos como usar o Conector Spark Spanner para ler dados do Spanner usando o Apache Spark

Cálculo de custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Antes de executar o tutorial, verifique a versão do conector e consiga um URI.

Como especificar o URI do arquivo JAR do conector

As versões do conector do Spark Spanner estão listadas no repositório GoogleCloudDataproc/spark-spanner-connector do GitHub.

Especifique o arquivo JAR do conector substituindo as informações da versão do conector na seguinte string de URI:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

O conector está disponível para as versões 3.1+ do Spark

Exemplo da CLI gcloud:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Preparar o banco de dados do Spanner

Se você não tiver uma tabela do Spanner, siga o tutorial para criar uma. Depois disso, você terá um ID de instância, um ID de banco de dados e uma tabela Singers.

Criar cluster do Dataproc

Qualquer cluster do Dataproc que use o conector precisa dos escopos spanner ou cloud-platform. Os clusters do Dataproc têm o escopo padrão cloud-platform para a imagem 2.1 ou superior. Se você usar uma versão mais antiga, poderá usar o console do Google Cloud, a Google Cloud CLI e a API Dataproc para criar um cluster do Dataproc.

Console

  1. No console do Google Cloud, abra a página Criar um cluster do Dataproc
  2. Na guia "Gerenciar segurança", clique em "Ativa o escopo da plataforma de nuvem para este cluster" na seção "Acesso ao projeto".
  3. Preencha ou confirme os outros campos de criação de cluster e clique em "Criar".

CLI do Google Cloud

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

É possível especificar o GceClusterConfig.serviceAccountScopes como parte de uma solicitação clusters.create. Exemplo:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Você terá que garantir que a permissão do Spanner correspondente seja atribuída à conta de serviço da VM do Dataproc. Se você usar o Data Boost no tutorial, consulte a Permissão do IAM de Data Boost

Ler dados do Spanner

É possível usar Scala e Python para ler dados do Spanner em um Spark DataFrame usando a API de origem de dados do Spark.

Scala

  1. Analise o código e substitua o marcador [projectId], [instanceId], [databaseId] e [table] pelo ID do projeto, da instância, do banco de dados e da tabela que você criou anteriormente. A opção enableDataBoost habilita o recurso Data Boost do Spanner, que tem um impacto quase zero na instância principal do Spanner.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
    
    
  2. Executar o código no seu cluster
    1. Use SSH para se conectar ao nó mestre do cluster do Dataproc
      1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do seu cluster
        Página de clusters do Dataproc no console do Cloud.
      2. Na página >Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster
        Página de detalhes do cluster do Dataproc no console do Cloud.

        Uma janela do navegador é aberta no diretório principal do nó mestre.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie singers.scala com o editor de texto pré-instalado vi, vim ou nano e cole o código do Scala da lista de códigos do Scala.
      nano singers.scala
        
    3. Inicie o REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Execute singers.scala com o comando :load singers.scala para criar a tabela Singers do Spanner. A listagem de saída exibe exemplos da saída de Singers.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Analise o código e substitua o marcador [projectId], [instanceId], [databaseId] e [table] pelo ID do projeto, da instância, do banco de dados e da tabela que você criou anteriormente. A opção enableDataBoost habilita o recurso Data Boost do Spanner, que tem um impacto quase zero na instância principal do Spanner.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
    
    
  2. Execute o código no cluster
    1. Use SSH para se conectar ao nó mestre do cluster do Dataproc
      1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do seu cluster
        na página "Clusters" no console do Cloud.
      2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster
        Selecione SSH na linha do nome do cluster na página de detalhes do cluster no console do Cloud.

        Uma janela do navegador é aberta no diretório principal do nó principal.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie singers.py com o editor de texto vi, vim ou nano pré-instalado e cole o código PySpark da lista de códigos do PySpark.
      nano singers.py
      
    3. Execute singers.py com spark-submit para criar a tabela Singers do Spanner.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      O resultado é:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

limpeza

Para limpar e evitar cobranças contínuas na sua conta do Google Cloud pelos recursos criados neste tutorial, siga as etapas abaixo.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Para mais informações