Trabalhe com procedimentos armazenados para o Apache Spark

Este documento destina-se a engenheiros, cientistas e analistas de dados para criar e chamar procedimentos armazenados para o Spark no BigQuery.

Com o BigQuery, pode criar e executar procedimentos armazenados do Spark escritos em Python, Java e Scala. Em seguida, pode executar estes procedimentos armazenados no BigQuery através de uma consulta GoogleSQL, semelhante à execução de procedimentos armazenados SQL.

Antes de começar

Para criar um procedimento armazenado para o Spark, peça ao seu administrador para criar uma ligação ao Spark e partilhá-la consigo. O administrador também tem de conceder à conta de serviço associada à ligação as autorizações de gestão de identidade e de acesso (IAM) necessárias.

Funções necessárias

Para receber as autorizações de que precisa para realizar as tarefas neste documento, peça ao seu administrador para lhe conceder as seguintes funções do IAM:

Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Estas funções predefinidas contêm as autorizações necessárias para realizar as tarefas descritas neste documento. Para ver as autorizações exatas que são necessárias, expanda a secção Autorizações necessárias:

Autorizações necessárias

São necessárias as seguintes autorizações para realizar as tarefas neste documento:

  • Crie uma associação:
    • bigquery.connections.create
    • bigquery.connections.list
  • Crie um procedimento armazenado para o Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Chame um procedimento armazenado para o Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

Também pode conseguir estas autorizações com funções personalizadas ou outras funções predefinidas.

Consideração da localização

Tem de criar um procedimento armazenado para o Spark na mesma localização que a sua ligação , uma vez que o procedimento armazenado é executado na mesma localização que a ligação. Por exemplo, para criar um procedimento armazenado na multirregião dos EUA, usa uma ligação localizada na multirregião dos EUA.

Preços

  • As cobranças pela execução de procedimentos do Spark no BigQuery são semelhantes às cobranças pela execução de procedimentos do Spark no Serverless para Apache Spark. Para mais informações, consulte os preços do Serverless para Apache Spark.

  • Os procedimentos armazenados do Spark podem ser usados com o modelo de preços a pedido, bem como com qualquer uma das edições do BigQuery. Os procedimentos do Spark são cobrados através do modelo de pagamento conforme o uso da BigQuery Enterprise Edition em todos os casos, independentemente do modelo de preços de computação usado no seu projeto.

  • Os procedimentos armazenados do Spark para o BigQuery não suportam a utilização de reservas nem compromissos. As reservas e os compromissos existentes continuam a ser usados para outras consultas e procedimentos suportados. As cobranças pela utilização de procedimentos armazenados do Spark são adicionadas à sua fatura ao custo da edição Enterprise de pagamento conforme a utilização. Os descontos da sua organização são aplicados, quando aplicável.

  • Embora os procedimentos armazenados do Spark usem um motor de execução do Spark, não verá cobranças separadas pela execução do Spark. Conforme indicado, os custos correspondentes são comunicados como SKU de pagamento conforme a utilização da edição Enterprise do BigQuery.

  • Os procedimentos armazenados do Spark não oferecem um nível gratuito.

Crie um procedimento armazenado para o Spark

Tem de criar o procedimento armazenado na mesma localização que a ligação que usa.

Se o corpo do procedimento armazenado for superior a 1 MB, recomendamos que coloque o procedimento armazenado num ficheiro num contentor do Cloud Storage em vez de usar código inline. O BigQuery oferece dois métodos para criar um procedimento armazenado para o Spark com Python:

Use o editor de consultas SQL

