Usar el metastore de BigLake con Dataproc
En este documento se explica cómo usar el almacén de metadatos de BigLake con Dataproc en Compute Engine. Esta conexión te proporciona un único almacén de metadatos compartido que funciona en motores de software de código abierto, como Apache Spark o Apache Flink.
Antes de empezar
- Habilita la facturación de tu Google Cloud proyecto. Consulta cómo comprobar si la facturación está habilitada en un proyecto.
Habilita las APIs BigQuery y Dataproc.
Opcional: Consulta cómo funciona el metastore de BigLake y por qué deberías usarlo.
Roles obligatorios
Para obtener los permisos que necesitas para usar Spark o Flink y Dataproc con BigLake Metastore como almacén de metadatos, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos:
-
Crea un clúster de Dataproc:
Trabajador de Dataproc (
roles/dataproc.worker
) en la cuenta de servicio predeterminada de Compute Engine del proyecto -
Crea tablas de metastore de BigLake en Spark o Flink:
-
Trabajador de Dataproc (
roles/dataproc.worker
) en la cuenta de servicio de la VM de Dataproc del proyecto -
Editor de datos de BigQuery (
roles/bigquery.dataEditor
) en la cuenta de servicio de la VM de Dataproc del proyecto -
Administrador de objetos de Storage (
roles/storage.objectAdmin
) en la cuenta de servicio de la VM de Dataproc del proyecto
-
Trabajador de Dataproc (
-
Consulta tablas de la metastore de BigLake en BigQuery:
-
Lector de datos de BigQuery (
roles/bigquery.dataViewer
) en el proyecto -
Usuario de BigQuery (
roles/bigquery.user
) en el proyecto -
Lector de objetos de Storage (
roles/storage.objectViewer
) en el proyecto
-
Lector de datos de BigQuery (
Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.
También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.
Flujo de trabajo general
Para usar Dataproc en Compute Engine con BigLake Metastore, sigue estos pasos generales:
- Crea un clúster de Dataproc o configura uno que ya tengas.
- Conéctate al motor de software de código abierto que prefieras, como Spark o Flink.
- Usa un archivo JAR para instalar el complemento de catálogo de Apache Iceberg en el clúster.
- Crea y gestiona tus recursos de metaalmacén de BigLake según sea necesario, en función del motor de software de código abierto que estés usando.
- En BigQuery, accede a tus recursos de metastore de BigLake y úsalos.
Conectar BigLake Metastore a Spark
En las siguientes instrucciones se muestra cómo conectar Dataproc a BigLake Metastore mediante Spark SQL interactivo.
Descargar el complemento de catálogo de Iceberg
Para conectar BigLake Metastore con Dataproc y Spark, debes usar el archivo JAR del complemento del catálogo de Iceberg de BigLake Metastore.
Este archivo se incluye de forma predeterminada en la versión 2.2 de la imagen de Dataproc. Si tus clústeres de Dataproc no tienen acceso directo a Internet, debes descargar el complemento y subirlo a un segmento de Cloud Storage al que pueda acceder tu clúster de Dataproc.
Descarga el complemento del catálogo de Iceberg de metastore de BigLake.
Configurar un clúster de Dataproc
Antes de conectarte a BigLake Metastore, debes configurar un clúster de Dataproc.
Para ello, puedes crear un clúster o usar uno que ya tengas. Después, puedes usar este clúster para ejecutar Spark SQL interactivo y gestionar tus recursos de metastore de BigLake.
La subred de la región en la que se crea el clúster debe tener habilitada la opción Acceso privado de Google (PGA). De forma predeterminada, las VMs de clúster de Dataproc creadas con la versión de imagen 2.2 (predeterminada) o posterior solo tienen direcciones IP internas. Para permitir que las VMs del clúster se comuniquen con las APIs de Google, habilita el acceso privado de Google en la subred de la red
default
(o el nombre de la red especificado por el usuario, si procede) de la región en la que se crea el clúster.Si quieres ejecutar el ejemplo de interfaz web de Zeppelin de esta guía, debes usar o crear un clúster de Dataproc con el componente opcional de Zeppelin habilitado.
Agrupación nueva
Para crear un clúster de Dataproc, ejecuta el siguiente comando gcloud
dataproc clusters create
. Esta configuración contiene los ajustes que necesitas para usar BigLake Metastore.
gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --optional-components=ZEPPELIN \ --enable-component-gateway \ --single-node
Haz los cambios siguientes:
CLUSTER_NAME
: nombre del clúster de Dataproc.PROJECT_ID
: el ID del Google Cloud proyecto en el que vas a crear el clúster.LOCATION
: la Google Cloud región en la que vas a crear el clúster.
Clúster disponible
Para configurar un clúster, añade el siguiente tiempo de ejecución de Iceberg Spark al clúster.
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1
Puedes añadir el tiempo de ejecución de una de las siguientes formas:
Initialization Script. Añade la dependencia del tiempo de ejecución a una secuencia de comandos de inicialización personalizada que se ejecute cuando se cree.
Después de añadir la dependencia del tiempo de ejecución a la secuencia de comandos, sigue las instrucciones para crear, recrear y actualizar un clúster.
Instalación manual. Añade manualmente el archivo JAR del complemento del catálogo de Iceberg y configura las propiedades de Spark para incluir el tiempo de ejecución en tu clúster.
Enviar una tarea de Spark
Para enviar un trabajo de Spark, utilice uno de los siguientes métodos:
CLI de gcloud
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region==REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Haz los cambios siguientes:
PROJECT_ID
: el ID del Google Cloud proyecto que contiene el clúster de Dataproc.CLUSTER_NAME
: el nombre del clúster de Dataproc que usas para ejecutar el trabajo de Spark SQL.REGION
: la región de Compute Engine en la que se encuentra tu clúster.LOCATION
: la ubicación de los recursos de BigQuery.CATALOG_NAME
: el nombre del catálogo de Spark que estás usando con tu trabajo de SQL.WAREHOUSE_DIRECTORY
: la carpeta de Cloud Storage que contiene su almacén de datos. Este valor empieza porgs://
.SPARK_SQL_COMMAND
: la consulta de Spark SQL que quieres ejecutar. Esta consulta incluye los comandos para crear tus recursos. Por ejemplo, para crear un espacio de nombres y una tabla.
Chispa interactiva
Conéctate a Spark e instala el complemento de catálogo
Para instalar el complemento de catálogo de BigLake Metastore, conéctate a tu clúster de Dataproc mediante SSH.
- En la consola de Google Cloud , ve a la página Instancias de VM.
Para conectarte a una instancia de VM de Dataproc, haz clic en SSH en la lista de instancias de máquinas virtuales. La salida es similar a la siguiente:
Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
En el terminal, ejecuta el siguiente comando de inicialización del metastore de BigLake:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Haz los cambios siguientes:
CATALOG_NAME
: el nombre del catálogo de Spark que estás usando con tu trabajo de SQL.PROJECT_ID
: el ID del proyecto Google Cloud del catálogo de metastore de BigLake con el que se vincula tu catálogo de Spark.LOCATION
: la Google Cloud ubicación del metastore de BigLake.WAREHOUSE_DIRECTORY
: la carpeta de Cloud Storage que contiene su almacén de datos. Este valor empieza porgs://
.
Después de conectarte correctamente a un clúster, tu terminal de Spark mostrará la petición
spark-sql
.spark-sql (default)>
Gestionar recursos de metastore de BigLake
Ahora estás conectado a BigLake Metastore. Puede ver los recursos que ya tiene o crear otros nuevos a partir de los metadatos almacenados en BigLake Metastore.
Por ejemplo, prueba a ejecutar los siguientes comandos en la sesión interactiva de Spark SQL para crear un espacio de nombres y una tabla de Iceberg.
Usa el catálogo personalizado de Iceberg:
USE `CATALOG_NAME`;
Crea un espacio de nombres:
CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
Usa el espacio de nombres creado:
USE NAMESPACE_NAME;
Crea una tabla de Iceberg:
CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
Insertar una fila de una tabla:
INSERT INTO TABLE_NAME VALUES (1, "first row");
Añadir una columna a una tabla:
ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
Para ver los metadatos de una tabla, sigue estos pasos:
DESCRIBE EXTENDED TABLE_NAME;
Mostrar las tablas del espacio de nombres:
SHOW TABLES;
Cuaderno de Zeppelin
En la Google Cloud consola, ve a la página Clústeres de Dataproc.
Haz clic en el nombre del clúster que quieras usar.
Se abrirá la página Cluster Details (Detalles del clúster).
En el menú de navegación, haga clic en Interfaces web.
En Pasarela de componentes, haz clic en Zeppelin. Se abrirá la página del cuaderno de Zeppelin.
En el menú de navegación, haz clic en Notebook (Cuaderno) y, a continuación, en +Create new note (+Crear nota).
En el cuadro de diálogo, introduce el nombre del cuaderno. Deja Spark seleccionado como intérprete predeterminado.
Haz clic en Crear. Se crea un cuaderno.
En el cuaderno, haz clic en el menú de configuración y, a continuación, en Intérprete.
En el campo Buscar intérpretes, busca Spark.
Haz clic en Editar.
En el campo Spark.jars, introduce el URI del archivo JAR de Spark.
https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
Haz clic en Guardar.
Haz clic en Aceptar.
Copia el siguiente código de PySpark en tu notebook de Zeppelin.
%pyspark from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("select version()").show() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME;").show()
Haz los cambios siguientes:
CATALOG_NAME
: nombre del catálogo de Spark que se va a usar en la tarea de SQL.PROJECT_ID
: el ID del Google Cloud proyecto que contiene el clúster de Dataproc.WAREHOUSE_DIRECTORY
: la carpeta de Cloud Storage que contiene su almacén de datos. Este valor empieza porgs://
.NAMESPACE_NAME
: el nombre del espacio de nombres que hace referencia a tu tabla de Spark.WAREHOUSE_DIRECTORY
: el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.TABLE_NAME
: el nombre de la tabla de Spark.
Haz clic en el icono de ejecución o pulsa
Shift-Enter
para ejecutar el código. Cuando se complete el trabajo, el mensaje de estado mostrará "Spark Job Finished" (Trabajo de Spark finalizado) y el resultado mostrará el contenido de la tabla:
Conectar BigLake Metastore con Flink
En las siguientes instrucciones se muestra cómo conectar Dataproc al metastore de BigLake mediante el cliente de Flink SQL.
Instalar el complemento de catálogo y conectarse a una sesión de Flink
Para conectar BigLake Metastore a Flink, haz lo siguiente:
- Crea un clúster de Dataproc con el componente opcional Flink habilitado y asegúrate de usar Dataproc 2.2 o una versión posterior.
En la consola de Google Cloud , ve a la página Instancias de VM.
En la lista de instancias de máquina virtual, haz clic en SSH para conectarte a una instancia de VM de Dataproc.
Configura el complemento de catálogo personalizado de Iceberg para BigLake Metastore:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
Inicia la sesión de Flink en YARN:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Crear un catálogo en Flink:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
Haz los cambios siguientes:
CATALOG_NAME
: el identificador del catálogo de Flink, que está vinculado a un catálogo del metastore de BigLake.WAREHOUSE_DIRECTORY
: la ruta base del directorio del almacén (la carpeta de Cloud Storage en la que Flink crea archivos). Este valor empieza porgs://
.PROJECT_ID
: el ID del proyecto del catálogo de metastore de BigLake al que se vincula el catálogo de Flink.LOCATION
: la ubicación de los recursos de BigQuery.
Tu sesión de Flink ahora está conectada al metastore de BigLake y puedes ejecutar comandos de Flink SQL.
Gestionar recursos de metastore de BigLake
Ahora que te has conectado a BigLake Metastore, puedes crear y ver recursos basados en los metadatos almacenados en BigLake Metastore.
Por ejemplo, prueba a ejecutar los siguientes comandos en tu sesión interactiva de Flink SQL para crear una base de datos y una tabla de Iceberg.
Usa el catálogo personalizado de Iceberg:
USE CATALOG CATALOG_NAME;
Sustituye
CATALOG_NAME
por el identificador de catálogo de Flink.Crea una base de datos, lo que crea un conjunto de datos en BigQuery:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Sustituye
DATABASE_NAME
por el nombre de tu nueva base de datos.Usa la base de datos que has creado:
USE DATABASE_NAME;
Crea una tabla Iceberg. Con el siguiente comando se crea una tabla de ventas de ejemplo:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Sustituye
ICEBERG_TABLE_NAME
por el nombre de la nueva tabla.Para ver los metadatos de una tabla, sigue estos pasos:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Mostrar las tablas de la base de datos:
SHOW TABLES;
Ingerir datos en una tabla
Después de crear una tabla Iceberg en la sección anterior, puedes usar Flink DataGen como fuente de datos para ingerir datos en tiempo real en tu tabla. Los pasos siguientes son un ejemplo de este flujo de trabajo:
Para crear una tabla temporal con DataGen, sigue estos pasos:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Haz los cambios siguientes:
DATABASE_NAME
: el nombre de la base de datos en la que se almacenará tu tabla temporal.TEMP_TABLE_NAME
: un nombre para la tabla temporal.ICEBERG_TABLE_NAME
: el nombre de la tabla Iceberg que has creado en la sección anterior.
Define el paralelismo en 1:
SET 'parallelism.default' = '1';
Define el intervalo del punto de control:
SET 'execution.checkpointing.interval' = '10second';
Define el punto de control:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Inicia la tarea de streaming en tiempo real:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
El resultado debería ser similar al siguiente:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
Para comprobar el estado del trabajo de streaming, haz lo siguiente:
En la Google Cloud consola, ve a la página Clusters.
Selecciona tu clúster.
Haga clic en la pestaña Interfaces web.
Haz clic en el enlace YARN ResourceManager.
En la interfaz de YARN ResourceManager, busca tu sesión de Flink y haz clic en el enlace ApplicationMaster de Tracking UI.
En la columna Estado, comprueba que el estado de tu trabajo sea En ejecución.
Consulta datos de streaming en el cliente de Flink SQL:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Consultar datos de streaming en BigQuery:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Finaliza el trabajo de streaming en el cliente de Flink SQL:
STOP JOB 'JOB_ID';
Sustituye
JOB_ID
por el ID de trabajo que se mostró en la salida cuando creaste el trabajo de streaming.
Siguientes pasos
- Configura las funciones opcionales de metastore de BigLake.
- Ver y consultar tablas de Spark en BigQuery