Configurer BigLake Metastore

Ce document explique comment configurer BigLake Metastore avec Dataproc ou Google Cloud sans serveur pour Apache Spark afin de créer un metastore unique et partagé qui fonctionne avec les moteurs Open Source, tels qu'Apache Spark ou Apache Flink.

Avant de commencer

  1. Activez la facturation pour votre projet Google Cloud . Découvrez comment vérifier si la facturation est activée sur un projet.
  2. Activez les API BigQuery et Dataproc.

    Activer les API

  3. (Facultatif) Découvrez le fonctionnement de BigLake Metastore et pourquoi vous devriez l'utiliser.

Rôles requis

Pour obtenir les autorisations nécessaires pour configurer le metastore BigLake, demandez à votre administrateur de vous accorder les rôles IAM suivants :

Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.

Configurer votre metastore avec Dataproc

Vous pouvez configurer BigLake Metastore avec Dataproc à l'aide de Spark ou de Flink :

Spark

  1. Configurez un nouveau cluster. Pour créer un cluster Dataproc, exécutez la commande gcloud dataproc clusters create suivante, qui contient les paramètres à utiliser pour le metastore BigLake :

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    Remplacez les éléments suivants :

    • CLUSTER_NAME : nom de votre cluster Dataproc.
    • PROJECT_ID : ID du Google Cloud projet dans lequel vous créez le cluster.
    • LOCATION : région Compute Engine dans laquelle vous créez le cluster.
  2. Envoyez un job Spark à l'aide de l'une des méthodes suivantes :

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du Google Cloud projet contenant le cluster Dataproc.
    • CLUSTER_NAME : nom du cluster Dataproc que vous utilisez pour exécuter le job Spark SQL.
    • REGION : région Compute Engine dans laquelle se trouve votre cluster.
    • LOCATION : emplacement des ressources BigQuery.
    • CATALOG_NAME : nom du catalogue Spark à utiliser avec votre job SQL.
    • WAREHOUSE_DIRECTORY : dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence par gs://.
    • SPARK_SQL_COMMAND : requête SparkSQL que vous souhaitez exécuter. Cette requête inclut les commandes permettant de créer vos ressources. Par exemple, pour créer un espace de noms et une table.

    CLI spark-sql

    1. Dans la console Google Cloud , accédez à la page Instances de VM.

      Accéder à la page "Instances de VM"

    2. Pour vous connecter à une instance de VM Dataproc, cliquez sur SSH sur la ligne qui liste le nom de l'instance de VM principale du cluster Dataproc, qui correspond au nom du cluster suivi du suffixe -m. Le résultat ressemble à ce qui suit :

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. Dans le terminal, exécutez la commande d'initialisation du metastore BigLake suivante :

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      Remplacez les éléments suivants :

      • CATALOG_NAME : nom du catalogue Spark que vous utilisez avec votre job SQL.
      • PROJECT_ID : ID du projet Google Cloud du catalogue BigLake Metastore auquel votre catalogue Spark est associé.
      • LOCATION : Google Cloud emplacement du metastore BigLake.
      • WAREHOUSE_DIRECTORY : dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence par gs://.

      Une fois que vous vous êtes connecté au cluster, votre terminal Spark affiche l'invite spark-sql, que vous pouvez utiliser pour envoyer des jobs Spark.

      spark-sql (default)>
      
  1. Créez un cluster Dataproc avec le composant Flink facultatif activé et assurez-vous d'utiliser Dataproc 2.2 ou version ultérieure.
  2. Dans la console Google Cloud , accédez à la page Instances de VM.

    Accéder à la page Instances de VM

  3. Dans la liste des instances de machine virtuelle, cliquez sur SSH pour vous connecter à l'instance de VM du cluster Dataproc principal, qui est listée sous le nom du cluster suivi du suffixe -m.

  4. Configurez le plug-in de catalogue personnalisé Iceberg pour BigLake Metastore :

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Démarrez la session Flink sur YARN :

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Créez un catalogue dans Flink :

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp_project'='PROJECT_ID',
    'gcp_location'='LOCATION'
    );

    Remplacez les éléments suivants :

    • CATALOG_NAME : identifiant du catalogue Flink, associé à un catalogue BigLake Metastore.
    • WAREHOUSE_DIRECTORY : chemin de base du répertoire de l'entrepôt (dossier Cloud Storage dans lequel Flink crée des fichiers). Cette valeur commence par gs://.
    • PROJECT_ID : ID du projet du catalogue BigLake Metastore auquel le catalogue Flink est associé.
    • LOCATION : emplacement des ressources BigQuery.

Votre session Flink est désormais connectée au metastore BigLake. Vous pouvez exécuter des commandes Flink SQL.

Maintenant que vous êtes connecté à BigLake Metastore, vous pouvez créer et afficher des ressources en fonction des métadonnées stockées dans BigLake Metastore.

Par exemple, essayez d'exécuter les commandes suivantes dans votre session Flink SQL interactive pour créer une base de données et une table Iceberg.

  1. Utilisez le catalogue Iceberg personnalisé :

    USE CATALOG CATALOG_NAME;

    Remplacez CATALOG_NAME par l'identifiant de votre catalogue Flink.

  2. Créez une base de données, ce qui crée un ensemble de données dans BigQuery :

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Remplacez DATABASE_NAME par le nom de votre nouvelle base de données.

  3. Utilisez la base de données que vous avez créée :

    USE DATABASE_NAME;
  4. Créez une table Iceberg. L'exemple suivant crée une table de ventes :

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    Remplacez ICEBERG_TABLE_NAME par le nom de votre nouvelle table.

  5. Afficher les métadonnées de table :

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Répertoriez les tables de la base de données :

    SHOW TABLES;