Para criar um procedimento armazenado para o Spark no editor de consultas SQL, siga estes passos:

  1. Aceda à página do BigQuery.

    Aceda ao BigQuery

  2. No editor de consultas, adicione o exemplo de código para a declaração CREATE PROCEDURE apresentada.

    Em alternativa, no painel Explorador, clique na ligação no projeto que usou para criar o recurso de ligação. Em seguida, para criar um procedimento armazenado para o Spark, clique em Criar procedimento armazenado.

    Python

    Para criar um procedimento armazenado para o Spark em Python, use o seguinte exemplo de código:

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java ou Scala

    Para criar um procedimento armazenado para o Spark em Java ou Scala com a opção main_file_uri, use o seguinte código de exemplo:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    Para criar um procedimento armazenado para o Spark em Java ou Scala com as opções main_class e jar_uris, use o seguinte código de exemplo:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    Substitua o seguinte:

    • PROJECT_ID: o projeto no qual quer criar o procedimento armazenado. Por exemplo, myproject.
    • DATASET: o conjunto de dados no qual quer criar o procedimento armazenado, por exemplo, mydataset.
    • PROCEDURE_NAME: o nome do procedimento armazenado que quer executar no BigQuery. Por exemplo, mysparkprocedure.
    • PROCEDURE_ARGUMENT: um parâmetro para introduzir os argumentos de entrada.

      Neste parâmetro, especifique os seguintes campos:

      • ARGUMENT_MODE: o modo do argumento.

        Os valores válidos incluem IN, OUT e INOUT. Por predefinição, o valor é IN.

      • ARGUMENT_NAME: o nome do argumento.
      • ARGUMENT_TYPE: o tipo do argumento.

      Por exemplo: myproject.mydataset.mysparkproc(num INT64).

      Para mais informações, consulte como transmitir um valor como um parâmetro IN ou os parâmetros OUT e INOUT neste documento.

    • CONNECTION_PROJECT_ID: o projeto que contém a associação para executar o procedimento do Spark.
    • CONNECTION_REGION: a região que contém a ligação para executar o procedimento do Spark, por exemplo, us.
    • CONNECTION_ID: o ID de ligação, por exemplo, myconnection.

      Quando vê os detalhes da associação na consola Google Cloud , o ID da associação é o valor na última secção do ID da associação totalmente qualificado que é apresentado em ID da associação, por exemplo projects/myproject/locations/connection_location/connections/myconnection.

    • RUNTIME_VERSION: a versão de tempo de execução do Spark, por exemplo, 2.2.
    • MAIN_PYTHON_FILE_URI: o caminho para um ficheiro PySpark, por exemplo, gs://mybucket/mypysparkmain.py.

      Em alternativa, se quiser adicionar o corpo do procedimento armazenado na declaração CREATE PROCEDURE, adicione PYSPARK_CODE após LANGUAGE PYTHON AS, conforme mostrado no exemplo em Usar código inline neste documento.

    • PYSPARK_CODE: a definição de uma aplicação PySpark na declaração CREATE PROCEDURE se quiser transmitir o corpo do procedimento inline.

      O valor é um literal de string. Se o código incluir aspas e barras invertidas, estas têm de ser escapadas ou representadas como uma string bruta. Por exemplo, o código de retorno "\n"; pode ser representado como uma das seguintes opções:

      • String entre aspas: "return \"\\n\";". As aspas e as barras invertidas são interpretadas de forma literal.
      • String entre aspas triplas: """return "\\n";""". As barras invertidas são ignoradas, enquanto as aspas não são.
      • String não processada: r"""return "\n";""". Não é necessário usar carateres de escape.
      Para saber como adicionar código PySpark inline, consulte o artigo Usar código inline.
    • MAIN_JAR_URI: o caminho do ficheiro JAR que contém a classe main, por exemplo, gs://mybucket/my_main.jar.
    • CLASS_NAME: o nome totalmente qualificado de uma classe num conjunto JAR com a opção jar_uris, por exemplo, com.example.wordcount.
    • URI: o caminho do ficheiro JAR que contém a classe especificada na classe main, por exemplo, gs://mybucket/mypysparkmain.jar.

    Para ver opções adicionais que pode especificar em OPTIONS, consulte a lista de opções de procedimentos.

Use o editor do PySpark

Quando cria um procedimento com o editor do PySpark, não precisa de usar a declaração CREATE PROCEDURE. Em alternativa, adicione o código Python diretamente no editor do Pyspark e guarde ou execute o código.

