Trabalhe com procedimentos armazenados no Apache Spark

Este documento é destinado a engenheiros, cientistas e analistas de dados para criar e chamar procedimentos armazenados do Spark no BigQuery.

Usando o BigQuery, é possível criar e executar procedimentos armazenados pelo Spark programados em Python, Java e Scala. Em seguida, execute esses procedimentos armazenados no BigQuery usando uma consulta do GoogleSQL, semelhante à execução de procedimentos armazenados SQL.

Antes de começar

Para criar um procedimento armazenado para o Spark, peça ao administrador para criar uma conexão do Spark e compartilhá-la com você. Seu administrador também precisa conceder à conta de serviço associada à conexão as permissões necessárias de gerenciamento de identidade e acesso (IAM, na sigla em inglês).

Funções exigidas

Para receber as permissões necessárias para realizar as tarefas neste documento, peça ao administrador para conceder a você os seguintes papéis do IAM:

Para mais informações sobre como conceder papéis, consulte Gerenciar acesso.

Esses papéis predefinidos contêm as permissões necessárias para executar as tarefas neste documento. Para conferir as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As permissões a seguir são necessárias para executar as tarefas neste documento:

  • Crie uma conexão:
    • bigquery.connections.create
    • bigquery.connections.list
  • Crie um procedimento armazenado para o Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Chamar um procedimento armazenado para o Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

Essas permissões também podem ser concedidas com papéis personalizados ou outros papéis predefinidos.

Consideração de local

É preciso criar um procedimento armazenado para o Spark no mesmo local da sua conexão, porque o procedimento armazenado é executado no mesmo local que a conexão. Por exemplo, para criar um procedimento armazenado na multirregião dos EUA, use uma conexão localizada na multirregião dos EUA.

Preços

  • As cobranças pela execução de procedimentos do Spark no BigQuery são semelhantes às cobranças pela execução de procedimentos do Spark no Dataproc sem servidor. Para mais informações, consulte Preços do Dataproc sem servidor.

  • Os procedimentos armazenados do Spark podem ser usados com o modelo de preços sob demanda e com qualquer uma das edições do BigQuery. Os procedimentos do Spark são cobrados usando o modelo de pagamento por uso da edição BigQuery Enterprise em todos os casos, independentemente do modelo de preços de computação usados em seu projeto.

  • Os procedimentos armazenados do Spark para o BigQuery não são compatíveis com o uso de reservas ou compromissos. As reservas e os compromissos continuam sendo usados para outros procedimentos e consultas com suporte. As cobranças pelo uso dos procedimentos armazenados do Spark são adicionadas à fatura na edição Enterprise, que é o custo de pagamento por uso. Os descontos da sua organização, quando for o caso, serão aplicados.

  • Os procedimentos armazenados do Spark usam um mecanismo de execução do Spark, mas não haverá cobranças separadas para essa execução. Conforme observado, as cobranças correspondentes são informadas como SKU de pagamento por uso da edição Enterprise do BigQuery.

  • Os procedimentos armazenados do Spark não oferecem um nível gratuito.

Criar um procedimento armazenado para o Spark

Crie o procedimento armazenado no mesmo local que a conexão usada.

Se o corpo do procedimento armazenado tiver mais de 1 MB, recomendamos que você coloque seu procedimento armazenado em um arquivo em um bucket do Cloud Storage em vez de usar código in-line. O BigQuery fornece dois métodos para criar um procedimento armazenado para o Spark usando o Python:

Usar o Editor de consultas SQL

