Configurar metastore de BigLake
En este documento se explica cómo configurar el almacén de metadatos de BigLake con Dataproc o Google Cloud Serverless para Apache Spark para crear un almacén de metadatos único y compartido que funcione en motores de código abierto, como Apache Spark o Apache Flink.
Antes de empezar
- Habilita la facturación de tu Google Cloud proyecto. Consulta cómo comprobar si la facturación está habilitada en un proyecto.
Habilita las APIs BigQuery y Dataproc.
Opcional: Consulta cómo funciona el metastore de BigLake y por qué deberías usarlo.
Roles obligatorios
Para obtener los permisos que necesitas para configurar el metastore de BigLake, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos:
-
Crea un clúster de Dataproc:
Trabajador de Dataproc (
roles/dataproc.worker
) en la cuenta de servicio predeterminada de Compute Engine del proyecto -
Crea tablas de metastore de BigLake:
-
Trabajador de Dataproc (
roles/dataproc.worker
) en la cuenta de servicio de la VM de Dataproc del proyecto -
Editor de datos de BigQuery (
roles/bigquery.dataEditor
) en la cuenta de servicio de la VM de Dataproc del proyecto -
Administrador de objetos de Storage (
roles/storage.objectAdmin
) en la cuenta de servicio de la VM de Dataproc del proyecto
-
Trabajador de Dataproc (
-
Consultar tablas de metastore de BigLake:
-
Lector de datos de BigQuery (
roles/bigquery.dataViewer
) en el proyecto -
Usuario de BigQuery (
roles/bigquery.user
) en el proyecto -
Lector de objetos de Storage (
roles/storage.objectViewer
) en el proyecto
-
Lector de datos de BigQuery (
Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.
También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.
Configurar el metastore con Dataproc
Puedes configurar BigLake Metastore con Dataproc mediante Spark o Flink:
Spark
Configura un clúster nuevo. Para crear un clúster de Dataproc, ejecuta el siguiente
gcloud dataproc clusters create
comando, que contiene los ajustes que necesitas para usar el metastore de BigLake:gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
Haz los cambios siguientes:
CLUSTER_NAME
: nombre del clúster de Dataproc.PROJECT_ID
: el ID del Google Cloud proyecto en el que vas a crear el clúster.LOCATION
: la región de Compute Engine en la que vas a crear el clúster.
Envía una tarea de Spark con uno de los siguientes métodos:
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Haz los cambios siguientes:
PROJECT_ID
: el ID del Google Cloud proyecto que contiene el clúster de Dataproc.CLUSTER_NAME
: el nombre del clúster de Dataproc que usas para ejecutar el trabajo de Spark SQL.REGION
: la región de Compute Engine en la que se encuentra tu clúster.LOCATION
: la ubicación de los recursos de BigQuery.CATALOG_NAME
: el nombre del catálogo de Spark que se va a usar con tu tarea de SQL.WAREHOUSE_DIRECTORY
: la carpeta de Cloud Storage que contiene su almacén de datos. Este valor empieza porgs://
.SPARK_SQL_COMMAND
: la consulta de Spark SQL que quieres ejecutar. Esta consulta incluye los comandos para crear tus recursos. Por ejemplo, para crear un espacio de nombres y una tabla.
CLI de spark-sql
En la consola de Google Cloud , ve a la página Instancias de VM.
Para conectarte a una instancia de VM de Dataproc, haz clic en SSH en la fila que muestra el nombre de la VM principal del clúster de Dataproc, que es el nombre del clúster seguido del sufijo
-m
. El resultado debería ser similar al siguiente:Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
En el terminal, ejecuta el siguiente comando de inicialización del metastore de BigLake:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Haz los cambios siguientes:
CATALOG_NAME
: el nombre del catálogo de Spark que estás usando con tu trabajo de SQL.PROJECT_ID
: el ID del proyecto Google Cloud del catálogo de metastore de BigLake con el que se vincula tu catálogo de Spark.LOCATION
: la Google Cloud ubicación del metastore de BigLake.WAREHOUSE_DIRECTORY
: la carpeta de Cloud Storage que contiene su almacén de datos. Este valor empieza porgs://
.
Una vez que te hayas conectado correctamente al clúster, el terminal de Spark mostrará el prompt
spark-sql
, que puedes usar para enviar trabajos de Spark.spark-sql (default)>
Flink
- Crea un clúster de Dataproc con el componente opcional Flink habilitado y asegúrate de usar Dataproc
2.2
o una versión posterior. En la consola de Google Cloud , ve a la página Instancias de VM.
En la lista de instancias de máquina virtual, haz clic en SSH para conectarte a la instancia de máquina virtual del clúster principal de Dataproc, que aparece con el nombre del clúster seguido del sufijo
-m
.Configura el complemento de catálogo personalizado de Iceberg para BigLake Metastore:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
Inicia la sesión de Flink en YARN:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Crear un catálogo en Flink:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
Haz los cambios siguientes:
CATALOG_NAME
: el identificador del catálogo de Flink, que está vinculado a un catálogo del metastore de BigLake.WAREHOUSE_DIRECTORY
: la ruta base del directorio del almacén (la carpeta de Cloud Storage en la que Flink crea archivos). Este valor empieza porgs://
.PROJECT_ID
: el ID del proyecto del catálogo de metastore de BigLake al que se vincula el catálogo de Flink.LOCATION
: la ubicación de los recursos de BigQuery.
Tu sesión de Flink ahora está conectada al metastore de BigLake y puedes ejecutar comandos de Flink SQL.
Gestionar recursos de metastore de BigLake
Ahora que te has conectado a BigLake Metastore, puedes crear y ver recursos basados en los metadatos almacenados en BigLake Metastore.
Por ejemplo, prueba a ejecutar los siguientes comandos en tu sesión interactiva de Flink SQL para crear una base de datos y una tabla de Iceberg.
Usa el catálogo personalizado de Iceberg:
USE CATALOG CATALOG_NAME;
Sustituye
CATALOG_NAME
por el identificador de catálogo de Flink.Crea una base de datos, lo que crea un conjunto de datos en BigQuery:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Sustituye
DATABASE_NAME
por el nombre de tu nueva base de datos.Usa la base de datos que has creado:
USE DATABASE_NAME;
Crea una tabla Iceberg. Con el siguiente comando se crea una tabla de ventas de ejemplo:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Sustituye
ICEBERG_TABLE_NAME
por el nombre de la nueva tabla.Para ver los metadatos de una tabla, sigue estos pasos:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Mostrar las tablas de la base de datos:
SHOW TABLES;
Ingerir datos en una tabla
Después de crear una tabla Iceberg en la sección anterior, puedes usar Flink DataGen como fuente de datos para ingerir datos en tiempo real en tu tabla. Los pasos siguientes son un ejemplo de este flujo de trabajo:
Para crear una tabla temporal con DataGen, sigue estos pasos:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Haz los cambios siguientes:
DATABASE_NAME
: el nombre de la base de datos en la que se almacenará tu tabla temporal.TEMP_TABLE_NAME
: un nombre para la tabla temporal.ICEBERG_TABLE_NAME
: el nombre de la tabla Iceberg que has creado en la sección anterior.
Define el paralelismo en 1:
SET 'parallelism.default' = '1';
Define el intervalo del punto de control:
SET 'execution.checkpointing.interval' = '10second';
Define el punto de control:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Inicia la tarea de streaming en tiempo real:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
El resultado debería ser similar al siguiente:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
Para comprobar el estado del trabajo de streaming, haz lo siguiente:
En la Google Cloud consola, ve a la página Clusters.
Selecciona tu clúster.
Haga clic en la pestaña Interfaces web.
Haz clic en el enlace YARN ResourceManager.
En la interfaz de YARN ResourceManager, busca tu sesión de Flink y haz clic en el enlace ApplicationMaster de Tracking UI.
En la columna Estado, comprueba que el estado de tu trabajo sea En ejecución.
Consulta datos de streaming en el cliente de Flink SQL:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Consultar datos de streaming en BigQuery:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Finaliza el trabajo de streaming en el cliente de Flink SQL:
STOP JOB 'JOB_ID';
Sustituye
JOB_ID
por el ID de trabajo que se mostró en la salida cuando creaste el trabajo de streaming.
Configurar el metastore con Serverless para Apache Spark
Puedes configurar BigLake Metastore con Serverless para Apache Spark mediante Spark SQL o PySpark.
Spark SQL
Crea un archivo SQL con los comandos de Spark SQL que quieras ejecutar en el almacén de metadatos de BigLake. Por ejemplo, este comando crea un espacio de nombres y una tabla:
CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
Haz los cambios siguientes:
CATALOG_NAME
: el nombre del catálogo que hace referencia a tu tabla de Spark.NAMESPACE_NAME
: el nombre del espacio de nombres que hace referencia a tu tabla de Spark.TABLE_NAME
: el nombre de la tabla de Spark.WAREHOUSE_DIRECTORY
: el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
Envía una tarea por lotes de Spark SQL ejecutando el siguiente comando
gcloud dataproc batches submit spark-sql
:gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
Haz los cambios siguientes:
SQL_SCRIPT_PATH
: la ruta al archivo SQL que usa el trabajo por lotes.PROJECT_ID
: el ID del Google Cloud proyecto en el que se ejecutará la tarea por lotes.REGION
: la región en la que se ejecuta tu carga de trabajo.SUBNET_NAME
(opcional): el nombre de una subred de VPC enREGION
que cumpla los requisitos de la subred de sesión.BUCKET_PATH
: la ubicación del segmento de Cloud Storage en el que se subirán las dependencias de la carga de trabajo. ElWAREHOUSE_DIRECTORY
se encuentra en este contenedor. No es necesario incluir el prefijo de URIgs://
del bucket. Puedes especificar la ruta o el nombre del cubo; por ejemplo,mybucketname1
.LOCATION
: la ubicación en la que se ejecutará el trabajo por lotes.
Para obtener más información sobre cómo enviar trabajos por lotes de Spark, consulta Ejecutar una carga de trabajo por lotes de Spark.
PySpark
Crea un archivo de Python con los comandos de PySpark que quieras ejecutar en el metastore de BigLake.
Por ejemplo, el siguiente comando configura un entorno de Spark para interactuar con tablas de Iceberg almacenadas en BigLake Metastore. A continuación, el comando crea un nuevo espacio de nombres y una tabla Iceberg en ese espacio de nombres.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
Haz los cambios siguientes:
PROJECT_ID
: el ID del Google Cloud proyecto en el que se ejecutará la tarea por lotes.LOCATION
: la ubicación en la que se encuentran los recursos de BigQuery.CATALOG_NAME
: el nombre del catálogo que hace referencia a tu tabla de Spark.TABLE_NAME
: el nombre de la tabla de Spark.WAREHOUSE_DIRECTORY
: el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.NAMESPACE_NAME
: el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
Envía el trabajo por lotes con el siguiente comando
gcloud dataproc batches submit pyspark
:gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
Haz los cambios siguientes:
PYTHON_SCRIPT_PATH
: la ruta a la secuencia de comandos de Python que usa el trabajo por lotes.PROJECT_ID
: el ID del Google Cloud proyecto en el que se ejecutará la tarea por lotes.REGION
: la región en la que se ejecuta tu carga de trabajo.BUCKET_PATH
: la ubicación del segmento de Cloud Storage en el que se subirán las dependencias de la carga de trabajo. No es necesario incluir el prefijo de URIgs://
del bucket. Puede especificar la ruta o el nombre del segmento, por ejemplo,mybucketname1
.
Para obtener más información sobre cómo enviar trabajos por lotes de PySpark, consulta la referencia de gcloud de PySpark.
Siguientes pasos
- Crear y gestionar recursos de metastore.
- Configura las funciones opcionales de metastore de BigLake.