Para criar um procedimento armazenado para o Spark no editor do PySpark, siga estes passos:

  1. Aceda à página do BigQuery.

    Aceda ao BigQuery

  2. Se quiser escrever o código PySpark diretamente, abra o editor do PySpark. Para abrir o editor do PySpark, clique no menu junto a Criar consulta SQL e, de seguida, selecione Criar procedimento do PySpark.

  3. Para definir opções, clique em Mais > Opções do PySpark e, de seguida, faça o seguinte:

    1. Especifique a localização onde quer executar o código PySpark.

    2. No campo Ligação, especifique a ligação do Spark.

    3. Na secção Invocações de procedimentos armazenados, especifique o conjunto de dados no qual quer armazenar os procedimentos armazenados temporários que são gerados. Pode definir um conjunto de dados específico ou permitir a utilização de um conjunto de dados temporário para invocar o código PySpark.

      O conjunto de dados temporário é gerado com a localização especificada no passo anterior. Se for especificado um nome do conjunto de dados, certifique-se de que o conjunto de dados e a ligação do Spark estão na mesma localização.

    4. Na secção Parâmetros, defina os parâmetros para o procedimento armazenado. O valor do parâmetro só é usado durante as execuções na sessão do código PySpark, mas a própria declaração é armazenada no procedimento.

    5. Na secção Opções avançadas, especifique as opções do procedimento. Para uma lista detalhada das opções de procedimento, consulte a lista de opções de procedimento.

    1. Na secção Propriedades, adicione os pares de chave-valor para configurar a tarefa. Pode usar qualquer um dos pares de chave-valor das propriedades do Spark suportadas pelo Serverless para Apache Spark.

    1. Nas Definições da conta de serviço, especifique a conta de serviço personalizada, a CMEK, o conjunto de dados de preparação e a pasta de preparação do Cloud Storage a usar durante as execuções na sessão do código PySpark.

    2. Clique em Guardar.

Guarde um procedimento armazenado para o Spark

Depois de criar o procedimento armazenado com o editor do PySpark, pode guardar o procedimento armazenado. Para o fazer, siga estes passos:

  1. Na Google Cloud consola, aceda à página BigQuery.

    Aceda ao BigQuery

  2. No editor de consultas, crie um procedimento armazenado para o Spark usando Python com o editor PySpark.

  3. Clique em Guardar > Guardar procedimento.

  4. Na caixa de diálogo Guardar procedimento armazenado, especifique o nome do conjunto de dados onde quer armazenar o procedimento armazenado e o nome do procedimento armazenado.

  5. Clique em Guardar.

    Se quiser apenas executar o código PySpark em vez de o guardar como um procedimento armazenado, pode clicar em Executar em vez de Guardar.

Use contentores personalizados

O contentor personalizado fornece o ambiente de tempo de execução para os processos do controlador e do executor da carga de trabalho. Para usar contentores personalizados, use o seguinte exemplo de código:

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