Para criar um procedimento armazenado para o Spark no editor de consultas SQL, siga estas etapas:

  1. Acessar a página do BigQuery.

    Acessar o BigQuery

  2. No Editor de consultas, adicione o exemplo de código para a instrução CREATE PROCEDURE que aparece.

    Como alternativa, no painel Explorer, clique na conexão do projeto que você usou para criar o recurso de conexão. Para criar um procedimento armazenado para o Spark, clique em Criar procedimento armazenado.

    Python

    Para criar procedimentos armazenados para o Spark em Python, use o exemplo de código a seguir:

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java ou Scala

    Para criar um procedimento armazenado para Spark em Java ou Scala com a opção main_file_uri, use o seguinte exemplo de código:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    Para criar um procedimento armazenado para Spark em Java ou Scala com as opções main_class e jar_uris, use o seguinte exemplo de código:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    Substitua:

    • PROJECT_ID: o projeto em que você quer criar o procedimento armazenado (por exemplo, myproject).
    • DATASET: o conjunto de dados em que você quer criar o procedimento armazenado, por exemplo, mydataset.
    • PROCEDURE_NAME: o nome do procedimento armazenado que você quer executar no BigQuery. Por exemplo, mysparkprocedure
    • PROCEDURE_ARGUMENT: um parâmetro para inserir os argumentos de entrada.

      Nesse parâmetro, especifique os seguintes campos:

      • ARGUMENT_MODE: o modo do argumento.

        Os valores válidos incluem IN, OUT e INOUT. Por padrão, o valor é IN.

      • ARGUMENT_NAME: o nome do argumento.
      • ARGUMENT_TYPE: o tipo do argumento.

      Por exemplo, myproject.mydataset.mysparkproc(num INT64).

      Para mais informações, consulte como transmitir um valor como um parâmetro IN ou os parâmetros OUT e INOUT neste documento de dois minutos.

    • CONNECTION_PROJECT_ID: o projeto que contém a conexão para executar o procedimento do Spark
    • CONNECTION_REGION: a região que contém a conexão para executar o procedimento do Spark, por exemplo, us
    • CONNECTION_ID: o ID da conexão. Por exemplo, myconnection.

      Quando você visualiza os detalhes da conexão no console do Google Cloud, esse é o valor na última seção do ID da conexão totalmente qualificado, mostrado em ID da conexão, por exemplo, projects/myproject/locations/connection_location/connections/myconnection.

    • RUNTIME_VERSION: a versão do ambiente de execução do Spark, por exemplo, 1.1.
    • MAIN_PYTHON_FILE_URI: o caminho para um arquivo PySpark, por exemplo, gs://mybucket/mypysparkmain.py.

      Como alternativa, se você quiser adicionar o corpo do procedimento armazenado na instrução CREATE PROCEDURE, adicione o PYSPARK_CODE após LANGUAGE PYTHON AS, conforme mostrado no exemplo em Use o código in-line neste documento.

    • PYSPARK_CODE: a definição de um aplicativo PySpark na instrução CREATE PROCEDURE se você quiser transmitir o corpo do procedimento in-line.

      O valor é um literal de string. Se o código incluir aspas e barras invertidas, elas precisarão de escape ou representados como uma string bruta. Por exemplo, o código de retorno "\n"; pode ser representado como um dos seguintes:

      • String entre aspas: "return \"\\n\";". Aspas e barras invertidas são escapadas.
      • String entre três aspas: """return "\\n";""". As barras invertidas são escapadas, enquanto as aspas não.
      • String bruta: r"""return "\n";""". Não é necessário escape.
      Para saber como adicionar código in-line do PySpark, consulte Usar código in-line.
    • MAIN_JAR_URI: o caminho do arquivo JAR que contém a classe main, por exemplo, gs://mybucket/my_main.jar.
    • CLASS_NAME: o nome totalmente qualificado de uma classe em um conjunto JAR com a opção jar_uris. Por exemplo, com.example.wordcount.
    • URI: o caminho do arquivo JAR que contém a classe especificada na classe main, por exemplo, gs://mybucket/mypysparkmain.jar.

    Para conferir outras opções que você pode especificar em OPTIONS, consulte a lista de opções de procedimentos.

Usar o editor do PySpark

Ao criar um procedimento usando o editor do PySpark, não é necessário usar a instrução CREATE PROCEDURE. Em vez disso, adicione o código Python diretamente no editor Pyspark e salve ou execute o código.

