Usa el metastore de BigQuery con Dataproc

En este documento, se explica cómo usar el metastore de BigQuery con Dataproc en Compute Engine. Esta conexión te proporciona un almacén de metadatos único y compartido que funciona en motores de software de código abierto, como Apache Spark.

Antes de comenzar

  1. Habilita la facturación para tu proyecto de Google Cloud . Obtén información sobre cómo verificar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs de BigQuery y Dataproc.

    Habilitar las API

  3. Opcional: Comprende cómo funciona el metastore de BigQuery y por qué deberías usarlo.

Roles obligatorios

Para obtener los permisos que necesitas para usar Spark y Dataproc con el metastore de BigQuery como un almacén de metadatos, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Flujo de trabajo general

Para usar Dataproc en Compute Engine con el metastore de BigQuery, sigue estos pasos generales:

  1. Crea un clúster de Dataproc o configura uno existente.
  2. Conéctate a tu motor de software de código abierto preferido, como Spark.
  3. Usa un archivo JAR para instalar el complemento del catálogo de Apache Iceberg en el clúster.
  4. Crea y administra tus recursos de metastore de BigQuery según sea necesario, según el motor de software de código abierto que uses.
  5. En BigQuery, accede a tus recursos de metastore de BigQuery y úsalo.

Conecta BigQuery Metastore a Spark

En las siguientes instrucciones, se muestra cómo conectar Dataproc al metastore de BigQuery con Spark SQL interactivo.

Descarga el complemento del catálogo de Iceberg

Para conectar el metastore de BigQuery con Dataproc y Spark, debes usar el archivo jar del complemento del catálogo de Iceberg del metastore de BigQuery.

Este archivo se incluye de forma predeterminada en la versión 2.2 de la imagen de Dataproc. Si tus clústeres de Dataproc no tienen acceso directo a Internet, debes descargar el complemento y subirlo a un bucket de Cloud Storage al que pueda acceder tu clúster de Dataproc.

Descarga el complemento de catálogo de Apache Iceberg de BigQuery Metastore.

Configura un clúster de Dataproc

Antes de conectarte al metastore de BigQuery, debes configurar un clúster de Dataproc.

Para ello, puedes crear un clúster nuevo o usar uno existente. Luego, podrás usar este clúster para ejecutar Spark SQL interactivo y administrar tus recursos de metastore de BigQuery.

  • La subred de la región en la que se crea el clúster debe tener habilitado el Acceso privado a Google (PGA). De forma predeterminada, las VMs del clúster de Dataproc, creadas con una versión de imagen 2.2 (predeterminada) o posterior, solo tienen direcciones IP internas. Para permitir que las VMs del clúster se comuniquen con las APIs de Google, habilita el Acceso privado a Google en la subred de red default (o el nombre de red especificado por el usuario, si corresponde) en la región donde se crea el clúster.

  • Si deseas ejecutar el ejemplo de la interfaz web de Zeppelin en esta guía, debes usar o crear un clúster de Dataproc con el componente opcional de Zeppelin habilitado.

Clúster nuevo

Para crear un clúster de Dataproc nuevo, ejecuta el siguiente comando gcloud dataproc clusters create. Esta configuración contiene la configuración que necesitas para usar el metastore de BigQuery.

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

Reemplaza lo siguiente:

  • CLUSTER_NAME: Es un nombre para tu clúster de Dataproc.
  • PROJECT_ID: Es el ID del proyecto de Google Cloud en el que creas el clúster.
  • LOCATION: Es la región de Google Cloud en la que crearás el clúster.

Clúster existente

Para configurar un clúster existente, agrega el siguiente tiempo de ejecución de Iceberg Spark a tu clúster.

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2

Puedes agregar el entorno de ejecución con una de las siguientes opciones:

Enviar un trabajo de Spark

Para enviar un trabajo de Spark, usa uno de los siguientes métodos:

gcloud CLI

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto de Google Cloud que contiene el clúster de Dataproc.
  • CLUSTER_NAME: Es el nombre del clúster de Dataproc que usas para ejecutar el trabajo de Spark SQL.
  • REGION: La región de Compute Engine en la que se encuentra tu clúster.
  • LOCATION: la ubicación de los recursos de BigQuery.
  • CATALOG_NAME: Es el nombre del catálogo de Spark que usas con tu trabajo de SQL.
  • WAREHOUSE_DIRECTORY: Es la carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza con gs://.
  • SPARK_SQL_COMMAND: Es la consulta en SQL de Spark que deseas ejecutar. Esta consulta incluye los comandos para crear tus recursos. Por ejemplo, para crear un espacio de nombres y una tabla.

