Usar a metastore do BigQuery com procedimentos armazenados do Spark

Este documento explica como usar os procedimentos armazenados do Apache Spark com a metastore do BigQuery.

Antes de começar

  1. Ative o faturamento do seu Google Cloud projeto. Saiba como verificar se o faturamento está ativado em um projeto.
  2. Ative as APIs BigQuery e Dataflow.

    Ativar as APIs

  3. Opcional: saiba mais sobre o seguinte:

Funções exigidas

Para usar os procedimentos armazenados do Spark, revise os papéis necessários para procedimentos armazenados e conceda os papéis necessários.

Para receber as permissões necessárias para usar o Spark e os procedimentos armazenados com a metastore do BigQuery como uma loja de metadados, peça ao administrador para conceder a você os seguintes papéis do IAM:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Criar e executar um procedimento armazenado

O exemplo a seguir mostra como criar e executar um procedimento armazenado com a metastore do BigQuery.

  1. Acessar a página do BigQuery.

    Ir para o BigQuery

  2. No editor de consultas, adicione o seguinte código de exemplo para a instrução CREATE PROCEDURE.

    CREATE OR REPLACE PROCEDURE
    `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`()
    WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK',
    runtime_version='1.1',
    properties=[("spark.sql.catalog.CATALOG_NAME.warehouse",
    "WAREHOUSE_DIRECTORY"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_location",
    "LOCATION"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_project",
    "PROJECT_ID"),
    ("spark.sql.catalog.CATALOG_NAME",
    "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.CATALOG_NAME.catalog-impl",
    "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
    ("spark.jars.packages",
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")],
    jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"])
    LANGUAGE python AS R"""
    from pyspark.sql import SparkSession
    spark = SparkSession \
    .builder \
    .appName("BigQuery metastore Iceberg") \
    .getOrCreate()
    spark.sql("USE CATALOG_NAME;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'")
    spark.sql("DESCRIBE TABLE_NAME;")
    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");")
    spark.sql("SELECT * from TABLE_NAME;")
    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);")
    spark.sql("DESCRIBE TABLE_NAME;")
    """;
    CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();

    Substitua:

    • PROJECT_ID: o ID do seu projeto Google Cloud .
    • BQ_DATASET_ID: o ID do conjunto de dados no BigQuery que contém o procedimento.
    • PROCEDURE_NAME: o nome do procedimento que você está criando ou substituindo.
    • REGION: o local da sua conexão do Spark.
    • LOCATION: o local dos seus recursos do BigQuery.
    • SPARK_CONNECTION_ID: o ID da sua conexão do Spark.
    • CATALOG_NAME: o nome do catálogo que você está usando.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage que contém o data warehouse.
    • NAMESPACE_NAME: o namespace que você está usando.

A seguir