Para criar um procedimento armazenado para o Spark no editor do PySpark, siga estas etapas:

  1. Acessar a página do BigQuery.

    Acessar o BigQuery

  2. Se você quiser digitar o código do PySpark diretamente, abra o editor do PySpark. Para abrir o editor do PySpark, clique no menu ao lado de Criar consulta SQL e selecione Criar PySpark Procedimento.

  3. Para definir opções, clique em Mais > Opções do PySpark e faça o seguinte:

    1. Especifique o local em que você quer executar o código do PySpark.

    2. No campo Conexão, especifique a conexão Spark.

    3. Na seção Invocação do procedimento armazenado, especifique o conjunto de dados em que você quer armazenar os procedimentos armazenados temporários que foram gerados. É possível definir um conjunto de dados específico ou permitir o uso de um conjunto de dados temporário para invocar o código do PySpark.

      O conjunto de dados temporário é gerado com o local especificado na etapa anterior. Se um nome de conjunto de dados for especificado, verifique se o conjunto de dados e a conexão Spark precisam estar no mesmo local.

    4. Na seção Parâmetros, defina parâmetros para o procedimento armazenado. O valor do parâmetro é usado apenas durante execuções em sessão do código PySpark, mas a declaração em si é armazenada no procedimento.

    5. Na seção Opções avançadas, especifique as opções de procedimento. Para uma lista detalhada das opções de procedimento, consulte a lista de opções de procedimento.

    6. Na seção Propriedades, adicione os pares de chave-valor para configurar o job. Use qualquer um dos pares de chave-valor das propriedades Spark sem servidor do Dataproc.

    7. Em Configurações da conta de serviço, especifique a conta de serviço personalizada, a CMEK, o conjunto de dados de teste e a pasta de preparo do Cloud Storage que serão usadas durante as execuções da sessão do código do PySpark.

    8. Clique em Salvar.

Salvar um procedimento armazenado para o Spark

Depois de criar o procedimento armazenado usando o editor do PySpark, será possível salvar o procedimento armazenado. Para fazer isso, siga estas etapas:

  1. No Console do Google Cloud, acesse a página BigQuery.

    Acessar o BigQuery

  2. No editor de consultas, crie um procedimento armazenado para o Spark usando o Python com o editor do PySpark.

  3. Clique em Salvar > Salvar procedimento.

  4. Na caixa de diálogo Salvar procedimento armazenado, especifique o nome do conjunto de dados em que você quer armazenar o procedimento armazenado e o nome do procedimento armazenado.

  5. Clique em Salvar.

    Se você quiser apenas executar o código PySpark em vez de salvá-lo como um procedimento armazenado, clique em Executar em vez de Salvar.

Usar contêineres personalizados

O contêiner personalizado fornece o ambiente de execução para os processos de driver e executor da carga de trabalho. Para usar contêineres personalizados, use o seguinte exemplo de código:

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

