Trabalhe com procedimentos armazenados para o Apache Spark
Este documento destina-se a engenheiros, cientistas e analistas de dados para criar e chamar procedimentos armazenados para o Spark no BigQuery.
Com o BigQuery, pode criar e executar procedimentos armazenados do Spark escritos em Python, Java e Scala. Em seguida, pode executar estes procedimentos armazenados no BigQuery através de uma consulta GoogleSQL, semelhante à execução de procedimentos armazenados SQL.
Antes de começar
Para criar um procedimento armazenado para o Spark, peça ao seu administrador para criar uma ligação ao Spark e partilhá-la consigo. O administrador também tem de conceder à conta de serviço associada à ligação as autorizações de gestão de identidade e de acesso (IAM) necessárias.
Funções necessárias
Para receber as autorizações de que precisa para realizar as tarefas neste documento, peça ao seu administrador para lhe conceder as seguintes funções do IAM:
-
Crie um procedimento armazenado para o Spark:
-
Editor de dados do BigQuery (
roles/bigquery.dataEditor
) no conjunto de dados onde cria o procedimento armazenado -
Administrador da associação do BigQuery (
roles/bigquery.connectionAdmin
) na associação que o procedimento armazenado usa -
Utilizador de tarefas do BigQuery (
roles/bigquery.jobUser
) no seu projeto
-
Editor de dados do BigQuery (
-
Chame um procedimento armazenado para o Spark:
-
Leitor de metadados do BigQuery (
roles/bigquery.metadataViewer
) no conjunto de dados onde o procedimento armazenado está armazenado -
Utilizador da ligação do BigQuery (
roles/bigquery.connectionUser
) na ligação -
Utilizador de tarefas do BigQuery (
roles/bigquery.jobUser
) no seu projeto
-
Leitor de metadados do BigQuery (
Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.
Estas funções predefinidas contêm as autorizações necessárias para realizar as tarefas descritas neste documento. Para ver as autorizações exatas que são necessárias, expanda a secção Autorizações necessárias:
Autorizações necessárias
São necessárias as seguintes autorizações para realizar as tarefas neste documento:
-
Crie uma associação:
-
bigquery.connections.create
-
bigquery.connections.list
-
-
Crie um procedimento armazenado para o Spark:
-
bigquery.routines.create
-
bigquery.connections.delegate
-
bigquery.jobs.create
-
-
Chame um procedimento armazenado para o Spark:
-
bigquery.routines.get
-
bigquery.connections.use
-
bigquery.jobs.create
-
Também pode conseguir estas autorizações com funções personalizadas ou outras funções predefinidas.
Consideração da localização
Tem de criar um procedimento armazenado para o Spark na mesma localização que a sua ligação , uma vez que o procedimento armazenado é executado na mesma localização que a ligação. Por exemplo, para criar um procedimento armazenado na multirregião dos EUA, usa uma ligação localizada na multirregião dos EUA.
Preços
As cobranças pela execução de procedimentos do Spark no BigQuery são semelhantes às cobranças pela execução de procedimentos do Spark no Serverless para Apache Spark. Para mais informações, consulte os preços do Serverless para Apache Spark.
Os procedimentos armazenados do Spark podem ser usados com o modelo de preços a pedido, bem como com qualquer uma das edições do BigQuery. Os procedimentos do Spark são cobrados através do modelo de pagamento conforme o uso da BigQuery Enterprise Edition em todos os casos, independentemente do modelo de preços de computação usado no seu projeto.
Os procedimentos armazenados do Spark para o BigQuery não suportam a utilização de reservas nem compromissos. As reservas e os compromissos existentes continuam a ser usados para outras consultas e procedimentos suportados. As cobranças pela utilização de procedimentos armazenados do Spark são adicionadas à sua fatura ao custo da edição Enterprise de pagamento conforme a utilização. Os descontos da sua organização são aplicados, quando aplicável.
Embora os procedimentos armazenados do Spark usem um motor de execução do Spark, não verá cobranças separadas pela execução do Spark. Conforme indicado, os custos correspondentes são comunicados como SKU de pagamento conforme a utilização da edição Enterprise do BigQuery.
Os procedimentos armazenados do Spark não oferecem um nível gratuito.
Crie um procedimento armazenado para o Spark
Tem de criar o procedimento armazenado na mesma localização que a ligação que usa.
Se o corpo do procedimento armazenado for superior a 1 MB, recomendamos que coloque o procedimento armazenado num ficheiro num contentor do Cloud Storage em vez de usar código inline. O BigQuery oferece dois métodos para criar um procedimento armazenado para o Spark com Python:
- Se quiser usar a declaração
CREATE PROCEDURE
, use o editor de consultas SQL. - Se quiser escrever código Python diretamente, use o editor PySpark. Pode guardar o código como um procedimento armazenado.
Use o editor de consultas SQL
Para criar um procedimento armazenado para o Spark no editor de consultas SQL, siga estes passos:
Aceda à página do BigQuery.
No editor de consultas, adicione o exemplo de código para a declaração
CREATE PROCEDURE
apresentada.Em alternativa, no painel Explorador, clique na ligação no projeto que usou para criar o recurso de ligação. Em seguida, para criar um procedimento armazenado para o Spark, clique em
Criar procedimento armazenado.Python
Para criar um procedimento armazenado para o Spark em Python, use o seguinte exemplo de código:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Java ou Scala
Para criar um procedimento armazenado para o Spark em Java ou Scala com a opção
main_file_uri
, use o seguinte código de exemplo:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_JAR_URI"]); LANGUAGE JAVA|SCALA
Para criar um procedimento armazenado para o Spark em Java ou Scala com as opções
main_class
ejar_uris
, use o seguinte código de exemplo:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_class=["CLASS_NAME"], jar_uris=["URI"]); LANGUAGE JAVA|SCALA
Substitua o seguinte:
PROJECT_ID
: o projeto no qual quer criar o procedimento armazenado. Por exemplo,myproject
.DATASET
: o conjunto de dados no qual quer criar o procedimento armazenado, por exemplo,mydataset
.PROCEDURE_NAME
: o nome do procedimento armazenado que quer executar no BigQuery. Por exemplo,mysparkprocedure
.PROCEDURE_ARGUMENT
: um parâmetro para introduzir os argumentos de entrada.Neste parâmetro, especifique os seguintes campos:
ARGUMENT_MODE
: o modo do argumento.Os valores válidos incluem
IN
,OUT
eINOUT
. Por predefinição, o valor éIN
.ARGUMENT_NAME
: o nome do argumento.ARGUMENT_TYPE
: o tipo do argumento.
Por exemplo:
myproject.mydataset.mysparkproc(num INT64)
.Para mais informações, consulte como transmitir um valor como um parâmetro
IN
ou os parâmetrosOUT
eINOUT
neste documento.CONNECTION_PROJECT_ID
: o projeto que contém a associação para executar o procedimento do Spark.CONNECTION_REGION
: a região que contém a ligação para executar o procedimento do Spark, por exemplo,us
.CONNECTION_ID
: o ID de ligação, por exemplo,myconnection
.Quando vê os detalhes da associação na consola Google Cloud , o ID da associação é o valor na última secção do ID da associação totalmente qualificado que é apresentado em ID da associação, por exemplo
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: a versão de tempo de execução do Spark, por exemplo,2.2
.MAIN_PYTHON_FILE_URI
: o caminho para um ficheiro PySpark, por exemplo,gs://mybucket/mypysparkmain.py
.Em alternativa, se quiser adicionar o corpo do procedimento armazenado na declaração
CREATE PROCEDURE
, adicionePYSPARK_CODE
apósLANGUAGE PYTHON AS
, conforme mostrado no exemplo em Usar código inline neste documento.PYSPARK_CODE
: a definição de uma aplicação PySpark na declaraçãoCREATE PROCEDURE
se quiser transmitir o corpo do procedimento inline.O valor é um literal de string. Se o código incluir aspas e barras invertidas, estas têm de ser escapadas ou representadas como uma string bruta. Por exemplo, o código de retorno
"\n";
pode ser representado como uma das seguintes opções:- String entre aspas:
"return \"\\n\";"
. As aspas e as barras invertidas são interpretadas de forma literal. - String entre aspas triplas:
"""return "\\n";"""
. As barras invertidas são ignoradas, enquanto as aspas não são. - String não processada:
r"""return "\n";"""
. Não é necessário usar carateres de escape.
- String entre aspas:
MAIN_JAR_URI
: o caminho do ficheiro JAR que contém a classemain
, por exemplo,gs://mybucket/my_main.jar
.CLASS_NAME
: o nome totalmente qualificado de uma classe num conjunto JAR com a opçãojar_uris
, por exemplo,com.example.wordcount
.URI
: o caminho do ficheiro JAR que contém a classe especificada na classemain
, por exemplo,gs://mybucket/mypysparkmain.jar
.
Para ver opções adicionais que pode especificar em
OPTIONS
, consulte a lista de opções de procedimentos.
Use o editor do PySpark
Quando cria um procedimento com o editor do PySpark, não precisa de usar a declaração CREATE PROCEDURE
. Em alternativa, adicione o código Python diretamente no editor do Pyspark e guarde ou execute o código.
Para criar um procedimento armazenado para o Spark no editor do PySpark, siga estes passos:
Aceda à página do BigQuery.
Se quiser escrever o código PySpark diretamente, abra o editor do PySpark. Para abrir o editor do PySpark, clique no menu
junto a Criar consulta SQL e, de seguida, selecione Criar procedimento do PySpark.Para definir opções, clique em Mais > Opções do PySpark e, de seguida, faça o seguinte:
Especifique a localização onde quer executar o código PySpark.
No campo Ligação, especifique a ligação do Spark.
Na secção Invocações de procedimentos armazenados, especifique o conjunto de dados no qual quer armazenar os procedimentos armazenados temporários que são gerados. Pode definir um conjunto de dados específico ou permitir a utilização de um conjunto de dados temporário para invocar o código PySpark.
O conjunto de dados temporário é gerado com a localização especificada no passo anterior. Se for especificado um nome do conjunto de dados, certifique-se de que o conjunto de dados e a ligação do Spark estão na mesma localização.
Na secção Parâmetros, defina os parâmetros para o procedimento armazenado. O valor do parâmetro só é usado durante as execuções na sessão do código PySpark, mas a própria declaração é armazenada no procedimento.
Na secção Opções avançadas, especifique as opções do procedimento. Para uma lista detalhada das opções de procedimento, consulte a lista de opções de procedimento.
1. Na secção Propriedades, adicione os pares de chave-valor para configurar a tarefa. Pode usar qualquer um dos pares de chave-valor das propriedades do Spark suportadas pelo Serverless para Apache Spark.
Nas Definições da conta de serviço, especifique a conta de serviço personalizada, a CMEK, o conjunto de dados de preparação e a pasta de preparação do Cloud Storage a usar durante as execuções na sessão do código PySpark.
Clique em Guardar.
Guarde um procedimento armazenado para o Spark
Depois de criar o procedimento armazenado com o editor do PySpark, pode guardar o procedimento armazenado. Para o fazer, siga estes passos:
Na Google Cloud consola, aceda à página BigQuery.
No editor de consultas, crie um procedimento armazenado para o Spark usando Python com o editor PySpark.
Clique em
Guardar > Guardar procedimento.
Na caixa de diálogo Guardar procedimento armazenado, especifique o nome do conjunto de dados onde quer armazenar o procedimento armazenado e o nome do procedimento armazenado.
Clique em Guardar.
Se quiser apenas executar o código PySpark em vez de o guardar como um procedimento armazenado, pode clicar em Executar em vez de Guardar.
Use contentores personalizados
O contentor personalizado fornece o ambiente de tempo de execução para os processos do controlador e do executor da carga de trabalho. Para usar contentores personalizados, use o seguinte exemplo de código:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Substitua o seguinte:
PROJECT_ID
: o projeto no qual quer criar o procedimento armazenado, por exemplo,myproject
.DATASET
: o conjunto de dados no qual quer criar o procedimento armazenado, por exemplo,mydataset
.PROCEDURE_NAME
: o nome do procedimento armazenado que quer executar no BigQuery, por exemplo,mysparkprocedure
.PROCEDURE_ARGUMENT
: um parâmetro para introduzir os argumentos de entrada.Neste parâmetro, especifique os seguintes campos:
ARGUMENT_MODE
: o modo do argumento.Os valores válidos incluem
IN
,OUT
eINOUT
. Por predefinição, o valor éIN
.ARGUMENT_NAME
: o nome do argumento.ARGUMENT_TYPE
: o tipo do argumento.
Por exemplo:
myproject.mydataset.mysparkproc(num INT64)
.Para mais informações, consulte como transmitir um valor como um parâmetro
IN
ou os parâmetrosOUT
eINOUT
neste documento.CONNECTION_PROJECT_ID
: o projeto que contém a associação para executar o procedimento do Spark.CONNECTION_REGION
: a região que contém a ligação para executar o procedimento do Spark, por exemplo,us
.CONNECTION_ID
: o ID da associação, por exemplo,myconnection
.Quando vê os detalhes da associação na Google Cloud consola, o ID da associação é o valor na última secção do ID da associação totalmente qualificado que é apresentado em ID da associação, por exemplo,
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: a versão de tempo de execução do Spark, por exemplo,2.2
.MAIN_PYTHON_FILE_URI
: o caminho para um ficheiro PySpark, por exemplo,gs://mybucket/mypysparkmain.py
.Em alternativa, se quiser adicionar o corpo do procedimento armazenado na declaração
CREATE PROCEDURE
, adicionePYSPARK_CODE
apósLANGUAGE PYTHON AS
, conforme mostrado no exemplo em Usar código inline neste documento.PYSPARK_CODE
: a definição de uma aplicação PySpark na declaraçãoCREATE PROCEDURE
se quiser transmitir o corpo do procedimento inline.O valor é um literal de string. Se o código incluir aspas e barras invertidas, estas têm de ser escapadas ou representadas como uma string não processada. Por exemplo, o código de retorno
"\n";
pode ser representado como uma das seguintes opções:- String entre aspas:
"return \"\\n\";"
. As aspas e as barras invertidas são interpretadas de forma literal. - String entre aspas triplas:
"""return "\\n";"""
. As barras invertidas são ignoradas, enquanto as aspas não são. - String não processada:
r"""return "\n";"""
. Não é necessário usar carateres de escape.
- String entre aspas:
CONTAINER_IMAGE
: caminho da imagem no registo de artefactos. Só pode conter bibliotecas para usar no seu procedimento. Se não for especificado, é usada a imagem do contentor predefinida do sistema associada à versão de tempo de execução.
Para mais informações sobre como criar uma imagem de contentor personalizada com o Spark, consulte o artigo Crie uma imagem de contentor personalizada.
Chame um procedimento armazenado para o Spark
Depois de criar um procedimento armazenado, pode chamá-lo através de uma das seguintes opções:
Consola
Aceda à página do BigQuery.
No painel Explorador, expanda o projeto e selecione o procedimento armazenado para o Spark que quer executar.
Na janela Informações do procedimento armazenado, clique em Invocar procedimento armazenado. Em alternativa, pode expandir a opção Ver ações e clicar em Invocar.
Clique em Executar.
Na secção Todos os resultados, clique em Ver resultados.
Opcional: na secção Resultados da consulta, siga estes passos:
Se quiser ver os registos do controlador do Spark, clique em Detalhes da execução.
Se quiser ver os registos no Cloud Logging, clique em Informações da tarefa e, de seguida, no campo Registo, clique em registo.
Se quiser obter o ponto final do servidor do histórico do Spark, clique em Informações da tarefa e, de seguida, em Servidor do histórico do Spark.
SQL
Para chamar um procedimento armazenado, use a declaração CALL PROCEDURE
:
Na Google Cloud consola, aceda à página BigQuery.
No editor de consultas, introduza a seguinte declaração:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
Clique em
Executar.
Para mais informações sobre como executar consultas, consulte o artigo Execute uma consulta interativa.
Use uma conta de serviço personalizada
Em vez de usar a identidade de serviço da ligação Spark para aceder aos dados, pode usar uma conta de serviço personalizada para aceder aos dados no seu código Spark.
Para usar uma conta de serviço personalizada, especifique o INVOKER
modo de segurança (usando a declaração EXTERNAL SECURITY INVOKER
) quando criar um procedimento armazenado do Spark e especifique a conta de serviço quando invocar o procedimento armazenado.
Quando executa o procedimento armazenado do Spark com a conta de serviço personalizada pela primeira vez, o BigQuery cria um agente de serviço do Spark e concede ao agente de serviço as autorizações necessárias. Certifique-se de que não modifica esta concessão antes de invocar o procedimento armazenado do Spark. Para saber mais detalhes, consulte o artigo Agente do serviço BigQuery Spark.
Se quiser aceder e usar código Spark a partir do Cloud Storage, tem de conceder as autorizações necessárias à identidade do serviço da ligação Spark. Tem de conceder à conta de serviço da ligação a autorização storage.objects.get
IAM
ou a função storage.objectViewer
IAM.
Opcionalmente, pode conceder à conta de serviço da ligação acesso ao Dataproc Metastore e ao servidor de histórico persistente do Dataproc se os tiver especificado na ligação. Para mais informações, consulte o artigo Conceda acesso à conta de serviço.
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) EXTERNAL SECURITY INVOKER WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE] SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Opcionalmente, pode adicionar os seguintes argumentos ao código anterior:
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME'; SET @@spark_proc_properties.staging_dataset_id='DATASET';
Substitua o seguinte:
CUSTOM_SERVICE_ACCOUNT
: obrigatório. Uma conta de serviço personalizada fornecida por si.BUCKET_NAME
: opcional. O contentor do Cloud Storage que é usado como o sistema de ficheiros da aplicação Spark predefinido. Se não for fornecido, é criado um contentor do Cloud Storage predefinido no seu projeto, e o contentor é partilhado por todas as tarefas executadas no mesmo projeto.DATASET
: opcional. O conjunto de dados para armazenar os dados temporários produzidos ao invocar o procedimento. Os dados são limpos após a conclusão da tarefa. Se não for fornecido, é criado um conjunto de dados temporário predefinido para a tarefa.
A sua conta de serviço personalizada tem de ter as seguintes autorizações:
Para ler e escrever no contentor de preparação usado como o sistema de ficheiros da aplicação Spark predefinido:
storage.objects.*
ou a função de IAMroles/storage.objectAdmin
no contentor de preparação que especificar.- Além disso, as autorizações
storage.buckets.*
ou a função de IAMroles/storage.Admin
no projeto se o contentor de preparação não for especificado.
(Opcional) Para ler e escrever dados do e para o BigQuery:
bigquery.tables.*
nas tabelas do BigQuery.bigquery.readsessions.*
no projeto.- A função de IAM
roles/bigquery.admin
inclui as autorizações anteriores.
(Opcional) Para ler e escrever dados de e para o Cloud Storage:
storage.objects.*
ou a função do IAMroles/storage.objectAdmin
nos seus objetos do Cloud Storage.
(Opcional) Para ler e escrever no conjunto de dados de preparação usado para parâmetros
INOUT/OUT
:- Função de IAM
bigquery.tables.*
ouroles/bigquery.dataEditor
no conjunto de dados de preparação especificado. - Além disso, a autorização
bigquery.datasets.create
ou a função de IAMroles/bigquery.dataEditor
no projeto se o conjunto de dados de preparação não for especificado.
- Função de IAM
Exemplos de procedimentos armazenados para o Spark
Esta secção mostra exemplos de como pode criar um procedimento armazenado para o Apache Spark.
Use um ficheiro PySpark ou JAR no Cloud Storage
O exemplo seguinte mostra como criar um procedimento armazenado para o Spark usando a ligação my-project-id.us.my-connection
e um ficheiro PySpark ou JAR armazenado num contentor do Cloud Storage:
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-pyspark-main.py") LANGUAGE PYTHON
Java ou Scala
Use main_file_uri
para criar um procedimento armazenado:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-scala-main.jar") LANGUAGE SCALA
Use main_class
para criar um procedimento armazenado:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"]) LANGUAGE SCALA
Use código inline
O exemplo seguinte mostra como criar um procedimento armazenado para o Spark através da ligação my-project-id.us.my-connection
e do código PySpark inline:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() # Load data from BigQuery. words = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:samples.shakespeare") \ .load() words.createOrReplaceTempView("words") # Perform word count. word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("wordcount_dataset.wordcount_output") """
Transmita um valor como um parâmetro de entrada
Os exemplos seguintes apresentam os dois métodos para transmitir um valor como um parâmetro de entrada em Python:
Método 1: use variáveis de ambiente
No código PySpark, pode obter os parâmetros de entrada do procedimento armazenado para o Spark através de variáveis de ambiente no controlador e nos executores do Spark. O nome da variável de ambiente tem o formato de
BIGQUERY_PROC_PARAM.PARAMETER_NAME
,
em que PARAMETER_NAME
é o nome do parâmetro de entrada. Por exemplo, se o nome do parâmetro de entrada for var
, o nome da variável de ambiente correspondente é BIGQUERY_PROC_PARAM.var
. Os parâmetros de entrada são codificados em JSON.
No seu código PySpark, pode obter o valor do parâmetro de entrada numa string JSON da variável de ambiente e descodificá-lo para uma variável Python.
O exemplo seguinte mostra como obter o valor de um parâmetro de entrada do tipo
INT64
no seu código PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import os import json spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc = spark.sparkContext # Get the input parameter num in JSON string and convert to a Python variable num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"])) """
Método 2: use uma biblioteca integrada
No código PySpark, pode simplesmente importar uma biblioteca integrada e usá-la para preencher todos os tipos de parâmetros. Para transmitir os parâmetros aos executores, preencha os parâmetros num controlador do Spark como variáveis Python e transmita os valores aos executores. A biblioteca incorporada suporta a maioria dos tipos de dados do BigQuery, exceto INTERVAL
, GEOGRAPHY
, NUMERIC
e BIGNUMERIC
.
Tipo de dados do BigQuery | Tipo de dados Python |
---|---|
BOOL
|
bool
|
STRING
|
str
|
FLOAT64
|
float
|
INT64
|
int
|
BYTES
|
bytes
|
DATE
|
datetime.date
|
TIMESTAMP
|
datetime.datetime
|
TIME
|
datetime.time
|
DATETIME
|
datetime.datetime
|
Array
|
Array
|
Struct
|
Struct
|
JSON
|
Object
|
NUMERIC
|
Não suportado |
BIGNUMERIC
|
Não suportado |
INTERVAL
|
Não suportado |
GEOGRAPHY
|
Não suportado |
O exemplo seguinte mostra como importar a biblioteca integrada e usá-la para preencher um parâmetro de entrada do tipo INT64 e um parâmetro de entrada do tipo ARRAY<STRUCT<a INT64, b STRING>> no seu código PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession from bigquery.spark.procedure import SparkProcParamContext def check_in_param(x, num): return x['a'] + num def main(): spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc=spark.sparkContext spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Get the input parameter num of type INT64 num = spark_proc_param_context.num # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>> info = spark_proc_param_context.info # Pass the parameter to executors df = sc.parallelize(info) value = df.map(lambda x : check_in_param(x, num)).sum() main() """
No código Java ou Scala, pode obter os parâmetros de entrada do procedimento armazenado para o Spark através de variáveis de ambiente no controlador e nos executores do Spark. O nome da variável de ambiente tem o formato BIGQUERY_PROC_PARAM.PARAMETER_NAME
, em que PARAMETER_NAME
é o nome do parâmetro de entrada. Por exemplo, se o nome do parâmetro de entrada for var, o nome da variável de ambiente correspondente é BIGQUERY_PROC_PARAM.var
.
No seu código Java ou Scala, pode obter o valor do parâmetro de entrada da variável de ambiente.
O exemplo seguinte mostra como obter o valor de um parâmetro de entrada das variáveis de ambiente no seu código Scala:
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
O exemplo seguinte mostra como obter parâmetros de entrada de variáveis de ambiente no seu código Java:
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
Transmita valores como parâmetros OUT
e INOUT
Os parâmetros de saída devolvem o valor do procedimento do Spark, enquanto o parâmetro INOUT
aceita um valor para o procedimento e devolve um valor do procedimento.
Para usar os parâmetros OUT
e INOUT
, adicione a palavra-chave OUT
ou INOUT
antes do nome do parâmetro quando criar o procedimento do Spark. No código PySpark, usa a biblioteca incorporada para devolver um valor como um parâmetro OUT
ou INOUT
. Tal como os parâmetros de entrada, a biblioteca incorporada suporta a maioria dos tipos de dados do BigQuery, exceto INTERVAL
, GEOGRAPHY
, NUMERIC
e BIGNUMERIC
. Os valores do tipo TIME
e DATETIME
são convertidos para o fuso horário UTC quando são devolvidos como os parâmetros OUT
ou INOUT
.
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON) WITH CONNECTION `my_bq_project.my_dataset.my_connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql.session import SparkSession import datetime from bigquery.spark.procedure import SparkProcParamContext spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate() spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Reading the IN and INOUT parameter values. int = spark_proc_param_context.int dt = spark_proc_param_context.datetime print("IN parameter value: ", int, ", INOUT parameter value: ", dt) # Returning the value of the OUT and INOUT parameters. spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.b = True spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}] spark_proc_param_context.time = datetime.time(23, 20, 50, 520000) spark_proc_param_context.f = 20.23 spark_proc_param_context.bs = b"hello" spark_proc_param_context.date = datetime.date(1985, 4, 12) spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.js = {"name": "Alice", "age": 30} """;
Ler a partir de uma tabela do Hive Metastore e escrever os resultados no BigQuery
O exemplo seguinte mostra como transformar uma tabela do Hive Metastore e escrever os resultados no BigQuery:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \ .enableHiveSupport() \ .getOrCreate() spark.sql("CREATE DATABASE IF NOT EXISTS records") spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)") spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)") df = spark.sql("SELECT * FROM records.student") df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("records_dataset.student") """
Veja os filtros de registos
Depois de chamar um procedimento armazenado para o Spark, pode ver as informações de registo. Para obter as informações do filtro do Cloud Logging e o endpoint do cluster do histórico do Spark, use o comando bq
show
.
As informações do filtro estão disponíveis no campo SparkStatistics
da tarefa secundária. Para obter filtros de registos, siga estes passos:
Aceda à página do BigQuery.
No editor de consultas, liste as tarefas secundárias da tarefa de script do procedimento armazenado:
bq ls -j --parent_job_id=$parent_job_id
Para saber como obter o ID da tarefa, consulte o artigo Veja os detalhes da tarefa.
O resultado é semelhante ao seguinte:
jobId Job Type State Start Time Duration ---------------------------------------------- --------- --------- --------------- ---------------- script_job_90fb26c32329679c139befcc638a7e71_0 query SUCCESS 07 Sep 18:00:27 0:05:15.052000
Identifique o
jobId
do procedimento armazenado e use o comandobq show
para ver os detalhes da tarefa:bq show --format=prettyjson --job $child_job_id
Copie o campo
sparkStatistics
porque vai precisar dele noutro passo.O resultado é semelhante ao seguinte:
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
Para o registo, gere filtros de registo com os campos
SparkStatistics
:resource.type = sparkStatistics.loggingInfo.resourceType resource.labels.resource_container=sparkStatistics.loggingInfo.projectId resource.labels.spark_job_id=sparkStatistics.sparkJobId resource.labels.location=sparkStatistics.sparkJobLocation
Os registos são escritos no recurso monitorizado
bigquery.googleapis.com/SparkJob
. Os registos são etiquetados pelos componentesINFO
,DRIVER
eEXECUTOR
. Para filtrar registos do controlador do Spark, adicione o componentelabels.component = "DRIVER"
aos filtros de registos. Para filtrar registos do executor do Spark, adicione o componentelabels.component = "EXECUTOR"
aos filtros de registos.
Use a chave de encriptação gerida pelo cliente
O procedimento do BigQuery Spark usa a chave de encriptação gerida pelo cliente (CMEK) para proteger o seu conteúdo, juntamente com a encriptação predefinida fornecida pelo BigQuery. Para usar a CMEK no procedimento do Spark, primeiro, acione a criação da conta de serviço de encriptação do BigQuery e conceda as autorizações necessárias. O procedimento do Spark também suporta as políticas da organização CMEK se forem aplicadas ao seu projeto.
Se o procedimento armazenado estiver a usar o modo de segurança INVOKER
, a CMEK deve ser especificada através da variável do sistema SQL quando chamar o procedimento. Caso contrário, a CMEK pode ser especificada através da ligação associada ao procedimento armazenado.
Para especificar a CMEK através da ligação quando cria um procedimento armazenado do Spark, use o seguinte exemplo de código:
bq mk --connection --connection_type='SPARK' \ --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \ --project_id=PROJECT_ID \ --location=LOCATION \ CONNECTION_NAME
Para especificar a CMEK através da variável do sistema SQL quando chamar o procedimento, use o seguinte exemplo de código:
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Use os VPC Service Controls
Os VPC Service Controls permitem-lhe configurar um perímetro seguro para se proteger contra a exfiltração de dados. Para usar os VPC Service Controls com um procedimento do Spark para maior segurança, primeiro crie um perímetro de serviço.
Para proteger totalmente os trabalhos de procedimentos do Spark, adicione as seguintes APIs ao perímetro de serviço:
- API BigQuery (
bigquery.googleapis.com
) - Cloud Logging API (
logging.googleapis.com
) - API Cloud Storage (
storage.googleapis.com
), se usar o Cloud Storage - API Artifact Registry (
artifactregistry.googleapis.com
) ou API Container Registry (containerregistry.googleapis.com
), se usar um contentor personalizado - API Dataproc Metastore (
metastore.googleapis.com
) e API Cloud Run Admin (run.googleapis.com
), se usar o Dataproc Metastore
Adicione o projeto de consulta do procedimento do Spark ao perímetro. Adicione outros projetos que alojam o seu código ou dados do Spark ao perímetro.
Práticas recomendadas
Quando usa uma ligação no seu projeto pela primeira vez, o aprovisionamento demora cerca de mais um minuto. Para poupar tempo, pode reutilizar uma ligação do Spark existente quando cria um procedimento armazenado para o Spark.
Quando cria um procedimento do Spark para utilização em produção, a Google recomenda que especifique uma versão de tempo de execução. Para ver uma lista das versões do tempo de execução suportadas, consulte o artigo Versões do tempo de execução sem servidor para o Apache Spark. Recomendamos que use a versão de apoio técnico a longo prazo (LTS).
Quando especifica um contentor personalizado num procedimento do Spark, recomendamos que use o Artifact Registry e o streaming de imagens.
Para um melhor desempenho, pode especificar propriedades de atribuição de recursos no procedimento do Spark. Os procedimentos armazenados do Spark suportam uma lista de propriedades de atribuição de recursos igual à do Serverless para Apache Spark.
Limitações
- Só pode usar o protocolo de ponto final gRPC para se ligar ao Dataproc Metastore. Outros tipos de Hive Metastore ainda não são suportados.
- As chaves de encriptação geridas pelo cliente (CMEK) só estão disponíveis quando os clientes criam procedimentos Spark de região única. As chaves CMEK da região global e as chaves CMEK multirregionais, por exemplo,
EU
ouUS
, não são suportadas. - A transmissão de parâmetros de saída só é suportada para o PySpark.
- Se o conjunto de dados associado ao procedimento armazenado para o Spark for replicado para uma região de destino através da replicação de conjuntos de dados entre regiões, só é possível consultar o procedimento armazenado na região em que foi criado.
- O Spark não suporta o acesso a pontos finais HTTP na sua rede privada dos VPC Service Controls.
Quotas e limites
Para informações sobre quotas e limites, consulte os procedimentos armazenados para quotas e limites do Spark.
O que se segue?
- Saiba como ver um procedimento armazenado.
- Saiba como eliminar um procedimento armazenado.
- Saiba como trabalhar com um procedimento armazenado de SQL.