Utiliser le métastore BigQuery avec Dataproc sans serveur

.

Ce document explique comment utiliser le métastore BigQuery avec Dataproc sans serveur.

Avant de commencer

  1. Activez la facturation pour votre projet Google Cloud . Découvrez comment vérifier si la facturation est activée sur un projet.
  2. Activez les API BigQuery et Dataproc.

    Activer les API

  3. Facultatif: Découvrez le fonctionnement du métastore BigQuery et pourquoi vous devriez l'utiliser.

Rôles requis

Pour obtenir les autorisations nécessaires pour utiliser Spark et Dataproc sans serveur avec le métastore BigQuery en tant que magasin de métadonnées, demandez à votre administrateur de vous accorder les rôles IAM suivants:

Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Workflow général

Pour utiliser BigQuery avec Dataproc sans serveur, procédez comme suit:

  1. Créez un fichier contenant les commandes que vous souhaitez exécuter dans le métastore BigQuery.
  2. Connectez-vous au moteur logiciel Open Source de votre choix.
  3. Envoyez une job par lot l'aide de la méthode de votre choix, comme Spark SQL ou PySpark.

Connecter le métastore BigQuery à Spark

Les instructions suivantes vous expliquent comment connecter Dataproc sans serveur au métastore BigQuery:

SparkSQL

Pour envoyer une job par lot Spark SQL, procédez comme suit :

  1. Créez un fichier SQL contenant les commandes Spark SQL que vous souhaitez exécuter dans le métastore BigQuery. Par exemple, cette commande crée un espace de noms et une table.

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Remplacez les éléments suivants :

    • CATALOG_NAME: nom du catalogue qui fait référence à votre table Spark.
    • NAMESPACE_NAME: nom de l'espace de noms qui fait référence à votre table Spark.
    • TABLE_NAME: nom de la table Spark.
    • WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
  2. Envoyez un job par lot Spark SQL en exécutant la commande gcloud CLI gcloud dataproc batches submit spark-sql suivante:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
    --project=PROJECT_ID \
    --region=REGION \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
    --deps-bucket=BUCKET_PATH \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Remplacez les éléments suivants :

    • SQL_SCRIPT_PATH: chemin d'accès au fichier SQL utilisé par la job par lot.
    • PROJECT_ID: ID du projet Google Cloud dans lequel exécuter la job par lot.
    • REGION: région dans laquelle votre charge de travail s'exécute.
    • SUBNET_NAME: facultatif: nom d'un sous-réseau VPC dans REGION pour lequel l'accès privé à Google est activé et qui répond aux autres exigences concernant les sous-réseaux de session.
    • LOCATION: emplacement dans lequel exécuter la job par lot.
    • BUCKET_PATH: emplacement du bucket Cloud Storage pour importer les dépendances de la charge de travail. Le WAREHOUSE_FOLDER se trouve dans ce bucket. Le préfixe d'URI gs:// du compartiment n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès ou le nom du bucket, par exemple mybucketname1.

    Pour en savoir plus sur l'envoi de tâches par lot Spark, consultez Exécuter une charge de travail par lot Spark.

PySpark

Pour envoyer une job par lot PySpark, procédez comme suit :

  1. Créez un fichier Python contenant les commandes PySpark que vous souhaitez exécuter dans le métastore BigQuery.

    Par exemple, la commande suivante configure un environnement Spark pour interagir avec les tables Iceberg stockées dans le métastore BigQuery. La commande crée ensuite un espace de noms et une table Iceberg dans cet espace de noms.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Remplacez les éléments suivants :

    • PROJECT_ID: ID du projet Google Cloud dans lequel exécuter la job par lot.
    • LOCATION: emplacement des ressources BigQuery.
    • CATALOG_NAME: nom du catalogue qui fait référence à votre table Spark.
    • TABLE_NAME: nom de la table Spark.
    • WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
    • NAMESPACE_NAME: nom de l'espace de noms qui fait référence à votre table Spark.
  2. Envoyez le job par lot à l'aide de la commande gcloud dataproc batches submit pyspark suivante.

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
     --version=2.2 \
     --project=PROJECT_ID \
     --region=REGION \
     --deps-bucket=BUCKET_PATH \
     --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
     --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Remplacez les éléments suivants :

    • PYTHON_SCRIPT_PATH: chemin d'accès au script Python utilisé par la job par lot.
    • PROJECT_ID: ID du projet Google Cloud dans lequel exécuter la job par lot.
    • REGION: région dans laquelle votre charge de travail s'exécute.
    • BUCKET_PATH: emplacement du bucket Cloud Storage pour importer les dépendances de la charge de travail. Le préfixe d'URI gs:// du compartiment n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès ou le nom du bucket, par exemple mybucketname1.

    Pour en savoir plus sur l'envoi de tâches par lot PySpark, consultez la documentation de référence gcloud PySpark.

Étape suivante