Substitua o seguinte:

  • PROJECT_ID: o projeto no qual quer criar o procedimento armazenado, por exemplo, myproject.
  • DATASET: o conjunto de dados no qual quer criar o procedimento armazenado, por exemplo, mydataset.
  • PROCEDURE_NAME: o nome do procedimento armazenado que quer executar no BigQuery, por exemplo, mysparkprocedure.
  • PROCEDURE_ARGUMENT: um parâmetro para introduzir os argumentos de entrada.

    Neste parâmetro, especifique os seguintes campos:

    • ARGUMENT_MODE: o modo do argumento.

      Os valores válidos incluem IN, OUT e INOUT. Por predefinição, o valor é IN.

    • ARGUMENT_NAME: o nome do argumento.
    • ARGUMENT_TYPE: o tipo do argumento.

    Por exemplo: myproject.mydataset.mysparkproc(num INT64).

    Para mais informações, consulte como transmitir um valor como um parâmetro IN ou os parâmetros OUT e INOUT neste documento.

  • CONNECTION_PROJECT_ID: o projeto que contém a associação para executar o procedimento do Spark.
  • CONNECTION_REGION: a região que contém a ligação para executar o procedimento do Spark, por exemplo, us.
  • CONNECTION_ID: o ID da associação, por exemplo, myconnection.

    Quando vê os detalhes da associação na Google Cloud consola, o ID da associação é o valor na última secção do ID da associação totalmente qualificado que é apresentado em ID da associação, por exemplo, projects/myproject/locations/connection_location/connections/myconnection.

  • RUNTIME_VERSION: a versão de tempo de execução do Spark, por exemplo, 2.2.
  • MAIN_PYTHON_FILE_URI: o caminho para um ficheiro PySpark, por exemplo, gs://mybucket/mypysparkmain.py.

    Em alternativa, se quiser adicionar o corpo do procedimento armazenado na declaração CREATE PROCEDURE, adicione PYSPARK_CODE após LANGUAGE PYTHON AS, conforme mostrado no exemplo em Usar código inline neste documento.

  • PYSPARK_CODE: a definição de uma aplicação PySpark na declaração CREATE PROCEDURE se quiser transmitir o corpo do procedimento inline.

    O valor é um literal de string. Se o código incluir aspas e barras invertidas, estas têm de ser escapadas ou representadas como uma string não processada. Por exemplo, o código de retorno "\n"; pode ser representado como uma das seguintes opções:

    • String entre aspas: "return \"\\n\";". As aspas e as barras invertidas são interpretadas de forma literal.
    • String entre aspas triplas: """return "\\n";""". As barras invertidas são ignoradas, enquanto as aspas não são.
    • String não processada: r"""return "\n";""". Não é necessário usar carateres de escape.
    Para saber como adicionar código PySpark inline, consulte o artigo Usar código inline.
  • CONTAINER_IMAGE: caminho da imagem no registo de artefactos. Só pode conter bibliotecas para usar no seu procedimento. Se não for especificado, é usada a imagem do contentor predefinida do sistema associada à versão de tempo de execução.

Para mais informações sobre como criar uma imagem de contentor personalizada com o Spark, consulte o artigo Crie uma imagem de contentor personalizada.

Chame um procedimento armazenado para o Spark

Depois de criar um procedimento armazenado, pode chamá-lo através de uma das seguintes opções:

Consola

  1. Aceda à página do BigQuery.

    Aceda ao BigQuery

  2. No painel Explorador, expanda o projeto e selecione o procedimento armazenado para o Spark que quer executar.

  3. Na janela Informações do procedimento armazenado, clique em Invocar procedimento armazenado. Em alternativa, pode expandir a opção Ver ações e clicar em Invocar.

  4. Clique em Executar.

  5. Na secção Todos os resultados, clique em Ver resultados.

  6. Opcional: na secção Resultados da consulta, siga estes passos:

    • Se quiser ver os registos do controlador do Spark, clique em Detalhes da execução.

    • Se quiser ver os registos no Cloud Logging, clique em Informações da tarefa e, de seguida, no campo Registo, clique em registo.

    • Se quiser obter o ponto final do servidor do histórico do Spark, clique em Informações da tarefa e, de seguida, em Servidor do histórico do Spark.

SQL

Para chamar um procedimento armazenado, use a declaração CALL PROCEDURE:

  1. Na Google Cloud consola, aceda à página BigQuery.

    Aceda ao BigQuery

  2. No editor de consultas, introduza a seguinte declaração:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()

  3. Clique em Executar.

Para mais informações sobre como executar consultas, consulte o artigo Execute uma consulta interativa.

Use uma conta de serviço personalizada

Em vez de usar a identidade de serviço da ligação Spark para aceder aos dados, pode usar uma conta de serviço personalizada para aceder aos dados no seu código Spark.

Para usar uma conta de serviço personalizada, especifique o INVOKERmodo de segurança (usando a declaração EXTERNAL SECURITY INVOKER) quando criar um procedimento armazenado do Spark e especifique a conta de serviço quando invocar o procedimento armazenado.

Quando executa o procedimento armazenado do Spark com a conta de serviço personalizada pela primeira vez, o BigQuery cria um agente de serviço do Spark e concede ao agente de serviço as autorizações necessárias. Certifique-se de que não modifica esta concessão antes de invocar o procedimento armazenado do Spark. Para saber mais detalhes, consulte o artigo Agente do serviço BigQuery Spark.