Ingérer des données dans votre table

Après avoir créé une table Iceberg dans la section précédente, vous pouvez utiliser Flink DataGen comme source de données pour ingérer des données en temps réel dans votre table. Voici un exemple de ce workflow :

  1. Créez une table temporaire à l'aide de DataGen :

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Remplacez les éléments suivants :

    • DATABASE_NAME : nom de la base de données dans laquelle stocker votre table temporaire.
    • TEMP_TABLE_NAME : nom de votre table temporaire.
    • ICEBERG_TABLE_NAME : nom de la table Iceberg que vous avez créée dans la section précédente.
  2. Définissez le parallélisme sur 1 :

    SET 'parallelism.default' = '1';
  3. Définissez l'intervalle de point de contrôle :

    SET 'execution.checkpointing.interval' = '10second';
  4. Définissez le point de contrôle :

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Démarrez le job de streaming en temps réel :

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    Le résultat ressemble à ce qui suit :

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Pour vérifier l'état du job de streaming, procédez comme suit :

    1. Dans la console Google Cloud , accédez à la page Clusters.

      accéder aux clusters

    2. Sélectionnez votre cluster.

    3. Cliquez sur l'onglet Interfaces Web.

    4. Cliquez sur le lien Gestionnaire de ressources YARN.

    5. Dans l'interface YARN ResourceManager, recherchez votre session Flink, puis cliquez sur le lien ApplicationMaster sous Tracking UI.

    6. Dans la colonne État, vérifiez que l'état de votre job est En cours d'exécution.

  7. Interrogez les données de flux dans le client Flink SQL :

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Interroger les flux de données dans BigQuery :

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Arrêtez le job de streaming dans le client Flink SQL :

    STOP JOB 'JOB_ID';

    Remplacez JOB_ID par l'ID de tâche qui s'est affiché dans le résultat lorsque vous avez créé la tâche de streaming.

Configurer votre metastore avec Serverless pour Apache Spark

Vous pouvez configurer le metastore BigLake avec Serverless pour Apache Spark à l'aide de Spark SQL ou de PySpark.

Spark SQL

  1. Créez un fichier SQL contenant les commandes Spark SQL que vous souhaitez exécuter dans le metastore BigLake. Par exemple, cette commande crée un espace de noms et une table :

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Remplacez les éléments suivants :

    • CATALOG_NAME : nom du catalogue qui fait référence à votre table Spark.
    • NAMESPACE_NAME : nom de l'espace de noms qui fait référence à votre table Spark.
    • TABLE_NAME : nom de table pour votre table Spark.
    • WAREHOUSE_DIRECTORY : URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
  2. Envoyez un job par lot SparkSQL en exécutant la commande gcloud dataproc batches submit spark-sql suivante :

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Remplacez les éléments suivants :

    • SQL_SCRIPT_PATH : chemin d'accès au fichier SQL utilisé par le job par lot.
    • PROJECT_ID : ID du Google Cloud projet dans lequel exécuter le job par lot.
    • REGION : région dans laquelle votre charge de travail est exécutée.
    • SUBNET_NAME (facultatif) : nom d'un sous-réseau VPC dans REGION qui répond aux exigences concernant le sous-réseau de session.
    • BUCKET_PATH : emplacement du bucket Cloud Storage pour importer les dépendances de charge de travail. Le fichier WAREHOUSE_DIRECTORY se trouve dans ce bucket. Le préfixe d'URI gs:// du bucket n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès ou le nom du bucket, par exemple mybucketname1.
    • LOCATION : emplacement dans lequel exécuter le job par lot.

    Pour en savoir plus sur l'envoi de jobs Spark par lot, consultez Exécuter une charge de travail Spark par lot.

PySpark

  1. Créez un fichier Python avec les commandes PySpark que vous souhaitez exécuter dans BigLake Metastore.

    Par exemple, la commande suivante configure un environnement Spark pour interagir avec les tables Iceberg stockées dans le metastore BigLake. La commande crée ensuite un espace de noms et une table Iceberg dans cet espace de noms.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du Google Cloud projet dans lequel exécuter le job par lot.
    • LOCATION : emplacement des ressources BigQuery.
    • CATALOG_NAME : nom du catalogue qui fait référence à votre table Spark.
    • TABLE_NAME : nom de table pour votre table Spark.
    • WAREHOUSE_DIRECTORY : URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
    • NAMESPACE_NAME : nom de l'espace de noms qui fait référence à votre table Spark.
  2. Envoyez le job par lot à l'aide de la commande gcloud dataproc batches submit pyspark suivante :

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Remplacez les éléments suivants :

    • PYTHON_SCRIPT_PATH : chemin d'accès au script Python utilisé par le job par lot.
    • PROJECT_ID : ID du Google Cloud projet dans lequel exécuter le job par lot.
    • REGION : région dans laquelle votre charge de travail est exécutée.
    • BUCKET_PATH : emplacement du bucket Cloud Storage pour importer les dépendances de charge de travail. Le préfixe d'URI gs:// du bucket n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès ou le nom du bucket, par exemple mybucketname1.

    Pour en savoir plus sur l'envoi de jobs par lot PySpark, consultez la documentation de référence de gcloud pour PySpark.

Étapes suivantes