Configurer BigLake Metastore
Ce document explique comment configurer BigLake Metastore avec Dataproc ou Google Cloud sans serveur pour Apache Spark afin de créer un metastore unique et partagé qui fonctionne avec les moteurs Open Source, tels qu'Apache Spark ou Apache Flink.
Avant de commencer
- Activez la facturation pour votre projet Google Cloud . Découvrez comment vérifier si la facturation est activée sur un projet.
Activez les API BigQuery et Dataproc.
(Facultatif) Découvrez le fonctionnement de BigLake Metastore et pourquoi vous devriez l'utiliser.
Rôles requis
Pour obtenir les autorisations nécessaires pour configurer le metastore BigLake, demandez à votre administrateur de vous accorder les rôles IAM suivants :
-
Créez un cluster Dataproc :
Nœud de calcul Dataproc (
roles/dataproc.worker
) sur le compte de service Compute Engine par défaut du projet -
Créez des tables BigLake Metastore :
-
Nœud de calcul Dataproc (
roles/dataproc.worker
) sur le compte de service de VM Dataproc dans le projet -
Éditeur de données BigQuery (
roles/bigquery.dataEditor
) sur le compte de service de VM Dataproc dans le projet -
Administrateur des objets de l'espace de stockage (
roles/storage.objectAdmin
) sur le compte de service de la VM Dataproc dans le projet
-
Nœud de calcul Dataproc (
-
Interrogez les tables BigLake Metastore :
-
Lecteur de données BigQuery (
roles/bigquery.dataViewer
) sur le projet -
Utilisateur BigQuery (
roles/bigquery.user
) sur le projet -
Lecteur des objets Storage (
roles/storage.objectViewer
) sur le projet
-
Lecteur de données BigQuery (
Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.
Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.
Configurer votre metastore avec Dataproc
Vous pouvez configurer BigLake Metastore avec Dataproc à l'aide de Spark ou de Flink :
Spark
Configurez un nouveau cluster. Pour créer un cluster Dataproc, exécutez la commande
gcloud dataproc clusters create
suivante, qui contient les paramètres à utiliser pour le metastore BigLake :gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
Remplacez les éléments suivants :
CLUSTER_NAME
: nom de votre cluster Dataproc.PROJECT_ID
: ID du Google Cloud projet dans lequel vous créez le cluster.LOCATION
: région Compute Engine dans laquelle vous créez le cluster.
Envoyez un job Spark à l'aide de l'une des méthodes suivantes :
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Remplacez les éléments suivants :
PROJECT_ID
: ID du Google Cloud projet contenant le cluster Dataproc.CLUSTER_NAME
: nom du cluster Dataproc que vous utilisez pour exécuter le job Spark SQL.REGION
: région Compute Engine dans laquelle se trouve votre cluster.LOCATION
: emplacement des ressources BigQuery.CATALOG_NAME
: nom du catalogue Spark à utiliser avec votre job SQL.WAREHOUSE_DIRECTORY
: dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence pargs://
.SPARK_SQL_COMMAND
: requête SparkSQL que vous souhaitez exécuter. Cette requête inclut les commandes permettant de créer vos ressources. Par exemple, pour créer un espace de noms et une table.
CLI spark-sql
Dans la console Google Cloud , accédez à la page Instances de VM.
Pour vous connecter à une instance de VM Dataproc, cliquez sur SSH sur la ligne qui liste le nom de l'instance de VM principale du cluster Dataproc, qui correspond au nom du cluster suivi du suffixe
-m
. Le résultat ressemble à ce qui suit :Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
Dans le terminal, exécutez la commande d'initialisation du metastore BigLake suivante :
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Remplacez les éléments suivants :
CATALOG_NAME
: nom du catalogue Spark que vous utilisez avec votre job SQL.PROJECT_ID
: ID du projet Google Cloud du catalogue BigLake Metastore auquel votre catalogue Spark est associé.LOCATION
: Google Cloud emplacement du metastore BigLake.WAREHOUSE_DIRECTORY
: dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence pargs://
.
Une fois que vous vous êtes connecté au cluster, votre terminal Spark affiche l'invite
spark-sql
, que vous pouvez utiliser pour envoyer des jobs Spark.spark-sql (default)>
Flink
- Créez un cluster Dataproc avec le composant Flink facultatif activé et assurez-vous d'utiliser Dataproc
2.2
ou version ultérieure. Dans la console Google Cloud , accédez à la page Instances de VM.
Dans la liste des instances de machine virtuelle, cliquez sur SSH pour vous connecter à l'instance de VM du cluster Dataproc principal, qui est listée sous le nom du cluster suivi du suffixe
-m
.Configurez le plug-in de catalogue personnalisé Iceberg pour BigLake Metastore :
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
Démarrez la session Flink sur YARN :
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Créez un catalogue dans Flink :
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
Remplacez les éléments suivants :
CATALOG_NAME
: identifiant du catalogue Flink, associé à un catalogue BigLake Metastore.WAREHOUSE_DIRECTORY
: chemin de base du répertoire de l'entrepôt (dossier Cloud Storage dans lequel Flink crée des fichiers). Cette valeur commence pargs://
.PROJECT_ID
: ID du projet du catalogue BigLake Metastore auquel le catalogue Flink est associé.LOCATION
: emplacement des ressources BigQuery.
Votre session Flink est désormais connectée au metastore BigLake. Vous pouvez exécuter des commandes Flink SQL.
Gérer les ressources de métastore BigLake
Maintenant que vous êtes connecté à BigLake Metastore, vous pouvez créer et afficher des ressources en fonction des métadonnées stockées dans BigLake Metastore.
Par exemple, essayez d'exécuter les commandes suivantes dans votre session Flink SQL interactive pour créer une base de données et une table Iceberg.
Utilisez le catalogue Iceberg personnalisé :
USE CATALOG CATALOG_NAME;
Remplacez
CATALOG_NAME
par l'identifiant de votre catalogue Flink.Créez une base de données, ce qui crée un ensemble de données dans BigQuery :
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Remplacez
DATABASE_NAME
par le nom de votre nouvelle base de données.Utilisez la base de données que vous avez créée :
USE DATABASE_NAME;
Créez une table Iceberg. L'exemple suivant crée une table de ventes :
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Remplacez
ICEBERG_TABLE_NAME
par le nom de votre nouvelle table.Afficher les métadonnées de table :
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Répertoriez les tables de la base de données :
SHOW TABLES;
Ingérer des données dans votre table
Après avoir créé une table Iceberg dans la section précédente, vous pouvez utiliser Flink DataGen comme source de données pour ingérer des données en temps réel dans votre table. Voici un exemple de ce workflow :
Créez une table temporaire à l'aide de DataGen :
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Remplacez les éléments suivants :
DATABASE_NAME
: nom de la base de données dans laquelle stocker votre table temporaire.TEMP_TABLE_NAME
: nom de votre table temporaire.ICEBERG_TABLE_NAME
: nom de la table Iceberg que vous avez créée dans la section précédente.
Définissez le parallélisme sur 1 :
SET 'parallelism.default' = '1';
Définissez l'intervalle de point de contrôle :
SET 'execution.checkpointing.interval' = '10second';
Définissez le point de contrôle :
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Démarrez le job de streaming en temps réel :
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
Le résultat ressemble à ce qui suit :
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
Pour vérifier l'état du job de streaming, procédez comme suit :
Dans la console Google Cloud , accédez à la page Clusters.
Sélectionnez votre cluster.
Cliquez sur l'onglet Interfaces Web.
Cliquez sur le lien Gestionnaire de ressources YARN.
Dans l'interface YARN ResourceManager, recherchez votre session Flink, puis cliquez sur le lien ApplicationMaster sous Tracking UI.
Dans la colonne État, vérifiez que l'état de votre job est En cours d'exécution.
Interrogez les données de flux dans le client Flink SQL :
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Interroger les flux de données dans BigQuery :
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Arrêtez le job de streaming dans le client Flink SQL :
STOP JOB 'JOB_ID';
Remplacez
JOB_ID
par l'ID de tâche qui s'est affiché dans le résultat lorsque vous avez créé la tâche de streaming.
Configurer votre metastore avec Serverless pour Apache Spark
Vous pouvez configurer le metastore BigLake avec Serverless pour Apache Spark à l'aide de Spark SQL ou de PySpark.
Spark SQL
Créez un fichier SQL contenant les commandes Spark SQL que vous souhaitez exécuter dans le metastore BigLake. Par exemple, cette commande crée un espace de noms et une table :
CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
Remplacez les éléments suivants :
CATALOG_NAME
: nom du catalogue qui fait référence à votre table Spark.NAMESPACE_NAME
: nom de l'espace de noms qui fait référence à votre table Spark.TABLE_NAME
: nom de table pour votre table Spark.WAREHOUSE_DIRECTORY
: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
Envoyez un job par lot SparkSQL en exécutant la commande
gcloud dataproc batches submit spark-sql
suivante :gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
Remplacez les éléments suivants :
SQL_SCRIPT_PATH
: chemin d'accès au fichier SQL utilisé par le job par lot.PROJECT_ID
: ID du Google Cloud projet dans lequel exécuter le job par lot.REGION
: région dans laquelle votre charge de travail est exécutée.SUBNET_NAME
(facultatif) : nom d'un sous-réseau VPC dansREGION
qui répond aux exigences concernant le sous-réseau de session.BUCKET_PATH
: emplacement du bucket Cloud Storage pour importer les dépendances de charge de travail. Le fichierWAREHOUSE_DIRECTORY
se trouve dans ce bucket. Le préfixe d'URIgs://
du bucket n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès ou le nom du bucket, par exemplemybucketname1
.LOCATION
: emplacement dans lequel exécuter le job par lot.
Pour en savoir plus sur l'envoi de jobs Spark par lot, consultez Exécuter une charge de travail Spark par lot.
PySpark
Créez un fichier Python avec les commandes PySpark que vous souhaitez exécuter dans BigLake Metastore.
Par exemple, la commande suivante configure un environnement Spark pour interagir avec les tables Iceberg stockées dans le metastore BigLake. La commande crée ensuite un espace de noms et une table Iceberg dans cet espace de noms.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
Remplacez les éléments suivants :
PROJECT_ID
: ID du Google Cloud projet dans lequel exécuter le job par lot.LOCATION
: emplacement des ressources BigQuery.CATALOG_NAME
: nom du catalogue qui fait référence à votre table Spark.TABLE_NAME
: nom de table pour votre table Spark.WAREHOUSE_DIRECTORY
: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.NAMESPACE_NAME
: nom de l'espace de noms qui fait référence à votre table Spark.
Envoyez le job par lot à l'aide de la commande
gcloud dataproc batches submit pyspark
suivante :gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
Remplacez les éléments suivants :
PYTHON_SCRIPT_PATH
: chemin d'accès au script Python utilisé par le job par lot.PROJECT_ID
: ID du Google Cloud projet dans lequel exécuter le job par lot.REGION
: région dans laquelle votre charge de travail est exécutée.BUCKET_PATH
: emplacement du bucket Cloud Storage pour importer les dépendances de charge de travail. Le préfixe d'URIgs://
du bucket n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès ou le nom du bucket, par exemplemybucketname1
.
Pour en savoir plus sur l'envoi de jobs par lot PySpark, consultez la documentation de référence de gcloud pour PySpark.
Étapes suivantes
- Créez et gérez des ressources de metastore.
- Configurez les fonctionnalités facultatives de BigLake Metastore.