Se quiser aceder e usar código Spark a partir do Cloud Storage, tem de conceder as autorizações necessárias à identidade do serviço da ligação Spark. Tem de conceder à conta de serviço da ligação a autorização storage.objects.getIAM ou a função storage.objectViewerIAM.

Opcionalmente, pode conceder à conta de serviço da ligação acesso ao Dataproc Metastore e ao servidor de histórico persistente do Dataproc se os tiver especificado na ligação. Para mais informações, consulte o artigo Conceda acesso à conta de serviço.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Opcionalmente, pode adicionar os seguintes argumentos ao código anterior:

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

Substitua o seguinte:

  • CUSTOM_SERVICE_ACCOUNT: obrigatório. Uma conta de serviço personalizada fornecida por si.
  • BUCKET_NAME: opcional. O contentor do Cloud Storage que é usado como o sistema de ficheiros da aplicação Spark predefinido. Se não for fornecido, é criado um contentor do Cloud Storage predefinido no seu projeto, e o contentor é partilhado por todas as tarefas executadas no mesmo projeto.
  • DATASET: opcional. O conjunto de dados para armazenar os dados temporários produzidos ao invocar o procedimento. Os dados são limpos após a conclusão da tarefa. Se não for fornecido, é criado um conjunto de dados temporário predefinido para a tarefa.

A sua conta de serviço personalizada tem de ter as seguintes autorizações:

  • Para ler e escrever no contentor de preparação usado como o sistema de ficheiros da aplicação Spark predefinido:

    • storage.objects.* ou a função de IAM roles/storage.objectAdmin no contentor de preparação que especificar.
    • Além disso, as autorizações storage.buckets.* ou a função de IAM roles/storage.Admin no projeto se o contentor de preparação não for especificado.
  • (Opcional) Para ler e escrever dados do e para o BigQuery:

    • bigquery.tables.* nas tabelas do BigQuery.
    • bigquery.readsessions.* no projeto.
    • A função de IAM roles/bigquery.admin inclui as autorizações anteriores.
  • (Opcional) Para ler e escrever dados de e para o Cloud Storage:

    • storage.objects.* ou a função do IAM roles/storage.objectAdmin nos seus objetos do Cloud Storage.
  • (Opcional) Para ler e escrever no conjunto de dados de preparação usado para parâmetros INOUT/OUT:

    • Função de IAM bigquery.tables.* ou roles/bigquery.dataEditor no conjunto de dados de preparação especificado.
    • Além disso, a autorização bigquery.datasets.create ou a função de IAM roles/bigquery.dataEditor no projeto se o conjunto de dados de preparação não for especificado.

Exemplos de procedimentos armazenados para o Spark

Esta secção mostra exemplos de como pode criar um procedimento armazenado para o Apache Spark.

Use um ficheiro PySpark ou JAR no Cloud Storage

O exemplo seguinte mostra como criar um procedimento armazenado para o Spark usando a ligação my-project-id.us.my-connection e um ficheiro PySpark ou JAR armazenado num contentor do Cloud Storage:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java ou Scala

Use main_file_uri para criar um procedimento armazenado:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Use main_class para criar um procedimento armazenado:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Use código inline

O exemplo seguinte mostra como criar um procedimento armazenado para o Spark através da ligação my-project-id.us.my-connection e do código PySpark inline:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Transmita um valor como um parâmetro de entrada

Os exemplos seguintes apresentam os dois métodos para transmitir um valor como um parâmetro de entrada em Python:

Método 1: use variáveis de ambiente

No código PySpark, pode obter os parâmetros de entrada do procedimento armazenado para o Spark através de variáveis de ambiente no controlador e nos executores do Spark. O nome da variável de ambiente tem o formato de BIGQUERY_PROC_PARAM.PARAMETER_NAME, em que PARAMETER_NAME é o nome do parâmetro de entrada. Por exemplo, se o nome do parâmetro de entrada for var, o nome da variável de ambiente correspondente é BIGQUERY_PROC_PARAM.var. Os parâmetros de entrada são codificados em JSON. No seu código PySpark, pode obter o valor do parâmetro de entrada numa string JSON da variável de ambiente e descodificá-lo para uma variável Python.

O exemplo seguinte mostra como obter o valor de um parâmetro de entrada do tipo INT64 no seu código PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