Substitua:

  • PROJECT_ID: o projeto em que você quer criar o procedimento armazenado (por exemplo, myproject).
  • DATASET: o conjunto de dados em que você quer criar o procedimento armazenado, por exemplo, mydataset.
  • PROCEDURE_NAME: o nome do procedimento armazenado que você quer executar no BigQuery. Por exemplo, mysparkprocedure
  • PROCEDURE_ARGUMENT: um parâmetro para inserir os argumentos de entrada.

    Nesse parâmetro, especifique os seguintes campos:

    • ARGUMENT_MODE: o modo do argumento.

      Os valores válidos incluem IN, OUT e INOUT. Por padrão, o valor é IN.

    • ARGUMENT_NAME: o nome do argumento.
    • ARGUMENT_TYPE: o tipo do argumento.

    Por exemplo, myproject.mydataset.mysparkproc(num INT64).

    Para mais informações, consulte como transmitir um valor como um parâmetro IN ou os parâmetros OUT e INOUT neste documento.

  • CONNECTION_PROJECT_ID: o projeto que contém a conexão para executar o procedimento do Spark
  • CONNECTION_REGION: a região que contém a conexão para executar o procedimento do Spark, por exemplo, us
  • CONNECTION_ID: o ID da conexão. Por exemplo, myconnection.

    Ao ver os detalhes da conexão no console do Google Cloud, o ID da conexão é o valor na última seção do ID da conexão totalmente qualificado, mostrado em ID da conexão, por exemplo, projects/myproject/locations/connection_location/connections/myconnection.

  • RUNTIME_VERSION: a versão do ambiente de execução do Spark, por exemplo, 1.1.
  • MAIN_PYTHON_FILE_URI: o caminho para um arquivo PySpark, por exemplo, gs://mybucket/mypysparkmain.py.

    Como alternativa, se você quiser adicionar o corpo do procedimento armazenado na instrução CREATE PROCEDURE, adicione o PYSPARK_CODE após LANGUAGE PYTHON AS, conforme mostrado no exemplo em Use o código in-line neste documento.

  • PYSPARK_CODE: a definição de um aplicativo PySpark na instrução CREATE PROCEDURE se você quiser transmitir o corpo do procedimento in-line.

    O valor é um literal de string. Se o código incluir aspas e barras invertidas, elas precisarão de escape ou ser representadas como uma string bruta. Por exemplo, o código de retorno "\n"; pode ser representado como um dos seguintes:

    • String entre aspas: "return \"\\n\";". Aspas e barras invertidas são escapadas.
    • String entre três aspas: """return "\\n";""". As barras invertidas são escapadas, enquanto as aspas não.
    • String bruta: r"""return "\n";""". Não é necessário escape.
    Para saber como adicionar código in-line do PySpark, consulte Usar código in-line.
  • CONTAINER_IMAGE: caminho da imagem no Artifacts Registry. Ele precisa conter apenas bibliotecas a serem usadas no procedimento. Se não for especificada, a imagem do contêiner padrão do sistema associada à versão do ambiente de execução será usada.

Para mais informações sobre como criar uma imagem de contêiner personalizada com o Spark, consulte Criar uma imagem de contêiner personalizada.

Chamar um procedimento armazenado para o Spark

Depois de criar um procedimento armazenado, você pode chamá-lo usando uma das seguintes opções:

Console

  1. Acessar a página do BigQuery.

    Acessar o BigQuery

  2. No painel Explorer, expanda seu projeto e selecione o procedimento armazenado para o Spark que você quer executar.

  3. Na janela Informações do procedimento armazenado, clique em Invocar o procedimento armazenado. Também é possível expandir a opção Exibir ações e clicar em Invocar.

  4. Clique em Executar.

  5. Na seção Todos os resultados, clique em Exibir resultados.

  6. Opcional: na seção Resultados da consulta, siga estas etapas:

    • Se você quiser conferir os registros do driver do Spark, clique em Detalhes da execução.

    • Se você quiser consultar registros no Cloud Logging, clique em Informações do job e, no campo Registro, clique em log

    • Se você quiser acessar o endpoint do servidor de histórico do Spark, clique em Informações do job e, em seguida, em Servidor de histórico do Spark.

SQL

Para chamar um procedimento armazenado, use a instrução CALL PROCEDURE:

  1. No Console do Google Cloud, acesse a página BigQuery.

    Ir para o BigQuery

  2. No editor de consultas, digite a seguinte instrução:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
    

  3. Clique em Executar.

Para mais informações sobre como executar consultas, acesse Executar uma consulta interativa.

Usar uma conta de serviço personalizada

Em vez de usar a identidade de serviço da conexão Spark para acessar dados, é possível usar uma conta de serviço personalizada para acessar dados no seu código Spark.