Interactive Spark

Conéctate a Spark y, luego, instala el complemento de catálogo

Para instalar el complemento de catálogo para el almacén de metadatos de BigQuery, conéctate a tu clúster de Dataproc con SSH.

  1. En la consola de Google Cloud , ve a la página Instancias de VM.
  2. Para conectarte a una instancia de VM de Dataproc, haz clic en SSH en la lista de instancias de máquina virtual. El resultado es similar al siguiente:

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. En la terminal, ejecuta el siguiente comando de inicialización del metastore de BigQuery:

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Reemplaza lo siguiente:

    • CATALOG_NAME: Es el nombre del catálogo de Spark que usas con tu trabajo de SQL.
    • PROJECT_ID: Es el ID del proyecto de Google Cloud del catálogo de metastore de BigQuery con el que se vincula tu catálogo de Spark.
    • LOCATION: La ubicación de Google Cloud del metastore de BigQuery.
    • WAREHOUSE_DIRECTORY: Es la carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza con gs://.

    Después de conectarte correctamente a un clúster, la terminal de Spark mostrará el mensaje spark-sql.

    spark-sql (default)>
    

Administra recursos de metastore de BigQuery

Ahora estás conectado al metastore de BigQuery. Puedes ver tus recursos existentes o crear recursos nuevos según los metadatos almacenados en el metastore de BigQuery.

Por ejemplo, intenta ejecutar los siguientes comandos en la sesión interactiva de Spark SQL para crear un espacio de nombres y una tabla de Iceberg.

  • Usa el catálogo de Iceberg personalizado:

    USE `CATALOG_NAME`;
  • Crea un espacio de nombres:

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • Usa el espacio de nombres creado:

    USE NAMESPACE_NAME;
  • Crea una tabla de Iceberg:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • Inserta una fila de la tabla:

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • Agrega una columna de tabla:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • Visualiza los metadatos de la tabla:

    DESCRIBE EXTENDED TABLE_NAME;
  • Obtén una lista de las tablas del espacio de nombres:

    SHOW TABLES;

Notebook de Zeppelin

  1. En la consola de Google Cloud , ve a la página Clústeres de Dataproc.

    Ir a Clústeres de Dataproc

  2. Haz clic en el nombre del clúster que deseas usar.

    Se abrirá la página Detalles del clúster.

  3. En el menú de navegación, haz clic en Interfaces web.

  4. En Puerta de enlace del componente, haz clic en Zeppelin. Se abrirá la página del notebook de Zeppelin.

  5. En el menú de navegación, haz clic en Cuaderno y, luego, en +Crear nota nueva.

  6. En el cuadro de diálogo, ingresa un nombre para el notebook. Deja seleccionado Spark como el intérprete predeterminado.

  7. Haz clic en Crear. Se crea un notebook nuevo.

  8. En el notebook, haz clic en el menú de configuración y, luego, en Intérprete.

  9. En el campo Buscar intérpretes, busca Spark.

  10. Haz clic en Edit.

  11. En el campo Spark.jars, ingresa el URI del jar de Spark.

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
    
  12. Haz clic en Guardar.

  13. Haz clic en Aceptar.

  14. Copia el siguiente código de PySpark en tu notebook de Zeppelin.

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    Reemplaza lo siguiente:

    • CATALOG_NAME: Es el nombre del catálogo de Spark que se usará para la tarea de SQL.
    • PROJECT_ID: Es el ID del proyecto de Google Cloud que contiene el clúster de Dataproc.
    • WAREHOUSE_DIRECTORY: Es la carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza con gs://.
    • NAMESPACE_NAME: Es el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
    • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
    • TABLE_NAME: Es un nombre de tabla para tu tabla de Spark.
  15. Haz clic en el ícono de ejecución o presiona Shift-Enter para ejecutar el código. Cuando se completa el trabajo, el mensaje de estado muestra "Spark Job Finished" y el resultado muestra el contenido de la tabla:

¿Qué sigue?