Método 2: use uma biblioteca integrada

No código PySpark, pode simplesmente importar uma biblioteca integrada e usá-la para preencher todos os tipos de parâmetros. Para transmitir os parâmetros aos executores, preencha os parâmetros num controlador do Spark como variáveis Python e transmita os valores aos executores. A biblioteca incorporada suporta a maioria dos tipos de dados do BigQuery, exceto INTERVAL, GEOGRAPHY, NUMERIC e BIGNUMERIC.

Tipo de dados do BigQuery Tipo de dados Python
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC Não suportado
BIGNUMERIC Não suportado
INTERVAL Não suportado
GEOGRAPHY Não suportado

O exemplo seguinte mostra como importar a biblioteca integrada e usá-la para preencher um parâmetro de entrada do tipo INT64 e um parâmetro de entrada do tipo ARRAY<STRUCT<a INT64, b STRING>> no seu código PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

No código Java ou Scala, pode obter os parâmetros de entrada do procedimento armazenado para o Spark através de variáveis de ambiente no controlador e nos executores do Spark. O nome da variável de ambiente tem o formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, em que PARAMETER_NAME é o nome do parâmetro de entrada. Por exemplo, se o nome do parâmetro de entrada for var, o nome da variável de ambiente correspondente é BIGQUERY_PROC_PARAM.var. No seu código Java ou Scala, pode obter o valor do parâmetro de entrada da variável de ambiente.

O exemplo seguinte mostra como obter o valor de um parâmetro de entrada das variáveis de ambiente no seu código Scala:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

O exemplo seguinte mostra como obter parâmetros de entrada de variáveis de ambiente no seu código Java:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Transmita valores como parâmetros OUT e INOUT

Os parâmetros de saída devolvem o valor do procedimento do Spark, enquanto o parâmetro INOUT aceita um valor para o procedimento e devolve um valor do procedimento. Para usar os parâmetros OUT e INOUT, adicione a palavra-chave OUT ou INOUT antes do nome do parâmetro quando criar o procedimento do Spark. No código PySpark, usa a biblioteca incorporada para devolver um valor como um parâmetro OUT ou INOUT. Tal como os parâmetros de entrada, a biblioteca incorporada suporta a maioria dos tipos de dados do BigQuery, exceto INTERVAL, GEOGRAPHY, NUMERIC e BIGNUMERIC. Os valores do tipo TIME e DATETIME são convertidos para o fuso horário UTC quando são devolvidos como os parâmetros OUT ou INOUT.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Ler a partir de uma tabela do Hive Metastore e escrever os resultados no BigQuery

O exemplo seguinte mostra como transformar uma tabela do Hive Metastore e escrever os resultados no BigQuery:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

Veja os filtros de registos

Depois de chamar um procedimento armazenado para o Spark, pode ver as informações de registo. Para obter as informações do filtro do Cloud Logging e o endpoint do cluster do histórico do Spark, use o comando bq show. As informações do filtro estão disponíveis no campo SparkStatistics da tarefa secundária. Para obter filtros de registos, siga estes passos:

  1. Aceda à página do BigQuery.

    Aceda ao BigQuery

  2. No editor de consultas, liste as tarefas secundárias da tarefa de script do procedimento armazenado:

    bq ls -j --parent_job_id=$parent_job_id

    Para saber como obter o ID da tarefa, consulte o artigo Veja os detalhes da tarefa.

    O resultado é semelhante ao seguinte:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
  3. Identifique o jobId do procedimento armazenado e use o comando bq show para ver os detalhes da tarefa:

    bq show --format=prettyjson --job $child_job_id

    Copie o campo sparkStatistics porque vai precisar dele noutro passo.

    O resultado é semelhante ao seguinte:

    {
    "configuration": {...}"statistics": {
       "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
          }
    }
    }

  4. Para o registo, gere filtros de registo com os campos SparkStatistics:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation

    Os registos são escritos no recurso monitorizado bigquery.googleapis.com/SparkJob. Os registos são etiquetados pelos componentes INFO, DRIVER e EXECUTOR. Para filtrar registos do controlador do Spark, adicione o componente labels.component = "DRIVER" aos filtros de registos. Para filtrar registos do executor do Spark, adicione o componente labels.component = "EXECUTOR" aos filtros de registos.