Para usar uma conta de serviço personalizada, especifique o modo de segurança INVOKER (usando a instrução EXTERNAL SECURITY INVOKER) ao criar um procedimento armazenado do Spark e especifique a conta de serviço ao invocar o procedimento armazenado.

Se quiser acessar e usar o código Spark do Cloud Storage, você precisará conceder as permissões necessárias para a identificação de serviço da conexão Spark. Você precisa conceder à conta de serviço da conexão a permissão storage.objects.get do IAM ou o papel storage.objectViewer do IAM.

Opcionalmente, conceda à conta de serviço da conexão acesso ao Metastore do Dataproc e ao Servidor de histórico permanente do Dataproc, se você os tiver especificado na conexão. Para mais informações, consulte Conceder acesso à conta de serviço.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Opcionalmente, você pode adicionar os seguintes argumentos ao código anterior:

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

Substitua:

  • CUSTOM_SERVICE_ACCOUNT: obrigatório. Uma conta de serviço personalizada fornecida por você.
  • BUCKET_NAME: opcional. O bucket do Cloud Storage usado como o sistema de arquivos padrão do aplicativo Spark. Se isso não for fornecido, um bucket padrão do Cloud Storage será criado no seu projeto e compartilhado por todos os jobs em execução no mesmo projeto.
  • DATASET: opcional. O conjunto de dados para armazenar os dados temporários produzidos pela invocação do procedimento. Os dados são limpos após a conclusão do job. Se esse valor não for fornecido, um conjunto de dados temporário padrão será criado para o job.

Sua conta de serviço personalizada precisa ter as seguintes permissões:

  • Para ler e gravar no bucket de preparo usado como sistema de arquivos padrão do aplicativo Spark:

    • storage.objects.* ou o papel do IAM roles/storage.objectAdmin no bucket de preparo que você especificar.
    • Além disso, as permissões storage.buckets.* ou o papel do IAM roles/storage.Admin no projeto se o bucket de preparo não estiver especificado.
  • (Opcional) Para ler e gravar dados no BigQuery:

    • bigquery.tables.* nas tabelas do BigQuery.
    • bigquery.readsessions.* no seu projeto.
    • O papel roles/bigquery.admin do IAM inclui as permissões anteriores.
  • (Opcional) Para ler e gravar dados no Cloud Storage e no Cloud Storage:

    • storage.objects.* ou o papel do IAM roles/storage.objectAdmin nos objetos do Cloud Storage.
  • (Opcional) Para ler e gravar no conjunto de dados de preparo usado para os parâmetros INOUT/OUT:

    • bigquery.tables.* ou roles/bigquery.dataEditor no conjunto de dados de preparo especificado.
    • Além disso, a permissão bigquery.datasets.create ou o papel do IAM roles/bigquery.dataEditor no projeto se o conjunto de dados de teste não for especificado.

Exemplos de procedimentos armazenados para o Spark

Nesta seção, mostramos exemplos de como criar um procedimento armazenado para o Apache Spark.

Usar um arquivo PySpark ou JAR no Cloud Storage

No exemplo a seguir, mostramos como criar um procedimento armazenado para o Spark usando a conexão my-project-id.us.my-connection e um arquivo PySpark ou JAR armazenado em um bucket do Cloud Storage:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java ou Scala

Use main_file_uri para criar um procedimento armazenado:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Use main_class para criar um procedimento armazenado:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Usar código in-line

O exemplo a seguir mostra como criar um procedimento armazenado para o Spark usando a conexão my-project-id.us.my-connection e o código in-line PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Transmitir um valor como parâmetro de entrada

Os exemplos a seguir mostram os dois métodos para transmitir um valor como parâmetro de entrada em Python:

Método 1: usar variáveis de ambiente

