Usar o conector do Spanner com o Spark

Nesta página, mostramos como usar o Conector do Spark Spanner para ler dados do Spanner usando o Apache Spark

Cálculo de custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Antes de executar o tutorial, saiba qual é a versão do conector e obtenha um URI do conector.

Como especificar o URI do arquivo JAR do conector

As versões do conector do Spark Spanner estão listadas no GitHub Repositório GoogleCloudDataproc/spark-spanner-connector.

Especifique o arquivo JAR do conector substituindo a versão do conector informações na seguinte string de URI:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

O conector está disponível para as versões 3.1+ do Spark

Exemplo da CLI gcloud:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Prepare o banco de dados do Spanner

Se você não tiver uma tabela do Spanner, siga o tutorial para criar um na tabela do Spanner. Depois disso, você terá um ID de instância, um ID de banco de dados e uma tabela Singers.

Criar cluster do Dataproc

Qualquer cluster do Dataproc que use o conector precisa dos escopos spanner ou cloud-platform. Os clusters do Dataproc têm o escopo padrão cloud-platform para a imagem 2.1 ou mais recente. Se você usa uma versão mais antiga, use o console do Google Cloud, a CLI do Google Cloud e a API Dataproc para criar um cluster do Dataproc.

Console

  1. No console do Google Cloud, abra o app Dataproc Página Criar um cluster
  2. Na guia "Gerenciar segurança", clique em "Ativa o escopo da plataforma de nuvem para este cluster" na seção "Acesso ao projeto".
  3. Preencha ou confirme os outros campos de criação de cluster e, em seguida, clique em "Criar".

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

É possível especificar o GceClusterConfig.serviceAccountScopes como parte de uma solicitação clusters.create. Exemplo:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Você terá que garantir que a permissão do Spanner correspondente esteja atribuída à conta de serviço da VM do Dataproc. Se você usar o Data Boost no tutorial, consulte a permissão do IAM do Data Boost.

Ler dados do Spanner

É possível usar Scala e Python para ler dados do Spanner em um Dataframe do Spark usando a API de origem de dados do Spark.

Scala

  1. Examine o código e substitua os marcadores [projectId], [instanceId], [databaseId] e [table] pelo ID do projeto, ID da instância, ID do banco de dados e tabela que você criou anteriormente. A opção enableDataBoost ativa o recurso Data Boost do Spanner, que tem quase zero na instância principal do Spanner.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
  2. Executar o código no seu cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc
      1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
        Página de clusters do Dataproc no console do Cloud.
      2. Na página >Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster
        Página de detalhes do cluster do Dataproc no console do Cloud

        Uma janela do navegador é aberta no diretório principal do nó mestre.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie singers.scala com o editor de texto vi, vim ou nano pré-instalado e cole o código da lista de códigos Scala
      nano singers.scala
        
    3. Inicie o REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Execute singers.scala com o comando :load singers.scala para criar a tabela Singers do Spanner. O resultado a listagem exibe exemplos da saída de Singers.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Examine o código e substitua os marcadores [projectId], [instanceId], [databaseId] e [table] pelo ID do projeto, da instância, do banco de dados e da tabela que você criou anteriormente. A opção enableDataBoost ativa o recurso Data Boost do Spanner, que tem quase zero na instância principal do Spanner.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
  2. Executar o código no cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc
      1. Acesse o Clusters do Dataproc página no console do Google Cloud e, em seguida, clique no nome do seu cluster
        Página "Clusters" no console do Cloud.
      2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster
        Selecione SSH na linha do nome do cluster na página "Detalhes do cluster" no console do Cloud.

        Uma janela do navegador é aberta no diretório principal do nó principal.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie singers.py com o editor de texto vi, vim ou nano pré-instalado e cole o código PySpark da lista de códigos PySpark
      nano singers.py
      
    3. Execute singers.py com spark-submit para criar a tabela Singers do Spanner.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      A saída é:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Limpeza

Para limpar e evitar cobranças contínuas na sua conta do Google Cloud pelos recursos criados neste tutorial, siga estas etapas.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Para mais informações