Trabalhe com procedimentos armazenados no Apache Spark
Este documento é destinado a engenheiros, cientistas e analistas de dados para criar e chamar procedimentos armazenados do Spark no BigQuery.
Usando o BigQuery, é possível criar e executar procedimentos armazenados pelo Spark programados em Python, Java e Scala. Em seguida, execute esses procedimentos armazenados no BigQuery usando uma consulta do GoogleSQL, semelhante à execução de procedimentos armazenados SQL.
Antes de começar
Para criar um procedimento armazenado para o Spark, peça ao administrador para criar uma conexão do Spark e compartilhá-la com você. Seu administrador também precisa conceder à conta de serviço associada à conexão as permissões necessárias de gerenciamento de identidade e acesso (IAM, na sigla em inglês).
Funções exigidas
Para receber as permissões necessárias para realizar as tarefas neste documento, peça ao administrador para conceder a você os seguintes papéis do IAM:
-
Crie um procedimento armazenado para o Spark:
-
Editor de dados do BigQuery (
roles/bigquery.dataEditor
) no conjunto de dados em que você cria o procedimento armazenado -
Administrador de conexão do BigQuery (
roles/bigquery.connectionAdmin
) na conexão usada pelo procedimento armazenado -
Usuário de jobs do BigQuery (
roles/bigquery.jobUser
) no seu projeto
-
Editor de dados do BigQuery (
-
Chamar um procedimento armazenado para o Spark:
-
Leitor de metadados do BigQuery (
roles/bigquery.metadataViewer
) no conjunto de dados em que o procedimento está armazenado -
Usuário de conexão do BigQuery (
roles/bigquery.connectionUser
) na conexão -
Usuário de jobs do BigQuery (
roles/bigquery.jobUser
) no seu projeto
-
Leitor de metadados do BigQuery (
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Esses papéis predefinidos contêm as permissões necessárias para executar as tarefas neste documento. Para conferir as permissões exatas necessárias, expanda a seção Permissões necessárias:
Permissões necessárias
As permissões a seguir são necessárias para executar as tarefas neste documento:
-
Crie uma conexão:
-
bigquery.connections.create
-
bigquery.connections.list
-
-
Crie um procedimento armazenado para o Spark:
-
bigquery.routines.create
-
bigquery.connections.delegate
-
bigquery.jobs.create
-
-
Chamar um procedimento armazenado para o Spark:
-
bigquery.routines.get
-
bigquery.connections.use
-
bigquery.jobs.create
-
Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.
Consideração de local
É preciso criar um procedimento armazenado para o Spark no mesmo local da sua conexão, porque o procedimento armazenado é executado no mesmo local que a conexão. Por exemplo, para criar um procedimento armazenado na multirregião dos EUA, use uma conexão localizada na multirregião dos EUA.
Preços
As cobranças pela execução de procedimentos do Spark no BigQuery são semelhantes às cobranças pela execução de procedimentos do Spark no Dataproc sem servidor. Para mais informações, consulte Preços do Dataproc sem servidor.
Os procedimentos armazenados do Spark podem ser usados com o modelo de preços sob demanda e com qualquer uma das edições do BigQuery. Os procedimentos do Spark são cobrados usando o modelo de pagamento por uso da edição BigQuery Enterprise em todos os casos, independentemente do modelo de preços de computação usados em seu projeto.
Os procedimentos armazenados do Spark para o BigQuery não são compatíveis com o uso de reservas ou compromissos. As reservas e os compromissos continuam sendo usados para outros procedimentos e consultas com suporte. As cobranças pelo uso dos procedimentos armazenados do Spark são adicionadas à fatura na edição Enterprise, que é o custo de pagamento por uso. Os descontos da sua organização, quando for o caso, serão aplicados.
Os procedimentos armazenados do Spark usam um mecanismo de execução do Spark, mas não haverá cobranças separadas para essa execução. Conforme observado, as cobranças correspondentes são informadas como SKU de pagamento por uso da edição Enterprise do BigQuery.
Os procedimentos armazenados do Spark não oferecem um nível gratuito.
Criar um procedimento armazenado para o Spark
Crie o procedimento armazenado no mesmo local que a conexão usada.
Se o corpo do procedimento armazenado tiver mais de 1 MB, recomendamos que você coloque seu procedimento armazenado em um arquivo em um bucket do Cloud Storage em vez de usar código in-line. O BigQuery fornece dois métodos para criar um procedimento armazenado para o Spark usando o Python:
- Se você quiser usar a instrução
CREATE PROCEDURE
, use o Editor de consultas SQL. - Se você quiser digitar código Python diretamente, use o editor PySpark. É possível salvar o código como um procedimento armazenado.
Usar o Editor de consultas SQL
Para criar um procedimento armazenado para o Spark no editor de consultas SQL, siga estas etapas:
Acessar a página do BigQuery.
No Editor de consultas, adicione o exemplo de código para a instrução
CREATE PROCEDURE
que aparece.Como alternativa, no painel Explorer, clique na conexão do projeto que você usou para criar o recurso de conexão. Para criar um procedimento armazenado para o Spark, clique em
Criar procedimento armazenado.Python
Para criar procedimentos armazenados para o Spark em Python, use o exemplo de código a seguir:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Java ou Scala
Para criar um procedimento armazenado para Spark em Java ou Scala com a opção
main_file_uri
, use o seguinte exemplo de código:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_JAR_URI"]); LANGUAGE JAVA|SCALA
Para criar um procedimento armazenado para Spark em Java ou Scala com as opções
main_class
ejar_uris
, use o seguinte exemplo de código:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_class=["CLASS_NAME"], jar_uris=["URI"]); LANGUAGE JAVA|SCALA
Substitua:
PROJECT_ID
: o projeto em que você quer criar o procedimento armazenado (por exemplo,myproject
).DATASET
: o conjunto de dados em que você quer criar o procedimento armazenado, por exemplo,mydataset
.PROCEDURE_NAME
: o nome do procedimento armazenado que você quer executar no BigQuery. Por exemplo,mysparkprocedure
PROCEDURE_ARGUMENT
: um parâmetro para inserir os argumentos de entrada.Nesse parâmetro, especifique os seguintes campos:
ARGUMENT_MODE
: o modo do argumento.Os valores válidos incluem
IN
,OUT
eINOUT
. Por padrão, o valor éIN
.ARGUMENT_NAME
: o nome do argumento.ARGUMENT_TYPE
: o tipo do argumento.
Por exemplo,
myproject.mydataset.mysparkproc(num INT64)
.Para mais informações, consulte como transmitir um valor como um parâmetro
IN
ou os parâmetrosOUT
eINOUT
neste documento de dois minutos.CONNECTION_PROJECT_ID
: o projeto que contém a conexão para executar o procedimento do SparkCONNECTION_REGION
: a região que contém a conexão para executar o procedimento do Spark, por exemplo,us
CONNECTION_ID
: o ID da conexão. Por exemplo,myconnection
.Quando você confere os detalhes da conexão no console Google Cloud , o ID da conexão é o valor na última seção do ID da conexão totalmente qualificado, mostrado em ID da conexão, por exemplo,
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: a versão do ambiente de execução do Spark, por exemplo,1.1
.MAIN_PYTHON_FILE_URI
: o caminho para um arquivo PySpark, por exemplo,gs://mybucket/mypysparkmain.py
.Como alternativa, se você quiser adicionar o corpo do procedimento armazenado na instrução
CREATE PROCEDURE
, adicione oPYSPARK_CODE
apósLANGUAGE PYTHON AS
, conforme mostrado no exemplo em Use o código in-line neste documento.PYSPARK_CODE
: a definição de um aplicativo PySpark na instruçãoCREATE PROCEDURE
se você quiser transmitir o corpo do procedimento in-line.O valor é um literal de string. Se o código incluir aspas e barras invertidas, elas precisarão de escape ou representados como uma string bruta. Por exemplo, o código de retorno
"\n";
pode ser representado como um dos seguintes:- String entre aspas:
"return \"\\n\";"
. Aspas e barras invertidas são escapadas. - String entre três aspas:
"""return "\\n";"""
. As barras invertidas são escapadas, enquanto as aspas não. - String bruta:
r"""return "\n";"""
. Não é necessário escape.
- String entre aspas:
MAIN_JAR_URI
: o caminho do arquivo JAR que contém a classemain
, por exemplo,gs://mybucket/my_main.jar
.CLASS_NAME
: o nome totalmente qualificado de uma classe em um conjunto JAR com a opçãojar_uris
. Por exemplo,com.example.wordcount
.URI
: o caminho do arquivo JAR que contém a classe especificada na classemain
, por exemplo,gs://mybucket/mypysparkmain.jar
.
Para conferir outras opções que você pode especificar em
OPTIONS
, consulte a lista de opções de procedimentos.
Usar o editor do PySpark
Ao criar um procedimento usando o editor do PySpark, não é necessário usar a
instrução CREATE PROCEDURE
. Em vez disso, adicione o código Python diretamente no editor Pyspark e salve ou execute o código.
Para criar um procedimento armazenado para o Spark no editor do PySpark, siga estas etapas:
Acessar a página do BigQuery.
Se você quiser digitar o código do PySpark diretamente, abra o editor do PySpark. Para abrir o editor do PySpark, clique no menu
ao lado de Criar consulta SQL e selecione Criar PySpark Procedimento.Para definir opções, clique em Mais > Opções do PySpark e faça o seguinte:
Especifique o local em que você quer executar o código do PySpark.
No campo Conexão, especifique a conexão Spark.
Na seção Invocação do procedimento armazenado, especifique o conjunto de dados em que você quer armazenar os procedimentos armazenados temporários que foram gerados. É possível definir um conjunto de dados específico ou permitir o uso de um conjunto de dados temporário para invocar o código do PySpark.
O conjunto de dados temporário é gerado com o local especificado na etapa anterior. Se um nome de conjunto de dados for especificado, verifique se o conjunto de dados e a conexão Spark precisam estar no mesmo local.
Na seção Parâmetros, defina parâmetros para o procedimento armazenado. O valor do parâmetro é usado apenas durante execuções em sessão do código PySpark, mas a declaração em si é armazenada no procedimento.
Na seção Opções avançadas, especifique as opções de procedimento. Para uma lista detalhada das opções de procedimento, consulte a lista de opções de procedimento.
Na seção Propriedades, adicione os pares de chave-valor para configurar o job. Use qualquer um dos pares de chave-valor das propriedades Spark sem servidor do Dataproc.
Em Configurações da conta de serviço, especifique a conta de serviço personalizada, a CMEK, o conjunto de dados de teste e a pasta de preparo do Cloud Storage que serão usadas durante as execuções da sessão do código do PySpark.
Clique em Salvar.
Salvar um procedimento armazenado para o Spark
Depois de criar o procedimento armazenado usando o editor do PySpark, será possível salvar o procedimento armazenado. Para fazer isso, siga estas etapas:
No console do Google Cloud , acesse a página BigQuery.
No editor de consultas, crie um procedimento armazenado para o Spark usando o Python com o editor do PySpark.
Clique em Salvar > Salvar procedimento.
Na caixa de diálogo Salvar procedimento armazenado, especifique o nome do conjunto de dados em que você quer armazenar o procedimento armazenado e o nome do procedimento armazenado.
Clique em Salvar.
Se você quiser apenas executar o código PySpark em vez de salvá-lo como um procedimento armazenado, clique em Executar em vez de Salvar.
Usar contêineres personalizados
O contêiner personalizado fornece o ambiente de execução para os processos de driver e executor da carga de trabalho. Para usar contêineres personalizados, use o seguinte exemplo de código:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Substitua:
PROJECT_ID
: o projeto em que você quer criar o procedimento armazenado (por exemplo,myproject
).DATASET
: o conjunto de dados em que você quer criar o procedimento armazenado, por exemplo,mydataset
.PROCEDURE_NAME
: o nome do procedimento armazenado que você quer executar no BigQuery. Por exemplo,mysparkprocedure
PROCEDURE_ARGUMENT
: um parâmetro para inserir os argumentos de entrada.Nesse parâmetro, especifique os seguintes campos:
ARGUMENT_MODE
: o modo do argumento.Os valores válidos incluem
IN
,OUT
eINOUT
. Por padrão, o valor éIN
.ARGUMENT_NAME
: o nome do argumento.ARGUMENT_TYPE
: o tipo do argumento.
Por exemplo,
myproject.mydataset.mysparkproc(num INT64)
.Para mais informações, consulte como transmitir um valor como um parâmetro
IN
ou os parâmetrosOUT
eINOUT
neste documento.CONNECTION_PROJECT_ID
: o projeto que contém a conexão para executar o procedimento do SparkCONNECTION_REGION
: a região que contém a conexão para executar o procedimento do Spark, por exemplo,us
CONNECTION_ID
: o ID da conexão. Por exemplo,myconnection
.Ao acessar os detalhes da conexão no console Google Cloud , o ID da conexão é o valor na última seção do ID da conexão totalmente qualificado, mostrado em ID da conexão, por exemplo,
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: a versão do ambiente de execução do Spark, por exemplo,1.1
.MAIN_PYTHON_FILE_URI
: o caminho para um arquivo PySpark, por exemplo,gs://mybucket/mypysparkmain.py
.Como alternativa, se você quiser adicionar o corpo do procedimento armazenado na instrução
CREATE PROCEDURE
, adicione oPYSPARK_CODE
apósLANGUAGE PYTHON AS
, conforme mostrado no exemplo em Use o código in-line neste documento.PYSPARK_CODE
: a definição de um aplicativo PySpark na instruçãoCREATE PROCEDURE
se você quiser transmitir o corpo do procedimento in-line.O valor é um literal de string. Se o código incluir aspas e barras invertidas, elas precisarão de escape ou ser representadas como uma string bruta. Por exemplo, o código de retorno
"\n";
pode ser representado como um dos seguintes:- String entre aspas:
"return \"\\n\";"
. Aspas e barras invertidas são escapadas. - String entre três aspas:
"""return "\\n";"""
. As barras invertidas são escapadas, enquanto as aspas não. - String bruta:
r"""return "\n";"""
. Não é necessário escape.
- String entre aspas:
CONTAINER_IMAGE
: caminho da imagem no Artifacts Registry. Ele precisa conter apenas bibliotecas a serem usadas no procedimento. Se não for especificada, a imagem do contêiner padrão do sistema associada à versão do ambiente de execução será usada.
Para mais informações sobre como criar uma imagem de contêiner personalizada com o Spark, consulte Criar uma imagem de contêiner personalizada.
Chamar um procedimento armazenado para o Spark
Depois de criar um procedimento armazenado, você pode chamá-lo usando uma das seguintes opções:
Console
Acessar a página do BigQuery.
No painel Explorer, expanda seu projeto e selecione o procedimento armazenado para o Spark que você quer executar.
Na janela Informações do procedimento armazenado, clique em Invocar o procedimento armazenado. Também é possível expandir a opção Exibir ações e clicar em Invocar.
Clique em Executar.
Na seção Todos os resultados, clique em Exibir resultados.
Opcional: na seção Resultados da consulta, siga estas etapas:
Se você quiser conferir os registros do driver do Spark, clique em Detalhes da execução.
Se você quiser consultar registros no Cloud Logging, clique em Informações do job e, no campo Registro, clique em log
Se você quiser acessar o endpoint do servidor de histórico do Spark, clique em Informações do job e, em seguida, em Servidor de histórico do Spark.
SQL
Para chamar um procedimento armazenado, use a instrução CALL PROCEDURE
:
No console do Google Cloud , acesse a página BigQuery.
No editor de consultas, digite a seguinte instrução:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
Clique em
Executar.
Para mais informações sobre como executar consultas, acesse Executar uma consulta interativa.
Usar uma conta de serviço personalizada
Em vez de usar a identidade de serviço da conexão Spark para acessar dados, é possível usar uma conta de serviço personalizada para acessar dados no seu código Spark.
Para usar uma conta de serviço personalizada, especifique o modo de segurança INVOKER
(usando a instrução EXTERNAL SECURITY INVOKER
) ao criar um procedimento armazenado do Spark e especifique a conta de serviço ao invocar o procedimento armazenado.
Se quiser acessar e usar o código Spark do
Cloud Storage, você precisará conceder as permissões necessárias para a
identificação de serviço da conexão Spark. Você precisa conceder à
conta de serviço da conexão a permissão storage.objects.get
do IAM
ou o papel storage.objectViewer
do IAM.
Opcionalmente, conceda à conta de serviço da conexão acesso ao Metastore do Dataproc e ao Servidor de histórico permanente do Dataproc, se você os tiver especificado na conexão. Para mais informações, consulte Conceder acesso à conta de serviço.
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) EXTERNAL SECURITY INVOKER WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE] SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Opcionalmente, você pode adicionar os seguintes argumentos ao código anterior:
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME'; SET @@spark_proc_properties.staging_dataset_id='DATASET';
Substitua:
CUSTOM_SERVICE_ACCOUNT
: obrigatório. Uma conta de serviço personalizada fornecida por você.BUCKET_NAME
: opcional. O bucket do Cloud Storage usado como o sistema de arquivos padrão do aplicativo Spark. Se isso não for fornecido, um bucket padrão do Cloud Storage será criado no seu projeto e compartilhado por todos os jobs em execução no mesmo projeto.DATASET
: opcional. O conjunto de dados para armazenar os dados temporários produzidos pela invocação do procedimento. Os dados são limpos após a conclusão do job. Se esse valor não for fornecido, um conjunto de dados temporário padrão será criado para o job.
Sua conta de serviço personalizada precisa ter as seguintes permissões:
Para ler e gravar no bucket de preparo usado como sistema de arquivos padrão do aplicativo Spark:
storage.objects.*
ou o papel do IAMroles/storage.objectAdmin
no bucket de preparo que você especificar.- Além disso, as permissões
storage.buckets.*
ou o papel do IAMroles/storage.Admin
no projeto se o bucket de preparo não estiver especificado.
(Opcional) Para ler e gravar dados no BigQuery:
bigquery.tables.*
nas tabelas do BigQuery.bigquery.readsessions.*
no seu projeto.- O papel
roles/bigquery.admin
do IAM inclui as permissões anteriores.
(Opcional) Para ler e gravar dados no Cloud Storage e no Cloud Storage:
storage.objects.*
ou o papel do IAMroles/storage.objectAdmin
nos objetos do Cloud Storage.
(Opcional) Para ler e gravar no conjunto de dados de preparo usado para os parâmetros
INOUT/OUT
:bigquery.tables.*
ouroles/bigquery.dataEditor
no conjunto de dados de preparo especificado.- Além disso, a permissão
bigquery.datasets.create
ou o papel do IAMroles/bigquery.dataEditor
no projeto se o conjunto de dados de teste não for especificado.
Exemplos de procedimentos armazenados para o Spark
Nesta seção, mostramos exemplos de como criar um procedimento armazenado para o Apache Spark.
Usar um arquivo PySpark ou JAR no Cloud Storage
No exemplo a seguir, mostramos como criar um procedimento armazenado para o Spark usando a conexão my-project-id.us.my-connection
e um arquivo PySpark ou JAR armazenado em um bucket do Cloud Storage:
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py") LANGUAGE PYTHON
Java ou Scala
Use main_file_uri
para criar um procedimento armazenado:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar") LANGUAGE SCALA
Use main_class
para criar um procedimento armazenado:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"]) LANGUAGE SCALA
Usar código in-line
O exemplo a seguir mostra como criar um procedimento armazenado para o Spark usando a conexão my-project-id.us.my-connection
e o código in-line PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() # Load data from BigQuery. words = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:samples.shakespeare") \ .load() words.createOrReplaceTempView("words") # Perform word count. word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("wordcount_dataset.wordcount_output") """
Transmitir um valor como parâmetro de entrada
Os exemplos a seguir mostram os dois métodos para transmitir um valor como parâmetro de entrada em Python:
Método 1: usar variáveis de ambiente
No código do PySpark, é possível conseguir os parâmetros de entrada do procedimento armazenado para o Spark por meio de variáveis de ambiente no driver e nos executores do Spark. O nome da variável de ambiente tem o formato BIGQUERY_PROC_PARAM.PARAMETER_NAME
, em que PARAMETER_NAME
é o nome do parâmetro de entrada. Por exemplo, se o nome do parâmetro de entrada for var
, o nome da variável de ambiente correspondente é BIGQUERY_PROC_PARAM.var
. Os parâmetros de entrada são codificados em JSON.
No código do PySpark, é possível receber o valor do parâmetro de entrada em uma string JSON da variável de ambiente e decodificá-lo para uma variável Python.
O exemplo a seguir mostra como receber o valor de um parâmetro de entrada do tipo INT64
no código do PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import os import json spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc = spark.sparkContext # Get the input parameter num in JSON string and convert to a Python variable num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"])) """
Método 2: usar uma biblioteca integrada
No código do PySpark, basta importar uma biblioteca integrada e usá-la para preencher todos os tipos de parâmetros. Para transmitir os parâmetros para os executores, preencha os parâmetros em um driver do Spark como variáveis do Python e transmita os valores para os executores. A biblioteca integrada é compatível com a maioria dos tipos de dados do BigQuery, exceto INTERVAL
, GEOGRAPHY
, NUMERIC
e BIGNUMERIC
.
Tipo de dados do BigQuery | Tipo de dados Python |
---|---|
BOOL
|
bool
|
STRING
|
str
|
FLOAT64
|
float
|
INT64
|
int
|
BYTES
|
bytes
|
DATE
|
datetime.date
|
TIMESTAMP
|
datetime.datetime
|
TIME
|
datetime.time
|
DATETIME
|
datetime.datetime
|
Array
|
Array
|
Struct
|
Struct
|
JSON
|
Object
|
NUMERIC
|
Sem suporte |
BIGNUMERIC
|
Sem suporte |
INTERVAL
|
Sem suporte |
GEOGRAPHY
|
Sem suporte |
O exemplo a seguir mostra como importar a biblioteca integrada e usá-la para preencher um parâmetro de entrada do tipo INT64 e um parâmetro de entrada do tipo ARRAY<STRUCT<a INT64, b STRING>> no código do PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession from bigquery.spark.procedure import SparkProcParamContext def check_in_param(x, num): return x['a'] + num def main(): spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc=spark.sparkContext spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Get the input parameter num of type INT64 num = spark_proc_param_context.num # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>> info = spark_proc_param_context.info # Pass the parameter to executors df = sc.parallelize(info) value = df.map(lambda x : check_in_param(x, num)).sum() main() """
No código Java ou Scala, é possível receber os parâmetros de entrada do procedimento armazenado para o Spark por meio de variáveis de ambiente no driver e nos executores do Spark. O nome da variável de ambiente tem o formato BIGQUERY_PROC_PARAM.PARAMETER_NAME
, em que PARAMETER_NAME
é o nome do parâmetro de entrada. Por exemplo, se o nome do parâmetro de entrada for var, o nome da variável de ambiente correspondente é BIGQUERY_PROC_PARAM.var
.
No código Java ou Scala, é possível receber o valor do parâmetro de entrada da variável de ambiente.
O exemplo a seguir mostra como receber o valor de um parâmetro de entrada de variáveis de ambiente para o código Scala:
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
O exemplo a seguir mostra como receber parâmetros de entrada de variáveis de ambiente no código Java:
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
Transmitir valores como parâmetros OUT
e INOUT
Os parâmetros de saída retornam o valor do procedimento do Spark, enquanto o parâmetro INOUT
aceita um valor do procedimento e retorna um valor do procedimento.
Para usar os parâmetros OUT
e INOUT
, adicione a palavra-chave OUT
ou INOUT
antes do nome do parâmetro ao criar o procedimento do Spark. No código do PySpark, use a biblioteca integrada para retornar um valor como um parâmetro OUT
ou INOUT
. Assim como os parâmetros de entrada, a biblioteca integrada é compatível com a maioria dos tipos de dados do BigQuery, exceto INTERVAL
, GEOGRAPHY
, NUMERIC
e BIGNUMERIC
. Os valores dos tipos TIME
e DATETIME
são convertidos para o fuso horário UTC ao retornar como os parâmetros OUT
ou INOUT
.
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON) WITH CONNECTION `my_bq_project.my_dataset.my_connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql.session import SparkSession import datetime from bigquery.spark.procedure import SparkProcParamContext spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate() spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Reading the IN and INOUT parameter values. int = spark_proc_param_context.int dt = spark_proc_param_context.datetime print("IN parameter value: ", int, ", INOUT parameter value: ", dt) # Returning the value of the OUT and INOUT parameters. spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.b = True spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}] spark_proc_param_context.time = datetime.time(23, 20, 50, 520000) spark_proc_param_context.f = 20.23 spark_proc_param_context.bs = b"hello" spark_proc_param_context.date = datetime.date(1985, 4, 12) spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.js = {"name": "Alice", "age": 30} """;
Ler de uma tabela do Hive Metastore e gravar resultados no BigQuery
No exemplo a seguir, mostramos como transformar uma tabela do Hive Metastore e gravar os resultados no BigQuery:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \ .enableHiveSupport() \ .getOrCreate() spark.sql("CREATE DATABASE IF NOT EXISTS records") spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)") spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)") df = spark.sql("SELECT * FROM records.student") df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("records_dataset.student") """
Ver filtros de registro
Depois de chamar um procedimento armazenado para o Spark, você poderá exibir as informações do registro. Para acessar as informações de filtro do Cloud Logging
e o endpoint do cluster de histórico do Spark, use o comando bq
show
.
As informações do filtro estão disponíveis no campo SparkStatistics
do job filho. Para receber filtros de registro, siga estas etapas:
Acessar a página do BigQuery.
No editor de consultas, liste os jobs filhos do job de script do procedimento armazenado:
bq ls -j --parent_job_id=$parent_job_id
Para saber como conseguir o ID do job, consulte Exibir detalhes do job.
O resultado será assim:
jobId Job Type State Start Time Duration ---------------------------------------------- --------- --------- --------------- ---------------- script_job_90fb26c32329679c139befcc638a7e71_0 query SUCCESS 07 Sep 18:00:27 0:05:15.052000
Identifique o
jobId
do procedimento armazenado e use o comandobq show
para visualizar os detalhes do job:bq show --format=prettyjson --job $child_job_id
Copie o campo
sparkStatistics
, porque você precisará dele em outra etapa.O resultado será assim:
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
Para o Logging, gere filtros de registro com os campos
SparkStatistics
:resource.type = sparkStatistics.loggingInfo.resourceType resource.labels.resource_container=sparkStatistics.loggingInfo.projectId resource.labels.spark_job_id=sparkStatistics.sparkJobId resource.labels.location=sparkStatistics.sparkJobLocation
Os registros são gravados no recurso monitorado
bigquery.googleapis.com/SparkJob
. Os registros são rotulados pelos componentesINFO
,DRIVER
eEXECUTOR
. Para filtrar registros do driver do Spark, adicione o componentelabels.component = "DRIVER"
aos filtros de registro. Para filtrar registros do executor do Spark, adicione o componentelabels.component = "EXECUTOR"
aos filtros de registro.
Usar a chave de criptografia gerenciada pelo cliente
O procedimento do BigQuery para Spark usa a chave de criptografia gerenciada pelo cliente (CMEK) para proteger seu conteúdo, com a criptografia padrão fornecida pelo BigQuery. Para usar a CMEK no procedimento do Spark, primeiro acione a criação da conta de serviço de criptografia do BigQuery e conceda as permissões necessárias. O procedimento do Spark também oferece suporte às políticas da organização de CMEK, se elas forem aplicadas ao seu projeto.
Se o procedimento armazenado estiver usando o modo de segurança INVOKER
, a CMEK precisará ser especificada pela variável de sistema SQL ao chamar o procedimento. Caso contrário, a CMEK poderá ser especificada pela conexão associada ao procedimento armazenado.
Para especificar a CMEK por meio da conexão ao criar um procedimento armazenado do Spark, use o seguinte exemplo de código:
bq mk --connection --connection_type='SPARK' \ --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \ --project_id=PROJECT_ID \ --location=LOCATION \ CONNECTION_NAME
Para especificar a CMEK por meio da variável de sistema SQL ao chamar o procedimento, use este exemplo de código:
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Usar o VPC Service Controls
O VPC Service Controls permite configurar um perímetro seguro para evitar a exfiltração de dados. Para usar o VPC Service Controls com um procedimento do Spark para aumentar a segurança, primeiro crie um perímetro de serviço.
Para proteger totalmente os jobs do procedimento Spark, adicione as seguintes APIs ao perímetro de serviço:
- API BigQuery (
bigquery.googleapis.com
) - API Cloud Logging (
logging.googleapis.com
) - API Cloud Storage (
storage.googleapis.com
), se você usa o Cloud Storage - API Artifact Registry (
artifactregistry.googleapis.com
) ou API Container Registry (containerregistry.googleapis.com
), se você usar um contêiner personalizado - API Dataproc Metastore (
metastore.googleapis.com
) e API Cloud Run Admin (run.googleapis.com
), se você usar o metastore do Dataproc
Adicione o projeto de consulta do procedimento do Spark ao perímetro. Adicione outros projetos que hospedam seus dados ou código Spark no perímetro.
Práticas recomendadas
Quando você usa a conexão no projeto pela primeira vez, o provisionamento leva cerca de um minuto a mais. Para economizar tempo, é possível reutilizar uma conexão Spark ao criar um procedimento armazenado para o Spark.
Ao criar um procedimento do Spark para uso em produção, o Google recomenda especificar uma versão de ambiente de execução. Para ver uma lista de versões de ambiente de execução compatíveis, consulte Versões de ambiente de execução sem servidor do Dataproc. Recomendamos usar a versão com suporte de longo tempo (LTS).
Ao especificar um contêiner personalizado em um procedimento do Spark, recomendamos usar o Artifact Registry e o streaming de imagens.
Para um melhor desempenho, é possível especificar propriedades de alocação de recursos no procedimento do Spark. Os procedimentos armazenados pelo Spark são compatíveis com uma lista de propriedades de alocação de recursos, da mesma forma que o Dataproc sem servidor.
Limitações
- É possível usar apenas o protocolo de endpoints gRPC para se conectar ao metastore do Dataproc. Outros tipos de metastore Hive ainda não são compatíveis.
- As chaves de criptografia gerenciadas pelo cliente (CMEK) estão disponíveis apenas quando os clientes
criam procedimentos do Spark de uma única região. As chaves CMEK de região global e chaves CMEK multirregionais, por exemplo,
EU
ouUS
, não são compatíveis. - A passagem de parâmetros de saída só é compatível com o PySpark.
- Se o conjunto de dados associado ao procedimento armazenado para o Spark for replicado para uma região de destino por meio da replicação entre as regiões do conjunto de dados, o procedimento armazenado só poderá ser consultado na região que em que foi criado.
- O Spark não oferece suporte ao acesso a endpoints HTTP na sua rede privada do VPC Service Controls.
Cotas e limites
Para saber mais sobre cotas e limites, consulte procedimentos armazenados para cotas e limites do Spark.
A seguir
- Saiba como exibir um procedimento armazenado.
- Saiba como excluir um procedimento armazenado.
- Saiba como trabalhar com um procedimento armazenado de SQL.