Configura BigLake Metastore

Questo documento spiega come configurare BigLake Metastore con Dataproc o Google Cloud Serverless per Apache Spark per creare un metastore singolo e condiviso che funzioni con motori open source, come Apache Spark o Apache Flink.

Prima di iniziare

  1. Abilita la fatturazione per il tuo progetto Google Cloud . Scopri come verificare se la fatturazione è abilitata per un progetto.
  2. Abilita le API BigQuery e Dataproc.

    Abilita le API

  3. (Facoltativo) Scopri come funziona BigLake Metastore e perché dovresti utilizzarlo.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per configurare il metastore BigLake, chiedi all'amministratore di concederti i seguenti ruoli IAM:

Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Configura il metastore con Dataproc

Puoi configurare BigLake Metastore con Dataproc utilizzando Spark o Flink:

Spark

  1. Configura un nuovo cluster. Per creare un nuovo cluster Dataproc, esegui il seguente comando gcloud dataproc clusters create, che contiene le impostazioni che devi utilizzare BigLake Metastore:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    Sostituisci quanto segue:

    • CLUSTER_NAME: un nome per il tuo cluster Dataproc.
    • PROJECT_ID: l'ID del Google Cloud progetto in cui stai creando il cluster.
    • LOCATION: la regione Compute Engine in cui stai creando il cluster.
  2. Invia un job Spark utilizzando uno dei seguenti metodi:

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud che contiene il cluster Dataproc.
    • CLUSTER_NAME: il nome del cluster Dataproc che utilizzi per eseguire il job Spark SQL.
    • REGION: la regione di Compute Engine in cui si trova il cluster.
    • LOCATION: la posizione delle risorse BigQuery.
    • CATALOG_NAME: il nome del catalogo Spark da utilizzare con il job SQL.
    • WAREHOUSE_DIRECTORY: la cartella Cloud Storage che contiene il data warehouse. Questo valore inizia con gs://.
    • SPARK_SQL_COMMAND: la query Spark SQL che vuoi eseguire. Questa query include i comandi per creare le risorse. Ad esempio, per creare uno spazio dei nomi e una tabella.

    Interfaccia a riga di comando spark-sql

    1. Nella console Google Cloud , vai alla pagina Istanze VM.

      Vai a Istanze VM

    2. Per connetterti a un'istanza VM Dataproc, fai clic su SSH nella riga che elenca il nome dell'istanza VM principale del cluster Dataproc, ovvero il nome del cluster seguito dal suffisso -m. L'output è simile al seguente:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. Nel terminale, esegui il seguente comando di inizializzazione del metastore BigLake:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      Sostituisci quanto segue:

      • CATALOG_NAME: il nome del catalogo Spark che utilizzi con il tuo job SQL.
      • PROJECT_ID: l'ID progetto Google Cloud del catalogo BigLake Metastore a cui è collegato il catalogo Spark.
      • LOCATION: la posizione Google Cloud del metastore BigLake.
      • WAREHOUSE_DIRECTORY: la cartella Cloud Storage che contiene il data warehouse. Questo valore inizia con gs://.

      Dopo aver eseguito correttamente la connessione al cluster, il terminale Spark visualizza il prompt spark-sql, che puoi utilizzare per inviare job Spark.

      spark-sql (default)>
      
  1. Crea un cluster Dataproc con il componente Flink facoltativo abilitato e assicurati di utilizzare Dataproc 2.2 o versioni successive.
  2. Nella console Google Cloud , vai alla pagina Istanze VM.

    Vai a Istanze VM

  3. Nell'elenco delle istanze della macchina virtuale, fai clic su SSH per connetterti all'istanza VM del cluster Dataproc principale, elencata come nome del cluster seguito dal suffisso -m.

  4. Configura il plug-in del catalogo personalizzato Iceberg per BigLake Metastore:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Avvia la sessione Flink su YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Crea un catalogo in Flink:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp_project'='PROJECT_ID',
    'gcp_location'='LOCATION'
    );

    Sostituisci quanto segue:

    • CATALOG_NAME: l'identificatore del catalogo Flink, collegato a un catalogo BigLake Metastore.
    • WAREHOUSE_DIRECTORY: il percorso di base per la directory del warehouse (la cartella Cloud Storage in cui Flink crea i file). Questo valore inizia con gs://.
    • PROJECT_ID: l'ID progetto del catalogo BigLake Metastore a cui è collegato il catalogo Flink.
    • LOCATION: la posizione delle risorse BigQuery.

La sessione Flink è ora connessa a BigLake Metastore e puoi eseguire comandi Flink SQL.

Ora che hai stabilito la connessione al metastore BigLake, puoi creare e visualizzare le risorse in base ai metadati archiviati nel metastore BigLake.

Ad esempio, prova a eseguire i seguenti comandi nella sessione Flink SQL interattiva per creare un database e una tabella Iceberg.

  1. Utilizza il catalogo Iceberg personalizzato:

    USE CATALOG CATALOG_NAME;

    Sostituisci CATALOG_NAME con l'identificatore del catalogo Flink.

  2. Crea un database, che crea un set di dati in BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Sostituisci DATABASE_NAME con il nome del nuovo database.

  3. Utilizza il database che hai creato:

    USE DATABASE_NAME;
  4. Crea una tabella Iceberg. Il seguente codice crea una tabella delle vendite di esempio:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    Sostituisci ICEBERG_TABLE_NAME con un nome per la nuova tabella.

  5. Visualizza i metadati della tabella:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Elenca le tabelle nel database:

    SHOW TABLES;