No código do PySpark, é possível conseguir os parâmetros de entrada do procedimento armazenado para o Spark por meio de variáveis de ambiente no driver e nos executores do Spark. O nome da variável de ambiente tem o formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, em que PARAMETER_NAME é o nome do parâmetro de entrada. Por exemplo, se o nome do parâmetro de entrada for var, o nome da variável de ambiente correspondente é BIGQUERY_PROC_PARAM.var. Os parâmetros de entrada são codificados em JSON. No código do PySpark, é possível receber o valor do parâmetro de entrada em uma string JSON da variável de ambiente e decodificá-lo para uma variável Python.

O exemplo a seguir mostra como receber o valor de um parâmetro de entrada do tipo INT64 no código do PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

Método 2: usar uma biblioteca integrada

No código do PySpark, basta importar uma biblioteca integrada e usá-la para preencher todos os tipos de parâmetros. Para transmitir os parâmetros para os executores, preencha os parâmetros em um driver do Spark como variáveis do Python e transmita os valores para os executores. A biblioteca integrada é compatível com a maioria dos tipos de dados do BigQuery, exceto INTERVAL, GEOGRAPHY, NUMERIC e BIGNUMERIC.

Tipo de dados do BigQuery Tipo de dados Python
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC Sem suporte
BIGNUMERIC Não compatível
INTERVAL Não compatível
GEOGRAPHY Sem suporte

O exemplo a seguir mostra como importar a biblioteca integrada e usá-la para preencher um parâmetro de entrada do tipo INT64 e um parâmetro de entrada do tipo ARRAY<STRUCT<a INT64, b STRING>> no código do PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

No código Java ou Scala, é possível receber os parâmetros de entrada do procedimento armazenado para o Spark por meio de variáveis de ambiente no driver e nos executores do Spark. O nome da variável de ambiente tem o formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, em que PARAMETER_NAME é o nome do parâmetro de entrada. Por exemplo, se o nome do parâmetro de entrada for var, o nome da variável de ambiente correspondente é BIGQUERY_PROC_PARAM.var. No código Java ou Scala, é possível receber o valor do parâmetro de entrada da variável de ambiente.

O exemplo a seguir mostra como receber o valor de um parâmetro de entrada de variáveis de ambiente para o código Scala:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

O exemplo a seguir mostra como receber parâmetros de entrada de variáveis de ambiente no código Java:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Transmitir valores como parâmetros OUT e INOUT

Os parâmetros de saída retornam o valor do procedimento do Spark, enquanto o parâmetro INOUT aceita um valor do procedimento e retorna um valor do procedimento. Para usar os parâmetros OUT e INOUT, adicione a palavra-chave OUT ou INOUT antes do nome do parâmetro ao criar o procedimento do Spark. No código do PySpark, use a biblioteca integrada para retornar um valor como um parâmetro OUT ou INOUT. Assim como os parâmetros de entrada, a biblioteca integrada é compatível com a maioria dos tipos de dados do BigQuery, exceto INTERVAL, GEOGRAPHY, NUMERIC e BIGNUMERIC. Os valores dos tipos TIME e DATETIME são convertidos para o fuso horário UTC ao retornar como os parâmetros OUT ou INOUT.


CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Ler de uma tabela do Hive Metastore e gravar resultados no BigQuery

No exemplo a seguir, mostramos como transformar uma tabela do Hive Metastore e gravar os resultados no BigQuery:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

Ver filtros de registro