Use a chave de encriptação gerida pelo cliente

O procedimento do BigQuery Spark usa a chave de encriptação gerida pelo cliente (CMEK) para proteger o seu conteúdo, juntamente com a encriptação predefinida fornecida pelo BigQuery. Para usar a CMEK no procedimento do Spark, primeiro, acione a criação da conta de serviço de encriptação do BigQuery e conceda as autorizações necessárias. O procedimento do Spark também suporta as políticas da organização CMEK se forem aplicadas ao seu projeto.

Se o procedimento armazenado estiver a usar o modo de segurança INVOKER, a CMEK deve ser especificada através da variável do sistema SQL quando chamar o procedimento. Caso contrário, a CMEK pode ser especificada através da ligação associada ao procedimento armazenado.

Para especificar a CMEK através da ligação quando cria um procedimento armazenado do Spark, use o seguinte exemplo de código:

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

Para especificar a CMEK através da variável do sistema SQL quando chamar o procedimento, use o seguinte exemplo de código:

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Use os VPC Service Controls

Os VPC Service Controls permitem-lhe configurar um perímetro seguro para se proteger contra a exfiltração de dados. Para usar os VPC Service Controls com um procedimento do Spark para maior segurança, primeiro crie um perímetro de serviço.

Para proteger totalmente os trabalhos de procedimentos do Spark, adicione as seguintes APIs ao perímetro de serviço:

  • API BigQuery (bigquery.googleapis.com)
  • Cloud Logging API (logging.googleapis.com)
  • API Cloud Storage (storage.googleapis.com), se usar o Cloud Storage
  • API Artifact Registry (artifactregistry.googleapis.com) ou API Container Registry (containerregistry.googleapis.com), se usar um contentor personalizado
  • API Dataproc Metastore (metastore.googleapis.com) e API Cloud Run Admin (run.googleapis.com), se usar o Dataproc Metastore

Adicione o projeto de consulta do procedimento do Spark ao perímetro. Adicione outros projetos que alojam o seu código ou dados do Spark ao perímetro.

Práticas recomendadas

  • Quando usa uma ligação no seu projeto pela primeira vez, o aprovisionamento demora cerca de mais um minuto. Para poupar tempo, pode reutilizar uma ligação do Spark existente quando cria um procedimento armazenado para o Spark.

  • Quando cria um procedimento do Spark para utilização em produção, a Google recomenda que especifique uma versão de tempo de execução. Para ver uma lista das versões do tempo de execução suportadas, consulte o artigo Versões do tempo de execução sem servidor para o Apache Spark. Recomendamos que use a versão de apoio técnico a longo prazo (LTS).

  • Quando especifica um contentor personalizado num procedimento do Spark, recomendamos que use o Artifact Registry e o streaming de imagens.

  • Para um melhor desempenho, pode especificar propriedades de atribuição de recursos no procedimento do Spark. Os procedimentos armazenados do Spark suportam uma lista de propriedades de atribuição de recursos igual à do Serverless para Apache Spark.

Limitações

  • Só pode usar o protocolo de ponto final gRPC para se ligar ao Dataproc Metastore. Outros tipos de Hive Metastore ainda não são suportados.
  • As chaves de encriptação geridas pelo cliente (CMEK) só estão disponíveis quando os clientes criam procedimentos Spark de região única. As chaves CMEK da região global e as chaves CMEK multirregionais, por exemplo, EU ou US, não são suportadas.
  • A transmissão de parâmetros de saída só é suportada para o PySpark.
  • Se o conjunto de dados associado ao procedimento armazenado para o Spark for replicado para uma região de destino através da replicação de conjuntos de dados entre regiões, só é possível consultar o procedimento armazenado na região em que foi criado.
  • O Spark não suporta o acesso a pontos finais HTTP na sua rede privada dos VPC Service Controls.

Quotas e limites

Para informações sobre quotas e limites, consulte os procedimentos armazenados para quotas e limites do Spark.

O que se segue?