Esta página mostra-lhe como criar um cluster do Dataproc que usa o conetor do Spark Spanner para ler dados do Spanner através do Apache Spark
O conetor do Spanner funciona com o Spark para ler dados da base de dados do Spanner através da biblioteca Java do Spanner. O conetor do Spanner suporta a leitura de tabelas e gráficos do Spanner em DataFrames do Spark e GraphFrames.
Custos
Neste documento, usa os seguintes componentes faturáveis do Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Para gerar uma estimativa de custos com base na sua utilização projetada,
use a calculadora de preços.
Antes de começar
Antes de usar o conetor do Spanner neste tutorial, configure um cluster do Dataproc e uma instância e base de dados do Spanner.
Configure um cluster do Dataproc
Crie um cluster do Dataproc ou use um cluster do Dataproc existente com as seguintes definições:
Autorizações da conta de serviço da VM. A conta de serviço da VM do cluster tem de ter as autorizações do Spanner adequadas atribuídas. Se usar o Data Boost (o Data Boost está ativado no código de exemplo em Exportar tabelas do Spanner), a conta de serviço da VM também tem de ter as autorizações do IAM do Data Boost necessárias.
Âmbito de acesso. O cluster tem de ser criado com o âmbito
cloud-platform
ou o âmbitospanner
adequado ativado. O âmbitocloud-platform
está ativado por predefinição para clusters criados com a versão 2.1 da imagem ou superior.As instruções seguintes mostram como definir o âmbito
cloud-platform
como parte de um pedido de criação de um cluster que usa a consola, a CLI gcloud ou a API Dataproc. Google Cloud Para ver instruções adicionais sobre a criação de clusters, consulte o artigo Crie um cluster.Google Cloud consola
- Na Google Cloud consola, abra a página Dataproc Criar um cluster.
- No painel Gerir segurança na secção Acesso ao projeto, clique em "Ativa o âmbito da plataforma na nuvem para este cluster".
- Preencha ou confirme os outros campos de criação do cluster. Em seguida, clique em Criar.
CLI gcloud
Pode executar o seguinte comando
gcloud dataproc clusters create
para criar um cluster com o âmbitocloud-platform
ativado.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Pode especificar o GceClusterConfig.serviceAccountScopes como parte de um pedido clusters.create.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Configure uma instância do Spanner com uma tabela de base de dados Singers
Crie uma instância do Spanner
com uma base de dados que contenha uma tabela Singers
. Anote o ID da instância e o ID da base de dados do Spanner.
Use o conetor do Spanner com o Spark
O conetor do Spanner está disponível para as versões do Spark 3.1+
.
Especifica a versão do conetor como parte da especificação do ficheiro JAR do conetor do Cloud Storage quando envia uma tarefa para um cluster do Dataproc.
Exemplo: envio de tarefas do Spark da CLI gcloud com o conetor do Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Substitua o seguinte:
CONNECTOR_VERSION: versão do conetor do Spanner.
Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Ler tabelas do Spanner
Pode usar Python ou Scala para ler dados de tabelas do Spanner num Dataframe do Spark através da API de origem de dados do Spark.
PySpark
Pode executar o código PySpark de exemplo nesta secção no seu cluster enviando a tarefa para o serviço Dataproc ou executando a tarefa a partir do spark-submit
REPL no nó principal do cluster.
Tarefa do Dataproc
- Crie um ficheiro
singers.py
com um editor de texto local ou no Cloud Shell com o editor de textovi
,vim
ounano
pré-instalado. - Depois de preencher as variáveis de marcador de posição, cole o seguinte código
no ficheiro
singers.py
. Tenha em atenção que a funcionalidade Data Boost do Spanner está ativada, o que tem um impacto quase nulo na instância principal do Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Substitua o seguinte:
- PROJECT_ID: o ID do seu Google Cloud projeto. Os IDs dos projetos estão listados na secção Informações do projeto no Google Cloud painel de controlo da consola.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte
Configure uma instância do Spanner com a
Singers
tabela da base de dados.
- Guarde o ficheiro
singers.py
. - Envie a tarefa
para o serviço Dataproc através da Google Cloud consola, da CLI gcloud ou da
API Dataproc.
Exemplo: envio de tarefas da CLI gcloud com o conetor do Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Substitua o seguinte:
- CLUSTER_NAME: o nome do novo cluster.
- REGION: uma região do Compute Engine disponível para executar a carga de trabalho.
- CONNECTOR_VERSION: versão do conetor do Spanner.
Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Tarefa spark-submit
- Ligue-se ao nó principal do cluster do Dataproc através de SSH.
- Aceda à página Clusters do Dataproc na Google Cloud consola e, de seguida, clique no nome do cluster.
- Na página Detalhes do cluster, selecione o separador Instâncias de VM. Em seguida, clique em
SSH
à direita do nome do nó principal do cluster.É aberta uma janela do navegador no diretório base do nó principal.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crie um ficheiro
singers.py
no nó principal através do editor de textovi
,vim
ounano
pré-instalado.- Cole o seguinte código no ficheiro
singers.py
. Tenha em atenção que a funcionalidade Data Boost do Spanner está ativada, o que tem um impacto quase nulo na instância principal do Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Substitua o seguinte:
- PROJECT_ID: o ID do seu Google Cloud projeto. Os IDs dos projetos estão listados na secção Informações do projeto no Google Cloud painel de controlo da consola.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte
Configure uma instância do Spanner com a
Singers
tabela da base de dados.
- Guarde o ficheiro
singers.py
.
- Cole o seguinte código no ficheiro
- Execute
singers.py
comspark-submit
para criar a tabelaSingers
do Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Substitua o seguinte:
- CONNECTOR_VERSION: versão do conetor do Spanner.
Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub
GoogleCloudDataproc/spark-spanner-connector
.
O resultado é:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: versão do conetor do Spanner.
Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub
Scala
Para executar o exemplo de código Scala no cluster, conclua os seguintes passos:
- Ligue-se ao nó principal do cluster do Dataproc através de SSH.
- Aceda à página do Dataproc Clusters na Google Cloud consola e, de seguida, clique no nome do cluster.
- Na página Detalhes do cluster, selecione o separador Instâncias de VM. Em seguida, clique em
SSH
à direita do nome do nó principal do cluster.É aberta uma janela do navegador no diretório base do nó principal.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crie um ficheiro
singers.scala
no nó principal através do editor de textovi
,vim
ounano
pré-instalado.- Cole o seguinte código no ficheiro
singers.scala
. Tenha em atenção que a funcionalidade Data Boost está ativada, o que tem um impacto quase nulo na instância principal do Spanner.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Substitua o seguinte:
- PROJECT_ID: o ID do seu Google Cloud projeto. Os IDs dos projetos estão listados na secção Informações do projeto no Google Cloud painel de controlo da consola.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte
Configure uma instância do Spanner com a
Singers
tabela da base de dados.
- Guarde o ficheiro
singers.scala
.
- Cole o seguinte código no ficheiro
- Inicie o
spark-shell
REPL.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Substitua o seguinte:
CONNECTOR_VERSION: versão do conetor do Spanner. Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub
GoogleCloudDataproc/spark-spanner-connector
. - Execute
singers.scala
com o comando:load singers.scala
para criar a tabelaSingers
do Spanner. A saída da lista apresenta exemplos da saída Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Leia gráficos do Spanner
O conector do Spanner suporta a exportação do gráfico para DataFrames de nós e arestas separados, bem como a exportação diretamente para o
GraphFrames
.
O exemplo seguinte exporta um Spanner para um GraphFrame
.
Usa a SpannerGraphConnector
classe do Python, incluída no JAR do conetor do Spanner, para ler o gráfico do Spanner.
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
Substitua o seguinte:
- CONNECTOR_VERSION: versão do conetor do Spanner.
Escolha a versão do conetor do Spanner na lista de versões no repositório do GitHub
GoogleCloudDataproc/spark-spanner-connector
. - PROJECT_ID: o ID do seu Google Cloud projeto. Os IDs dos projetos estão listados na secção Informações do projeto no Google Cloud painel de controloda consola.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME inseriram os IDs da instância, da base de dados e do gráfico.
Para exportar nós e arestas DataFrames
em vez de GraphFrames, use
load_dfs
:
df_vertices, df_edges, df_id_map = connector.load_dfs()
Limpar
Para evitar incorrer em cobranças contínuas na sua Google Cloud conta, pode parar ou eliminar o cluster do Dataproc e eliminar a instância do Spanner.
O que se segue?
- Consulte os
pyspark.sql.DataFrame
exemplos. - Para ver o suporte de idiomas do Spark DataFrame, consulte o seguinte:
- Consulte o repositório do conetor do Spark Spanner no GitHub.
- Consulte as sugestões de otimização de tarefas do Spark.