Importa i dati nella tabella

Dopo aver creato una tabella Iceberg nella sezione precedente, puoi utilizzare Flink DataGen come origine dati per importare dati in tempo reale nella tabella. I seguenti passaggi sono un esempio di questo flusso di lavoro:

  1. Crea una tabella temporanea utilizzando DataGen:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Sostituisci quanto segue:

    • DATABASE_NAME: il nome del database in cui archiviare la tabella temporanea.
    • TEMP_TABLE_NAME: un nome per la tabella temporanea.
    • ICEBERG_TABLE_NAME: il nome della tabella Iceberg creata nella sezione precedente.
  2. Imposta il parallelismo su 1:

    SET 'parallelism.default' = '1';
  3. Imposta l'intervallo del checkpoint:

    SET 'execution.checkpointing.interval' = '10second';
  4. Imposta il checkpoint:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Avvia il job di streaming in tempo reale:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    L'output è simile al seguente:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Per controllare lo stato del job di streaming:

    1. Nella console Google Cloud , vai alla pagina Cluster.

      Vai a Cluster

    2. Seleziona il cluster.

    3. Fai clic sulla scheda Interfacce web.

    4. Fai clic sul link YARN ResourceManager.

    5. Nell'interfaccia YARN ResourceManager, trova la sessione Flink e fai clic sul link ApplicationMaster nella sezione UI di monitoraggio.

    6. Nella colonna Stato, verifica che lo stato del job sia In esecuzione.

  7. Esegui query sui dati di streaming nel client Flink SQL:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Esegui query sui dati di streaming in BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Termina il job di streaming nel client Flink SQL:

    STOP JOB 'JOB_ID';

    Sostituisci JOB_ID con l'ID job visualizzato nell'output quando hai creato il job di streaming.

Configurare il metastore con Serverless per Apache Spark

Puoi configurare il metastore BigLake con Serverless per Apache Spark utilizzando Spark SQL o PySpark.

Spark SQL

  1. Crea un file SQL con i comandi Spark SQL che vuoi eseguire in BigLake Metastore. Ad esempio, questo comando crea uno spazio dei nomi e una tabella:

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Sostituisci quanto segue:

    • CATALOG_NAME: il nome del catalogo che fa riferimento alla tabella Spark.
    • NAMESPACE_NAME: il nome dello spazio dei nomi che fa riferimento alla tabella Spark.
    • TABLE_NAME: un nome di tabella per la tabella Spark.
    • WAREHOUSE_DIRECTORY: l'URI della cartella Cloud Storage in cui è archiviato il data warehouse.
  2. Invia un job batch Spark SQL eseguendo il seguente comando gcloud dataproc batches submit spark-sql:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Sostituisci quanto segue:

    • SQL_SCRIPT_PATH: il percorso del file SQL utilizzato dal job batch.
    • PROJECT_ID: l'ID del Google Cloud progetto in cui eseguire il job batch.
    • REGION: la regione in cui viene eseguito il tuo workload.
    • SUBNET_NAME (facoltativo): il nome di una subnet VPC in REGION che soddisfa i requisiti della subnet di sessione.
    • BUCKET_PATH: la posizione del bucket Cloud Storage in cui caricare le dipendenze del workload. WAREHOUSE_DIRECTORY si trova in questo bucket. Il prefisso URI gs:// del bucket non è obbligatorio. Puoi specificare il percorso del bucket o il nome del bucket, ad esempio mybucketname1.
    • LOCATION: la località in cui eseguire il job batch.

    Per ulteriori informazioni sull'invio di job batch Spark, consulta Eseguire un carico di lavoro batch Spark.

PySpark

  1. Crea un file Python con i comandi PySpark che vuoi eseguire in BigLake Metastore.

    Ad esempio, il seguente comando configura un ambiente Spark per interagire con le tabelle Iceberg archiviate nel metastore BigLake. Il comando crea quindi un nuovo spazio dei nomi e una tabella Iceberg all'interno di questo spazio dei nomi.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del Google Cloud progetto in cui eseguire il job batch.
    • LOCATION: la posizione in cui si trovano le risorse BigQuery.
    • CATALOG_NAME: il nome del catalogo che fa riferimento alla tabella Spark.
    • TABLE_NAME: un nome di tabella per la tabella Spark.
    • WAREHOUSE_DIRECTORY: l'URI della cartella Cloud Storage in cui è archiviato il data warehouse.
    • NAMESPACE_NAME: il nome dello spazio dei nomi che fa riferimento alla tabella Spark.
  2. Invia il job batch utilizzando il seguente comando gcloud dataproc batches submit pyspark:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Sostituisci quanto segue:

    • PYTHON_SCRIPT_PATH: il percorso dello script Python utilizzato dal job batch.
    • PROJECT_ID: l'ID del Google Cloud progetto in cui eseguire il job batch.
    • REGION: la regione in cui viene eseguito il tuo workload.
    • BUCKET_PATH: la posizione del bucket Cloud Storage in cui caricare le dipendenze del workload. Il prefisso URI gs:// del bucket non è obbligatorio. Puoi specificare il percorso del bucket o il nome del bucket, ad esempio mybucketname1.

    Per ulteriori informazioni sull'invio di job batch PySpark, consulta il riferimento a gcloud per PySpark.

Passaggi successivi