Esta página mostra como criar um cluster do Dataproc que usa o conector do Spark Spanner para ler dados do Spanner usando o Apache Spark.
Calcular custos
Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Antes de começar
Antes de usar o conector do Spanner neste tutorial, configure um cluster do Dataproc e uma instância e um banco de dados do Spanner.
Configurar um cluster do Dataproc
Crie um cluster do Dataproc ou use um cluster do Dataproc com as seguintes configurações:
Permissões da conta de serviço da VM. A conta de serviço da VM do cluster precisa receber as permissões do Spanner adequadas. Se você usar o Data Boost, que está ativado no código de exemplo em Ler dados do Spanner, a conta de serviço da VM também precisa ter as permissões do IAM do Data Boost necessárias.
Escopo de acesso. O cluster precisa ser criado com o escopo
cloud-platform
ou o escopospanner
adequado ativado. O escopocloud-platform
é ativado por padrão para clusters criados com a versão da imagem 2.1 ou mais recente.As instruções a seguir mostram como definir o escopo
cloud-platform
como parte de uma solicitação de criação de cluster que usa o console do Google Cloud, CLI gcloud ou a API Dataproc. Para mais instruções sobre a criação de clusters, consulte Criar um cluster.Console do Google Cloud
- No console do Google Cloud, abra a página do Dataproc Criar um cluster.
- No painel Gerenciar segurança, na seção Acesso ao projeto, clique em "Ativa o escopo da plataforma de nuvem para este cluster".
- Preencha ou confirme os outros campos de criação de cluster e clique em Criar.
CLI da gcloud
Execute o comando
gcloud dataproc clusters create
abaixo para criar um cluster com o escopocloud-platform
ativado.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
É possível especificar o GceClusterConfig.serviceAccountScopes como parte de uma solicitação clusters.create.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Configurar uma instância do Spanner com uma tabela de banco de dados Singers
Crie uma instância do Spanner
com um banco de dados que contenha uma tabela Singers
. Anote o ID da instância
do Spanner e o ID do banco de dados.
Usar o conector do Spanner com o Spark
O conector do Spanner está disponível para as versões 3.1+
do Spark.
Especifique a
versão do conector como parte da especificação do arquivo JAR do conector do Cloud Storage quando você
enviar um job para
um cluster do Dataproc.
Exemplo:envio de jobs do Spark da CLI gcloud com o conector Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Substitua:
CONNECTOR_VERSION: versão do conector do Spanner.
Escolha a versão do conector Spanner na lista de versões do repositório
GoogleCloudDataproc/spark-spanner-connector
do GitHub.
Ler dados do Spanner
Você pode usar Python ou Scala para ler dados do Spanner em um Dataframe do Spark usando a API de fonte de dados do Spark.
PySpark
É possível executar o exemplo de código PySpark nesta seção no cluster enviando o job ao
serviço do Dataproc ou executando o job no REPL spark-submit
no nó mestre do cluster.
Job do Dataproc
- Crie um arquivo
singers.py
usando um editor de texto local ou o Cloud Shell com o editor de textovi
,vim
ounano
pré-instalado. - Cole o seguinte código no arquivo
singers.py
. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Substitua:
- PROJECT_ID: o ID do Google Cloud projeto. Os IDs dos projetos estão listados na seção Informações do projeto no Painel do console do Google Cloud.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte
Configurar uma instância do Spanner com a tabela de banco de dados
Singers
.
- Salve o arquivo
singers.py
. - Envie o job
para o serviço do Dataproc usando o console do Google Cloud, CLI gcloud ou a API Dataproc.
Exemplo:envio de jobs da CLI gcloud com o conector Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
Substitua:
- CLUSTER_NAME: o nome do novo cluster.
- REGION: uma região do Compute Engine disponível para executar a carga de trabalho.
- CONNECTOR_VERSION: versão do conector do Spanner.
Escolha a versão do conector Spanner na lista de versões do repositório
GoogleCloudDataproc/spark-spanner-connector
do GitHub.
Job do spark-submit
- Conecte-se ao nó mestre do cluster do Dataproc usando SSH.
- Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
- Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em
SSH
à direita do nome do nó mestre do cluster.Uma janela de navegador é aberta no diretório inicial do nó mestre.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crie um arquivo
singers.py
no nó mestre usando o editor de textovi
,vim
ounano
pré-instalado.- Cole o seguinte código no arquivo
singers.py
. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Substitua:
- PROJECT_ID: o ID do Google Cloud projeto. Os IDs dos projetos estão listados na seção Informações do projeto no Painel do console do Google Cloud.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte
Configurar uma instância do Spanner com a tabela de banco de dados
Singers
.
- Salve o arquivo
singers.py
.
- Cole o seguinte código no arquivo
- Execute
singers.py
comspark-submit
para criar a tabelaSingers
do Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Substitua:
- CONNECTOR_VERSION: versão do conector do Spanner.
Escolha a versão do conector Spanner na lista de versões do repositório
GoogleCloudDataproc/spark-spanner-connector
do GitHub.
A resposta é:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: versão do conector do Spanner.
Escolha a versão do conector Spanner na lista de versões do repositório
Scala
Para executar o exemplo de código Scala no cluster, siga estas etapas:
- Conecte-se ao nó mestre do cluster do Dataproc usando SSH.
- Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
- Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em
SSH
à direita do nome do nó mestre do cluster.Uma janela de navegador é aberta no diretório inicial do nó mestre.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crie um arquivo
singers.scala
no nó mestre usando o editor de textovi
,vim
ounano
pré-instalado.- Cole o seguinte código no arquivo
singers.scala
. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Substitua:
- PROJECT_ID: o ID do Google Cloud projeto. Os IDs dos projetos estão listados na seção Informações do projeto no Painel do console do Google Cloud.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte
Configurar uma instância do Spanner com a tabela de banco de dados
Singers
.
- Salve o arquivo
singers.scala
.
- Cole o seguinte código no arquivo
- Inicie o REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Substitua:
CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector Spanner na lista de versões do repositório
GoogleCloudDataproc/spark-spanner-connector
do GitHub. - Execute
singers.scala
com o comando:load singers.scala
para criar a tabelaSingers
do Spanner. A listagem de saída mostra exemplos da saída de Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Limpeza
Para evitar cobranças contínuas na sua Google Cloud conta, você pode interromper ou excluir seu cluster do Dataproc e excluir sua instância do Spanner.