Use o metastore do BigLake com procedimentos armazenados do Spark

Este documento explica como usar procedimentos armazenados do Apache Spark com o metastore do BigLake.

Antes de começar

  1. Ative a faturação para o seu Google Cloud projeto. Saiba como verificar se a faturação está ativada num projeto.
  2. Ative as APIs BigQuery e Dataflow.

    Ative as APIs

  3. Opcional: saiba mais sobre o seguinte:

Funções necessárias

Para usar procedimentos armazenados do Spark, reveja as funções necessárias para procedimentos armazenados e conceda as funções necessárias.

Para receber as autorizações de que precisa para usar o Spark e os procedimentos armazenados com o metastore do BigLake como uma loja de metadados, peça ao seu administrador para lhe conceder as seguintes funções de IAM:

Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.

Crie e execute um procedimento armazenado

O exemplo seguinte mostra como criar e executar um procedimento armazenado com o metastore do BigLake.

  1. Aceda à página do BigQuery.

    Aceda ao BigQuery

  2. No editor de consultas, adicione o seguinte código de amostra para a declaração CREATE PROCEDURE.

    CREATE OR REPLACE PROCEDURE
    `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`()
    WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK',
    runtime_version='1.1',
    properties=[("spark.sql.catalog.CATALOG_NAME.warehouse",
    "WAREHOUSE_DIRECTORY"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_location",
    "LOCATION"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_project",
    "PROJECT_ID"),
    ("spark.sql.catalog.CATALOG_NAME",
    "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.CATALOG_NAME.catalog-impl",
    "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
    ("spark.jars.packages",
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1")],
    jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar"])
    LANGUAGE python AS R"""
    from pyspark.sql import SparkSession
    spark = SparkSession \
    .builder \
    .appName("BigLake Metastore Iceberg") \
    .getOrCreate()
    spark.sql("USE CATALOG_NAME;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'")
    spark.sql("DESCRIBE TABLE_NAME;")
    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");")
    spark.sql("SELECT * from TABLE_NAME;")
    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);")
    spark.sql("DESCRIBE TABLE_NAME;")
    """;
    CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();

    Substitua o seguinte:

    • PROJECT_ID: o ID do seu projeto Google Cloud .
    • BQ_DATASET_ID: o ID do conjunto de dados no BigQuery que contém o procedimento.
    • PROCEDURE_NAME: o nome do procedimento que está a criar ou substituir.
    • REGION: a localização da sua ligação Spark.
    • LOCATION: a localização dos seus recursos do BigQuery.
    • SPARK_CONNECTION_ID: o ID da sua ligação do Spark.
    • CATALOG_NAME: o nome do catálogo que está a usar.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage que contém o seu armazém de dados.
    • NAMESPACE_NAME: o espaço de nomes que está a usar.

O que se segue?