Use o metastore do BigLake com o Serverless para Apache Spark

Este documento explica como usar o metastore do BigLake com o Serverless para Apache Spark.

Antes de começar

  1. Ative a faturação para o seu Google Cloud projeto. Saiba como verificar se a faturação está ativada num projeto.
  2. Ative as APIs BigQuery e Dataproc.

    Ative as APIs

  3. Opcional: compreenda como funciona o metastore do BigLake e por que motivo o deve usar.

Funções necessárias

Para receber as autorizações de que precisa para usar o Spark e o Serverless para Apache Spark com o metastore do BigLake como um repositório de metadados, peça ao seu administrador para lhe conceder as seguintes funções do IAM:

Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.

Fluxo de trabalho geral

Para usar o BigQuery com o Serverless para Apache Spark, siga estes passos gerais:

  1. Crie um ficheiro com os comandos que quer executar no metastore do BigLake.
  2. Estabeleça ligação ao motor de software de código aberto da sua escolha.
  3. Envie uma tarefa em lote através do método à sua escolha, como o Spark SQL ou o PySpark.

Associe o metastore do BigLake ao Spark

As instruções seguintes mostram como ligar o Serverless para Apache Spark ao metastore do BigLake:

SparkSQL

Para enviar uma tarefa em lote do Spark SQL, conclua os seguintes passos.

  1. Crie um ficheiro SQL com os comandos SQL do Spark que quer executar no metastore do BigLake. Por exemplo, este comando cria um espaço de nomes e uma tabela.

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Substitua o seguinte:

    • CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Spark.
    • NAMESPACE_NAME: o nome do espaço de nomes que faz referência à sua tabela do Spark.
    • TABLE_NAME: um nome de tabela para a sua tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage onde o seu data warehouse está armazenado.
  2. Envie uma tarefa em lote do Spark SQL executando o seguinte comando da CLI gcloud:gcloud dataproc batches submit spark-sql

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
    --project=PROJECT_ID \
    --region=REGION \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
    --deps-bucket=BUCKET_PATH \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Substitua o seguinte:

    • SQL_SCRIPT_PATH: o caminho para o ficheiro SQL que a tarefa em lote usa.
    • PROJECT_ID: o ID do Google Cloud projeto no qual executar o trabalho em lote.
    • REGION: a região onde a sua carga de trabalho é executada.
    • SUBNET_NAME: opcional: o nome de uma sub-rede da VPC no REGION que tem o acesso privado da Google ativado e cumpre outros requisitos da sub-rede de sessão.
    • LOCATION: a localização onde executar a tarefa em lote.
    • BUCKET_PATH: a localização do contentor do Cloud Storage para carregar dependências da carga de trabalho. O ficheiro WAREHOUSE_FOLDER está localizado neste contentor. O prefixo de URI gs:// do contentor não é obrigatório. Pode especificar o caminho do contentor ou o nome do contentor, por exemplo, mybucketname1.

    Para mais informações sobre o envio de tarefas em lote do Spark, consulte o artigo Execute uma carga de trabalho em lote do Spark.

PySpark

Para enviar uma tarefa em lote do PySpark, conclua os seguintes passos.

  1. Crie um ficheiro Python com os comandos PySpark que quer executar no metastore do BigLake.

    Por exemplo, o comando seguinte configura um ambiente Spark para interagir com tabelas Iceberg armazenadas no metastore do BigLake. Em seguida, o comando cria um novo espaço de nomes e uma tabela Iceberg nesse espaço de nomes.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Substitua o seguinte:

    • PROJECT_ID: o ID do Google Cloud projeto no qual executar o trabalho em lote.
    • LOCATION: a localização onde os recursos do BigQuery estão localizados.
    • CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Spark.
    • TABLE_NAME: um nome de tabela para a sua tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage onde o seu data warehouse está armazenado.
    • NAMESPACE_NAME: o nome do espaço de nomes que faz referência à sua tabela do Spark.
  2. Envie a tarefa em lote através do seguinte comando gcloud dataproc batches submit pyspark.

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
     --version=2.2 \
     --project=PROJECT_ID \
     --region=REGION \
     --deps-bucket=BUCKET_PATH \
     --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Substitua o seguinte:

    • PYTHON_SCRIPT_PATH: o caminho para o script Python que a tarefa em lote usa.
    • PROJECT_ID: o ID do Google Cloud projeto no qual executar o trabalho em lote.
    • REGION: a região onde a sua carga de trabalho é executada.
    • BUCKET_PATH: a localização do contentor do Cloud Storage para carregar dependências da carga de trabalho. O prefixo de URI gs:// do contentor não é obrigatório. Pode especificar o caminho ou o nome do contentor, por exemplo, mybucketname1.

    Para mais informações sobre o envio de tarefas em lote do PySpark, consulte a referência do gcloud do PySpark.

O que se segue?