Configura BigLake Metastore
En este documento, se explica cómo configurar BigLake Metastore con Dataproc o Google Cloud Serverless para Apache Spark para crear un metastore único y compartido que funcione en motores de código abierto, como Apache Spark o Apache Flink.
Antes de comenzar
- Habilita la facturación para tu proyecto de Google Cloud . Obtén información para verificar si la facturación está habilitada en un proyecto.
Habilita las APIs de BigQuery y Dataproc.
Opcional: Comprende cómo funciona BigLake Metastore y por qué deberías usarlo.
Roles requeridos
Para obtener los permisos que necesitas para configurar el metastore de BigLake, pídele a tu administrador que te otorgue los siguientes roles de IAM:
-
Crea un clúster de Dataproc:
Trabajador de Dataproc (
roles/dataproc.worker
) en la cuenta de servicio predeterminada de Compute Engine en el proyecto -
Crea tablas de BigLake Metastore:
-
Trabajador de Dataproc (
roles/dataproc.worker
) en la cuenta de servicio de VM de Dataproc en el proyecto -
Editor de datos de BigQuery (
roles/bigquery.dataEditor
) en la cuenta de servicio de la VM de Dataproc del proyecto -
Administrador de objetos de almacenamiento (
roles/storage.objectAdmin
) en la cuenta de servicio de VM de Dataproc en el proyecto
-
Trabajador de Dataproc (
-
Consulta las tablas de BigLake Metastore:
-
Visualizador de datos de BigQuery (
roles/bigquery.dataViewer
) en el proyecto -
Usuario de BigQuery (
roles/bigquery.user
) en el proyecto -
Visualizador de objetos de Storage (
roles/storage.objectViewer
) en el proyecto
-
Visualizador de datos de BigQuery (
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.
Configura tu almacén de metadatos con Dataproc
Puedes configurar BigLake Metastore con Dataproc usando Spark o Flink:
Spark
Configura un clúster nuevo. Para crear un clúster de Dataproc nuevo, ejecuta el siguiente comando
gcloud dataproc clusters create
, que contiene la configuración que necesitas para usar BigLake Metastore:gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
Reemplaza lo siguiente:
CLUSTER_NAME
: Es un nombre para tu clúster de Dataproc.PROJECT_ID
: Es el ID del Google Cloud proyecto en el que crearás el clúster.LOCATION
: Es la región de Compute Engine en la que crearás el clúster.
Envía un trabajo de Spark con uno de los siguientes métodos:
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Reemplaza lo siguiente:
PROJECT_ID
: Es el ID del proyecto Google Cloud que contiene el clúster de Dataproc.CLUSTER_NAME
: Es el nombre del clúster de Dataproc que usas para ejecutar el trabajo de Spark SQL.REGION
: La región de Compute Engine en la que se encuentra tu clúster.LOCATION
: Es la ubicación de los recursos de BigQuery.CATALOG_NAME
: Es el nombre del catálogo de Spark que se usará con tu trabajo de SQL.WAREHOUSE_DIRECTORY
: Es la carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza congs://
.SPARK_SQL_COMMAND
: Es la consulta de Spark SQL que deseas ejecutar. Esta consulta incluye los comandos para crear tus recursos. Por ejemplo, para crear un espacio de nombres y una tabla.
CLI de spark-sql
En la Google Cloud consola, ve a la página Instancias de VM.
Para conectarte a una instancia de VM de Dataproc, haz clic en SSH en la fila que enumera el nombre de la instancia de VM principal del clúster de Dataproc, que es el nombre del clúster seguido de un sufijo
-m
. El resultado es similar a este:Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
En la terminal, ejecuta el siguiente comando de inicialización del metastore de BigLake:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Reemplaza lo siguiente:
CATALOG_NAME
: Es el nombre del catálogo de Spark que usas con tu trabajo de SQL.PROJECT_ID
: Es el ID del Google Cloud proyecto del catálogo de metastore de BigLake con el que se vincula tu catálogo de Spark.LOCATION
: Es la Google Cloud ubicación del metastore de BigLake.WAREHOUSE_DIRECTORY
: Es la carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza congs://
.
Después de conectarte correctamente al clúster, la terminal de Spark mostrará el mensaje
spark-sql
, que puedes usar para enviar trabajos de Spark.spark-sql (default)>
Flink
- Crea un clúster de Dataproc con el componente Flink opcional habilitado y asegúrate de usar Dataproc
2.2
o una versión posterior. En la Google Cloud consola, ve a la página Instancias de VM.
En la lista de instancias de máquina virtual, haz clic en SSH para conectarte a la instancia principal de la VM del clúster de Dataproc, que se muestra como el nombre del clúster seguido de un sufijo
-m
.Configura el complemento del catálogo personalizado de Iceberg para BigLake Metastore:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
Inicia la sesión de Flink en YARN:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Crea un catálogo en Flink:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
Reemplaza lo siguiente:
CATALOG_NAME
: Es el identificador del catálogo de Flink, que está vinculado a un catálogo de metastore de BigLake.WAREHOUSE_DIRECTORY
: Es la ruta de acceso base para el directorio del almacén (la carpeta de Cloud Storage en la que Flink crea archivos). Este valor comienza congs://
.PROJECT_ID
: Es el ID del proyecto del catálogo de BigLake Metastore con el que se vincula el catálogo de Flink.LOCATION
: La ubicación de los recursos de BigQuery.
Tu sesión de Flink ahora está conectada a BigLake Metastore, y puedes ejecutar comandos de Flink SQL.
Administra recursos de BigLake Metastore
Ahora que te conectaste a BigLake Metastore, puedes crear y ver recursos basados en los metadatos almacenados en BigLake Metastore.
Por ejemplo, intenta ejecutar los siguientes comandos en tu sesión interactiva de Flink SQL para crear una base de datos y una tabla de Iceberg.
Usa el catálogo de Iceberg personalizado:
USE CATALOG CATALOG_NAME;
Reemplaza
CATALOG_NAME
por el identificador de tu catálogo de Flink.Crea una base de datos, lo que crea un conjunto de datos en BigQuery:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Reemplaza
DATABASE_NAME
por el nombre de tu nueva base de datos.Usa la base de datos que creaste:
USE DATABASE_NAME;
Crea una tabla de Iceberg. En el siguiente ejemplo, se crea una tabla de ventas:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Reemplaza
ICEBERG_TABLE_NAME
por un nombre para tu tabla nueva.Sigue estos pasos para ver los metadatos de la tabla:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Enumera las tablas de la base de datos:
SHOW TABLES;
Transfiere datos a tu tabla
Después de crear una tabla de Iceberg en la sección anterior, puedes usar Flink DataGen como fuente de datos para transferir datos en tiempo real a tu tabla. Los siguientes pasos son un ejemplo de este flujo de trabajo:
Sigue estos pasos para crear una tabla temporal con DataGen:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Reemplaza lo siguiente:
DATABASE_NAME
: Es el nombre de la base de datos en la que se almacenará tu tabla temporal.TEMP_TABLE_NAME
: Es un nombre para tu tabla temporal.ICEBERG_TABLE_NAME
: Es el nombre de la tabla de Iceberg que creaste en la sección anterior.
Establece el paralelismo en 1:
SET 'parallelism.default' = '1';
Establece el intervalo del punto de control:
SET 'execution.checkpointing.interval' = '10second';
Establece el punto de control:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Inicia el trabajo de transmisión en tiempo real:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
El resultado es similar a este:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
Para verificar el estado del trabajo de transmisión, haz lo siguiente:
En la consola de Google Cloud , ve a la página Clústeres.
Selecciona tu clúster.
Haz clic en la pestaña Interfaces web.
Haz clic en el vínculo YARN ResourceManager.
En la interfaz de YARN ResourceManager, busca tu sesión de Flink y haz clic en el vínculo ApplicationMaster en Tracking UI.
En la columna Estado, confirma que el estado del trabajo sea En ejecución.
Consulta datos de transmisión en el cliente de Flink SQL:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Consulta datos de transmisión en BigQuery:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Finaliza el trabajo de transmisión en el cliente de Flink SQL:
STOP JOB 'JOB_ID';
Reemplaza
JOB_ID
por el ID de trabajo que se mostró en el resultado cuando creaste el trabajo de transmisión.
Configura tu metastore con Serverless for Apache Spark
Puedes configurar el metastore de BigLake con Serverless for Apache Spark usando Spark SQL o PySpark.
Spark SQL
Crea un archivo SQL con los comandos de Spark SQL que deseas ejecutar en BigLake Metastore. Por ejemplo, este comando crea un espacio de nombres y una tabla:
CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
Reemplaza lo siguiente:
CATALOG_NAME
: Es el nombre del catálogo que hace referencia a tu tabla de Spark.NAMESPACE_NAME
: Es el nombre del espacio de nombres que hace referencia a tu tabla de Spark.TABLE_NAME
: Es el nombre de la tabla de Spark.WAREHOUSE_DIRECTORY
: Es el URI de la carpeta de Cloud Storage en la que se almacena tu data warehouse.
Ejecuta el siguiente comando
gcloud dataproc batches submit spark-sql
para enviar un trabajo por lotes de Spark SQL:gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
Reemplaza lo siguiente:
SQL_SCRIPT_PATH
: Es la ruta de acceso al archivo SQL que usa el trabajo por lotes.PROJECT_ID
: Es el ID del proyecto Google Cloud en el que se ejecutará el trabajo por lotes.REGION
: Es la región en la que se ejecuta tu carga de trabajo.SUBNET_NAME
(opcional): Es el nombre de una subred de VPC enREGION
que cumple con los requisitos de subred de sesión.BUCKET_PATH
: Es la ubicación del bucket de Cloud Storage en el que se subirán las dependencias de la carga de trabajo.WAREHOUSE_DIRECTORY
se encuentra en este bucket. No se requiere el prefijo de URIgs://
del bucket. Puedes especificar la ruta de acceso o el nombre del bucket, por ejemplo,mybucketname1
.LOCATION
: Es la ubicación en la que se ejecutará el trabajo por lotes.
Para obtener más información sobre cómo enviar trabajos por lotes de Spark, consulta Ejecuta una carga de trabajo por lotes de Spark.
PySpark
Crea un archivo de Python con los comandos de PySpark que deseas ejecutar en BigLake Metastore.
Por ejemplo, el siguiente comando configura un entorno de Spark para interactuar con las tablas de Iceberg almacenadas en BigLake Metastore. Luego, el comando crea un espacio de nombres nuevo y una tabla de Iceberg dentro de ese espacio de nombres.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
Reemplaza lo siguiente:
PROJECT_ID
: Es el ID del proyecto Google Cloud en el que se ejecutará el trabajo por lotes.LOCATION
: La ubicación en la que se encuentran los recursos de BigQuery.CATALOG_NAME
: Es el nombre del catálogo que hace referencia a tu tabla de Spark.TABLE_NAME
: Es el nombre de la tabla de Spark.WAREHOUSE_DIRECTORY
: Es el URI de la carpeta de Cloud Storage en la que se almacena tu data warehouse.NAMESPACE_NAME
: Es el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
Envía el trabajo por lotes con el siguiente comando
gcloud dataproc batches submit pyspark
:gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
Reemplaza lo siguiente:
PYTHON_SCRIPT_PATH
: Es la ruta de acceso a la secuencia de comandos de Python que usa el trabajo por lotes.PROJECT_ID
: Es el ID del proyecto Google Cloud en el que se ejecutará el trabajo por lotes.REGION
: Es la región en la que se ejecuta tu carga de trabajo.BUCKET_PATH
: Es la ubicación del bucket de Cloud Storage en el que se subirán las dependencias de la carga de trabajo. No se requiere el prefijo de URIgs://
del bucket. Puedes especificar la ruta de acceso o el nombre del bucket, por ejemplo,mybucketname1
.
Para obtener más información sobre el envío de trabajos por lotes de PySpark, consulta la referencia de gcloud de PySpark.
¿Qué sigue?
- Crea y administra recursos del almacén de metadatos.
- Configura las funciones opcionales de BigLake Metastore.