Depois de chamar um procedimento armazenado para o Spark, você poderá exibir as informações do registro. Para acessar as informações de filtro do Cloud Logging e o endpoint do cluster de histórico do Spark, use o comando bq show. As informações do filtro estão disponíveis no campo SparkStatistics do job filho. Para receber filtros de registro, siga estas etapas:

  1. Acessar a página do BigQuery.

    Ir para o BigQuery

  2. No editor de consultas, liste os jobs filhos do job de script do procedimento armazenado:

    bq ls -j --parent_job_id=$parent_job_id
    

    Para saber como conseguir o ID do job, consulte Exibir detalhes do job.

    O resultado será assim:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
    
  3. Identifique o jobId do procedimento armazenado e use o comando bq show para visualizar os detalhes do job:

    bq show --format=prettyjson --job $child_job_id
    

    Copie o campo sparkStatistics, porque você precisará dele em outra etapa.

    O resultado será assim:

    {
    "configuration": {...}
    …
    "statistics": {
     …
      "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
        …
      }
    }
    }
    

  4. Para o Logging, gere filtros de registro com os campos SparkStatistics:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation
    
    

    Os registros são gravados no recurso monitorado bigquery.googleapis.com/SparkJob. Os registros são rotulados pelos componentes INFO, DRIVER e EXECUTOR. Para filtrar registros do driver do Spark, adicione o componente labels.component = "DRIVER" aos filtros de registro. Para filtrar registros do executor do Spark, adicione o componente labels.component = "EXECUTOR" aos filtros de registro.

Usar a chave de criptografia gerenciada pelo cliente

O procedimento do BigQuery para Spark usa a chave de criptografia gerenciada pelo cliente (CMEK) para proteger seu conteúdo, com a criptografia padrão fornecida pelo BigQuery. Para usar a CMEK no procedimento do Spark, primeiro acione a criação da conta de serviço de criptografia do BigQuery e conceda as permissões necessárias. O procedimento do Spark também oferece suporte às políticas da organização de CMEK, se elas forem aplicadas ao seu projeto.

Se o procedimento armazenado estiver usando o modo de segurança INVOKER, a CMEK precisará ser especificada pela variável de sistema SQL ao chamar o procedimento. Caso contrário, a CMEK poderá ser especificada pela conexão associada ao procedimento armazenado.

Para especificar a CMEK por meio da conexão ao criar um procedimento armazenado do Spark, use o seguinte exemplo de código:

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

Para especificar a CMEK por meio da variável de sistema SQL ao chamar o procedimento, use este exemplo de código:

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Usar o VPC Service Controls

O VPC Service Controls permite configurar um perímetro seguro para evitar a exfiltração de dados. Para usar o VPC Service Controls com um procedimento do Spark para aumentar a segurança, primeiro crie um perímetro de serviço.

Para proteger totalmente os jobs do procedimento Spark, adicione as seguintes APIs ao perímetro de serviço:

  • API BigQuery (bigquery.googleapis.com)
  • API Cloud Logging (logging.googleapis.com)
  • API Cloud Storage (storage.googleapis.com), se você usa o Cloud Storage
  • API Artifact Registry (artifactregistry.googleapis.com) ou API Container Registry (containerregistry.googleapis.com), se você usar um contêiner personalizado
  • API Dataproc Metastore (metastore.googleapis.com) e API Cloud Run Admin (run.googleapis.com), se você usar o metastore do Dataproc

Adicione o projeto de consulta do procedimento do Spark ao perímetro. Adicione outros projetos que hospedam seus dados ou código Spark no perímetro.

Práticas recomendadas

  • Quando você usa a conexão no projeto pela primeira vez, o provisionamento leva cerca de um minuto a mais. Para economizar tempo, é possível reutilizar uma conexão Spark ao criar um procedimento armazenado para o Spark.

  • Ao criar um procedimento do Spark para uso em produção, o Google recomenda especificar uma versão de ambiente de execução. Para ver uma lista de versões de ambiente de execução compatíveis, consulte Versões de ambiente de execução sem servidor do Dataproc. Recomendamos usar a versão com suporte de longo tempo (LTS).

  • Ao especificar um contêiner personalizado em um procedimento do Spark, recomendamos usar o Artifact Registry e o streaming de imagens.

  • Para um melhor desempenho, é possível especificar propriedades de alocação de recursos no procedimento do Spark. Os procedimentos armazenados pelo Spark são compatíveis com uma lista de propriedades de alocação de recursos, da mesma forma que o Dataproc sem servidor.

Limitações

Cotas e limites

Para saber mais sobre cotas e limites, consulte procedimentos armazenados para cotas e limites do Spark.

A seguir