Usar o conector do Spanner com o Spark

Esta página mostra como criar um cluster do Dataproc que usa o conector do Spark Spanner para ler dados do Spanner usando o Apache Spark.

Calcular custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Antes de usar o conector do Spanner neste tutorial, configure um cluster do Dataproc e uma instância e um banco de dados do Spanner.

Configurar um cluster do Dataproc

Crie um cluster do Dataproc ou use um cluster do Dataproc com as seguintes configurações:

  • Permissões da conta de serviço da VM. A conta de serviço da VM do cluster precisa receber as permissões do Spanner adequadas. Se você usar o Data Boost, que está ativado no código de exemplo em Ler dados do Spanner, a conta de serviço da VM também precisa ter as permissões do IAM do Data Boost necessárias.

  • Escopo de acesso. O cluster precisa ser criado com o escopo cloud-platform ou o escopo spanner adequado ativado. O escopo cloud-platform é ativado por padrão para clusters criados com a versão da imagem 2.1 ou mais recente.

    As instruções a seguir mostram como definir o escopo cloud-platform como parte de uma solicitação de criação de cluster que usa o console do Google Cloud, CLI gcloud ou a API Dataproc. Para mais instruções sobre a criação de clusters, consulte Criar um cluster.

    Console do Google Cloud

    1. No console do Google Cloud, abra a página do Dataproc Criar um cluster.
    2. No painel Gerenciar segurança, na seção Acesso ao projeto, clique em "Ativa o escopo da plataforma de nuvem para este cluster".
    3. Preencha ou confirme os outros campos de criação de cluster e clique em Criar.

    CLI da gcloud

    Execute o comando gcloud dataproc clusters create abaixo para criar um cluster com o escopo cloud-platform ativado.

    gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

    API

    É possível especificar o GceClusterConfig.serviceAccountScopes como parte de uma solicitação clusters.create.

        "serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
    

Configurar uma instância do Spanner com uma tabela de banco de dados Singers

Crie uma instância do Spanner com um banco de dados que contenha uma tabela Singers. Anote o ID da instância do Spanner e o ID do banco de dados.

Usar o conector do Spanner com o Spark

O conector do Spanner está disponível para as versões 3.1+ do Spark. Especifique a versão do conector como parte da especificação do arquivo JAR do conector do Cloud Storage quando você enviar um job para um cluster do Dataproc.

Exemplo:envio de jobs do Spark da CLI gcloud com o conector Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Substitua:

CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector Spanner na lista de versões do repositório GoogleCloudDataproc/spark-spanner-connector do GitHub.

Ler dados do Spanner

Você pode usar Python ou Scala para ler dados do Spanner em um Dataframe do Spark usando a API de fonte de dados do Spark.

PySpark

É possível executar o exemplo de código PySpark nesta seção no cluster enviando o job ao serviço do Dataproc ou executando o job no REPL spark-submit no nó mestre do cluster.

Job do Dataproc

  1. Crie um arquivo singers.py usando um editor de texto local ou o Cloud Shell com o editor de texto vi, vim ou nano pré-instalado.
    1. Cole o seguinte código no arquivo singers.py. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
          

      Substitua:

      1. PROJECT_ID: o ID do Google Cloud projeto. Os IDs dos projetos estão listados na seção Informações do projeto no Painel do console do Google Cloud.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configurar uma instância do Spanner com a tabela de banco de dados Singers.
    2. Salve o arquivo singers.py.
  2. Envie o job para o serviço do Dataproc usando o console do Google Cloud, CLI gcloud ou a API Dataproc.

    Exemplo:envio de jobs da CLI gcloud com o conector Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
          

    Substitua:

    1. CLUSTER_NAME: o nome do novo cluster.
    2. REGION: uma região do Compute Engine disponível para executar a carga de trabalho.
    3. CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector Spanner na lista de versões do repositório GoogleCloudDataproc/spark-spanner-connector do GitHub.

Job do spark-submit

  1. Conecte-se ao nó mestre do cluster do Dataproc usando SSH.
    1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
    2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster.
      Página de detalhes do cluster do Dataproc no console do Cloud

      Uma janela de navegador é aberta no diretório inicial do nó mestre.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crie um arquivo singers.py no nó mestre usando o editor de texto vi, vim ou nano pré-instalado.
    1. Cole o seguinte código no arquivo singers.py. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Substitua:

      1. PROJECT_ID: o ID do Google Cloud projeto. Os IDs dos projetos estão listados na seção Informações do projeto no Painel do console do Google Cloud.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configurar uma instância do Spanner com a tabela de banco de dados Singers.
    2. Salve o arquivo singers.py.
  3. Execute singers.py com spark-submit para criar a tabela Singers do Spanner.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Substitua:

    1. CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector Spanner na lista de versões do repositório GoogleCloudDataproc/spark-spanner-connector do GitHub.

    A resposta é:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Para executar o exemplo de código Scala no cluster, siga estas etapas:

  1. Conecte-se ao nó mestre do cluster do Dataproc usando SSH.
    1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
    2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster.
      Página de detalhes do cluster do Dataproc no console do Cloud

      Uma janela de navegador é aberta no diretório inicial do nó mestre.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crie um arquivo singers.scala no nó mestre usando o editor de texto vi, vim ou nano pré-instalado.
    1. Cole o seguinte código no arquivo singers.scala. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Substitua:

      1. PROJECT_ID: o ID do Google Cloud projeto. Os IDs dos projetos estão listados na seção Informações do projeto no Painel do console do Google Cloud.
      2. INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configurar uma instância do Spanner com a tabela de banco de dados Singers.
    2. Salve o arquivo singers.scala.
  3. Inicie o REPL spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Substitua:

    CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector Spanner na lista de versões do repositório GoogleCloudDataproc/spark-spanner-connector do GitHub.

  4. Execute singers.scala com o comando :load singers.scala para criar a tabela Singers do Spanner. A listagem de saída mostra exemplos da saída de Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Limpeza

Para evitar cobranças contínuas na sua Google Cloud conta, você pode interromper ou excluir seu cluster do Dataproc e excluir sua instância do Spanner.

Para mais informações