Usar a metastore do BigQuery com o Dataproc sem servidor

Neste documento, explicamos como usar a metastore do BigQuery com o Dataproc Serverless.

Antes de começar

  1. Ative o faturamento para o projeto do Google Cloud. Saiba como verificar se o faturamento está ativado em um projeto.
  2. Ative as APIs BigQuery e Dataproc.

    Ativar as APIs

  3. Opcional: entenda como a metastore do BigQuery funciona e por que você deve usá-la.

Funções exigidas

Para receber as permissões necessárias para usar o Spark e o Dataproc Serverless com a metastore do BigQuery como uma metastore, peça ao administrador para conceder a você os seguintes papéis do IAM:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Fluxo de trabalho geral

Para usar o BigQuery com o Dataproc sem servidor, siga estas etapas gerais:

  1. Crie um arquivo com os comandos que você quer executar na metastore do BigQuery.
  2. Conecte-se ao mecanismo de software de código aberto de sua escolha.
  3. Envie um job em lote usando o método de sua preferência, como Spark SQL ou PySpark.

Conectar a metastore do BigQuery ao Spark

As instruções a seguir mostram como conectar o Dataproc Serverless ao metastore do BigQuery:

SparkSQL

Para enviar um job em lote do Spark SQL, siga estas etapas.

  1. Crie um arquivo SQL com os comandos do Spark SQL que você quer executar na metastore do BigQuery. Por exemplo, este comando cria um namespace e uma tabela.

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Substitua:

    • CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Spark.
    • NAMESPACE_NAME: o nome do namespace que faz referência à tabela do Spark.
    • TABLE_NAME: um nome de tabela para a tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que o data warehouse é armazenado.
  2. Envie um job em lote do Spark SQL executando o seguinte comando da CLI gcloud gcloud dataproc batches submit spark-sql:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
    --project=PROJECT_ID \
    --region=REGION \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
    --deps-bucket=BUCKET_PATH \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Substitua:

    • SQL_SCRIPT_PATH: o caminho para o arquivo SQL que o job em lote usa.
    • PROJECT_ID: o ID do projeto do Google Cloud em que o job em lote será executado.
    • REGION: a região em que a carga de trabalho é executada.
    • SUBNET_NAME: opcional: o nome de uma sub-rede VPC no REGION que tem o Acesso privado do Google ativado e atende a outros requisitos de sub-rede de sessão.
    • LOCATION: o local para executar o job em lote.
    • BUCKET_PATH: o local do bucket do Cloud Storage para fazer upload das dependências de carga de trabalho. O WAREHOUSE_FOLDER está localizado neste bucket. O prefixo URI gs:// do bucket não é obrigatório. É possível especificar o caminho ou o nome do bucket, por exemplo, mybucketname1.

    Para mais informações sobre como enviar jobs em lote do Spark, consulte Executar uma carga de trabalho em lote do Spark.

PySpark

Para enviar um job em lote do PySpark, siga estas etapas.

  1. Crie um arquivo Python com os comandos do PySpark que você quer executar na metastore do BigQuery.

    Por exemplo, o comando abaixo configura um ambiente do Spark para interagir com as tabelas do Iceberg armazenadas na metastore do BigQuery. O comando, em seguida, cria um novo namespace e uma tabela Iceberg dentro dele.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que o job em lote será executado.
    • LOCATION: o local em que os recursos do BigQuery estão localizados.
    • CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Spark.
    • TABLE_NAME: um nome de tabela para a tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que o data warehouse é armazenado.
    • NAMESPACE_NAME: o nome do namespace que faz referência à tabela do Spark.
  2. Envie o job em lote usando o comando gcloud dataproc batches submit pyspark abaixo.

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
     --version=2.2 \
     --project=PROJECT_ID \
     --region=REGION \
     --deps-bucket=BUCKET_PATH \
     --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
     --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Substitua:

    • PYTHON_SCRIPT_PATH: o caminho para o script Python usado pelo job em lote.
    • PROJECT_ID: o ID do projeto do Google Cloud em que o job em lote será executado.
    • REGION: a região em que a carga de trabalho é executada.
    • BUCKET_PATH: o local do bucket do Cloud Storage para fazer upload das dependências de carga de trabalho. O prefixo URI gs:// do bucket não é obrigatório. É possível especificar o caminho ou o nome do bucket, por exemplo, mybucketname1.

    Para mais informações sobre como enviar jobs em lote do PySpark, consulte a referência do PySpark para a gcloud.

A seguir