Usar a metastore do BigQuery com o Dataproc Serverless

Neste documento, explicamos como usar o metastore do BigQuery com o Dataproc Serverless.

Antes de começar

  1. Ative o faturamento do seu Google Cloud projeto. Saiba como verificar se o faturamento está ativado em um projeto.
  2. Ative as APIs BigQuery e Dataproc.

    Ativar as APIs

  3. Opcional: entenda como a metastore do BigQuery funciona e por que você deve usá-la.

Funções exigidas

Para receber as permissões necessárias para usar o Spark e o Dataproc Serverless com o metastore do BigQuery como uma loja de metadados, peça ao administrador para conceder a você os seguintes papéis do IAM:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Fluxo de trabalho geral

Para usar o BigQuery com o Dataproc Serverless, siga estas etapas gerais:

  1. Crie um arquivo com os comandos que você quer executar na metastore do BigQuery.
  2. Conecte-se ao mecanismo de software de código aberto de sua preferência.
  3. Envie um job em lote usando o método de sua preferência, como o Spark SQL ou o PySpark.

Conectar a metastore do BigQuery ao Spark

As instruções a seguir mostram como conectar o Dataproc Serverless à metastore do BigQuery:

SparkSQL

Para enviar um job em lote do Spark SQL, siga estas etapas.

  1. Crie um arquivo SQL com os comandos do Spark SQL que você quer executar na metastore do BigQuery. Por exemplo, este comando cria um namespace e uma tabela.

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Substitua:

    • CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Spark.
    • NAMESPACE_NAME: o nome do namespace que faz referência à tabela do Spark.
    • TABLE_NAME: um nome de tabela para a tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que o data warehouse está armazenado.
  2. Envie um job em lote do Spark SQL executando o seguinte comando da CLI gcloud gcloud dataproc batches submit spark-sql:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
    --project=PROJECT_ID \
    --region=REGION \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
    --deps-bucket=BUCKET_PATH \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Substitua:

    • SQL_SCRIPT_PATH: o caminho para o arquivo SQL que o job em lote usa.
    • PROJECT_ID: o ID do projeto Google Cloud para executar o job em lote.
    • REGION: a região em que a carga de trabalho é executada.
    • SUBNET_NAME: opcional: o nome de uma sub-rede da VPC no REGION que tem o Acesso privado do Google ativado e atende a outros requisitos de sub-rede de sessão.
    • LOCATION: o local em que o job em lote será executado.
    • BUCKET_PATH: o local do bucket do Cloud Storage para fazer upload das dependências da carga de trabalho. O WAREHOUSE_FOLDER está localizado neste bucket. O prefixo URI gs:// do bucket não é necessário. É possível especificar o caminho ou o nome do bucket, por exemplo, mybucketname1.

    Para mais informações sobre como enviar jobs em lote do Spark, consulte Executar uma carga de trabalho em lote do Spark.

PySpark

Para enviar um job em lote do PySpark, siga estas etapas.

  1. Crie um arquivo Python com os comandos PySpark que você quer executar na metastore do BigQuery.

    Por exemplo, o comando a seguir configura um ambiente do Spark para interagir com tabelas do Iceberg armazenadas na metastore do BigQuery. Em seguida, o comando cria um novo namespace e uma tabela Iceberg dentro dele.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Substitua:

    • PROJECT_ID: o ID do projeto Google Cloud para executar o job em lote.
    • LOCATION: o local em que os recursos do BigQuery estão localizados.
    • CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Spark.
    • TABLE_NAME: um nome de tabela para a tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que o data warehouse está armazenado.
    • NAMESPACE_NAME: o nome do namespace que faz referência à tabela do Spark.
  2. Envie o job em lote usando o comando gcloud dataproc batches submit pyspark a seguir.

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
     --version=2.2 \
     --project=PROJECT_ID \
     --region=REGION \
     --deps-bucket=BUCKET_PATH \
     --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
     --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Substitua:

    • PYTHON_SCRIPT_PATH: o caminho para o script Python usado pelo job em lote.
    • PROJECT_ID: o ID do projeto Google Cloud para executar o job em lote.
    • REGION: a região em que a carga de trabalho é executada.
    • BUCKET_PATH: o local do bucket do Cloud Storage para fazer upload das dependências da carga de trabalho. O prefixo URI gs:// do bucket não é obrigatório. É possível especificar o caminho ou o nome do bucket, por exemplo, mybucketname1.

    Para mais informações sobre como enviar jobs em lote do PySpark, consulte a referência do PySpark para a gcloud.

A seguir