Configure o metastore do BigLake

Este documento explica como configurar o metastore do BigLake com o Dataproc ou o Google Cloud Serverless para Apache Spark para criar um metastore único e partilhado que funciona em motores de código aberto, como o Apache Spark ou o Apache Flink.

Antes de começar

  1. Ative a faturação para o seu Google Cloud projeto. Saiba como verificar se a faturação está ativada num projeto.
  2. Ative as APIs BigQuery e Dataproc.

    Ative as APIs

  3. Opcional: compreenda como funciona o metastore do BigLake e por que motivo o deve usar.

Funções necessárias

Para receber as autorizações de que precisa para configurar o metastore do BigLake, peça ao seu administrador para lhe conceder as seguintes funções de IAM:

Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.

Configure o metastore com o Dataproc

Pode configurar o metastore do BigLake com o Dataproc através do Spark ou do Flink:

Spark

  1. Configurar um novo cluster. Para criar um novo cluster do Dataproc, execute o seguinte comando gcloud dataproc clusters create, que contém as definições que tem de usar o metastore do BigLake:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    Substitua o seguinte:

    • CLUSTER_NAME: um nome para o seu cluster do Dataproc.
    • PROJECT_ID: o ID do Google Cloud projeto onde está a criar o cluster.
    • LOCATION: a região do Compute Engine onde está a criar o cluster.
  2. Envie uma tarefa do Spark através de um dos seguintes métodos:

    CLI do Google Cloud

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    Substitua o seguinte:

    • PROJECT_ID: o ID do Google Cloud projeto que contém o cluster do Dataproc.
    • CLUSTER_NAME: o nome do cluster do Dataproc que está a usar para executar a tarefa do Spark SQL.
    • REGION: a região do Compute Engine onde o cluster está localizado.
    • LOCATION: a localização dos recursos do BigQuery.
    • CATALOG_NAME: o nome do catálogo do Spark a usar com a sua tarefa de SQL.
    • WAREHOUSE_DIRECTORY: a pasta de armazenamento na nuvem que contém o seu armazém de dados. Este valor começa com gs://.
    • SPARK_SQL_COMMAND: a consulta SQL do Spark que quer executar. Esta consulta inclui os comandos para criar os seus recursos. Por exemplo, para criar um espaço de nomes e uma tabela.

    CLI spark-sql

    1. Na Google Cloud consola, aceda à página Instâncias de VM.

      Aceder a Instâncias de VM

    2. Para estabelecer ligação a uma instância de VM do Dataproc, clique em SSH na linha que apresenta o nome da instância de VM principal do cluster do Dataproc, que é o nome do cluster seguido de um sufixo -m. O resultado é semelhante ao seguinte:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. No terminal, execute o seguinte comando de inicialização do metastore do BigLake:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      Substitua o seguinte:

      • CATALOG_NAME: o nome do catálogo do Spark que está a usar com a sua tarefa de SQL.
      • PROJECT_ID: o ID do projeto do catálogo do metastore do BigLake ao qual o seu catálogo do Spark está associado. Google Cloud
      • LOCATION: a Google Cloud localização do metastore do BigLake.
      • WAREHOUSE_DIRECTORY: a pasta de armazenamento na nuvem que contém o seu armazém de dados. Este valor começa com gs://.

      Depois de se ligar com êxito ao cluster, o terminal do Spark apresenta o comando spark-sql, que pode usar para enviar tarefas do Spark.

      spark-sql (default)>
      
  1. Crie um cluster do Dataproc com o componente Flink opcional ativado e certifique-se de que está a usar o Dataproc 2.2 ou posterior.
  2. Na Google Cloud consola, aceda à página Instâncias de VM.

    Aceder às instâncias de VM

  3. Na lista de instâncias de máquinas virtuais, clique em SSH para estabelecer ligação à instância de VM do cluster do Dataproc principal, que é apresentada como o nome do cluster seguido do sufixo -m.

  4. Configure o plug-in de catálogo personalizado do Iceberg para o metastore do BigLake:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Inicie a sessão do Flink no YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Crie um catálogo no Flink:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp_project'='PROJECT_ID',
    'gcp_location'='LOCATION'
    );

    Substitua o seguinte:

    • CATALOG_NAME: o identificador do catálogo do Flink, que está associado a um catálogo do metastore do BigLake.
    • WAREHOUSE_DIRECTORY: o caminho base para o diretório do data warehouse (a pasta do Cloud Storage onde o Flink cria ficheiros). Este valor começa com gs://.
    • PROJECT_ID: o ID do projeto do catálogo do metastore do BigLake ao qual o catálogo do Flink está associado.
    • LOCATION: a localização dos recursos do BigQuery.

A sua sessão do Flink está agora associada ao metastore do BigLake e pode executar comandos SQL do Flink.

Agora que tem ligação ao metastore do BigLake, pode criar e ver recursos com base nos metadados armazenados no metastore do BigLake.

Por exemplo, experimente executar os seguintes comandos na sua sessão interativa do Flink SQL para criar uma base de dados e uma tabela do Iceberg.

  1. Use o catálogo Iceberg personalizado:

    USE CATALOG CATALOG_NAME;

    Substitua CATALOG_NAME pelo identificador do catálogo do Flink.

  2. Crie uma base de dados, que cria um conjunto de dados no BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Substitua DATABASE_NAME pelo nome da sua nova base de dados.

  3. Use a base de dados que criou:

    USE DATABASE_NAME;
  4. Crie uma tabela Iceberg. O seguinte cria uma tabela de vendas de exemplo:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    Substitua ICEBERG_TABLE_NAME por um nome para a nova tabela.

  5. Ver metadados da tabela:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Listar tabelas na base de dados:

    SHOW TABLES;

Carregue dados para a tabela

Depois de criar uma tabela Iceberg na secção anterior, pode usar o Flink DataGen como uma origem de dados para carregar dados em tempo real para a sua tabela. Os passos seguintes são um exemplo deste fluxo de trabalho:

  1. Crie uma tabela temporária com o DataGen:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Substitua o seguinte:

    • DATABASE_NAME: o nome da base de dados para armazenar a tabela temporária.
    • TEMP_TABLE_NAME: um nome para a sua tabela temporária.
    • ICEBERG_TABLE_NAME: o nome da tabela Iceberg que criou na secção anterior.
  2. Defina o paralelismo para 1:

    SET 'parallelism.default' = '1';
  3. Defina o intervalo do controlo de segurança:

    SET 'execution.checkpointing.interval' = '10second';
  4. Defina o controlo de segurança:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Inicie a tarefa de streaming em tempo real:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    O resultado é semelhante ao seguinte:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Para verificar o estado da tarefa de streaming, faça o seguinte:

    1. Na Google Cloud consola, aceda à página Clusters.

      Aceda a Clusters

    2. Selecione o cluster.

    3. Clique no separador Interfaces Web.

    4. Clique no link YARN ResourceManager.

    5. Na interface do YARN ResourceManager, encontre a sua sessão do Flink e clique no link ApplicationMaster em Tracking UI.

    6. Na coluna Estado, confirme que o estado da tarefa é Em execução.

  7. Consultar dados de streaming no cliente SQL do Flink:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Consultar dados de streaming no BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Termine a tarefa de streaming no cliente SQL do Flink:

    STOP JOB 'JOB_ID';

    Substitua JOB_ID pelo ID da tarefa apresentado no resultado quando criou a tarefa de streaming.

Configure o metastore com o Serverless para Apache Spark

Pode configurar o metastore do BigLake com o Serverless para Apache Spark usando o Spark SQL ou o PySpark.

Spark SQL

  1. Crie um ficheiro SQL com os comandos SQL do Spark que quer executar no metastore do BigLake. Por exemplo, este comando cria um espaço de nomes e uma tabela:

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Substitua o seguinte:

    • CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Spark.
    • NAMESPACE_NAME: o nome do espaço de nomes que faz referência à sua tabela do Spark.
    • TABLE_NAME: um nome de tabela para a sua tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage onde o seu data warehouse está armazenado.
  2. Envie uma tarefa em lote do Spark SQL executando o seguinte comando gcloud dataproc batches submit spark-sql:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Substitua o seguinte:

    • SQL_SCRIPT_PATH: o caminho para o ficheiro SQL que a tarefa em lote usa.
    • PROJECT_ID: o ID do Google Cloud projeto no qual executar a tarefa em lote.
    • REGION: a região onde a sua carga de trabalho é executada.
    • SUBNET_NAME (opcional): o nome de uma sub-rede da VPC na REGION que cumpre os requisitos da sub-rede de sessão.
    • BUCKET_PATH: a localização do contentor do Cloud Storage para carregar dependências da carga de trabalho. O ficheiro WAREHOUSE_DIRECTORY está localizado neste contentor. O prefixo de URI gs:// do contentor não é obrigatório. Pode especificar o caminho do contentor ou o nome do contentor, por exemplo, mybucketname1.
    • LOCATION: a localização onde executar a tarefa em lote.

    Para mais informações sobre o envio de tarefas em lote do Spark, consulte o artigo Execute uma carga de trabalho em lote do Spark.

PySpark

  1. Crie um ficheiro Python com os comandos PySpark que quer executar no metastore do BigLake.

    Por exemplo, o seguinte comando configura um ambiente Spark para interagir com tabelas Iceberg armazenadas no metastore do BigLake. Em seguida, o comando cria um novo espaço de nomes e uma tabela Iceberg nesse espaço de nomes.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Substitua o seguinte:

    • PROJECT_ID: o ID do Google Cloud projeto no qual executar a tarefa em lote.
    • LOCATION: a localização onde os recursos do BigQuery estão localizados.
    • CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Spark.
    • TABLE_NAME: um nome de tabela para a sua tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage onde o seu data warehouse está armazenado.
    • NAMESPACE_NAME: o nome do espaço de nomes que faz referência à sua tabela do Spark.
  2. Envie a tarefa em lote através do seguinte gcloud dataproc batches submit pyspark comando:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Substitua o seguinte:

    • PYTHON_SCRIPT_PATH: o caminho para o script Python que a tarefa em lote usa.
    • PROJECT_ID: o ID do Google Cloud projeto no qual executar a tarefa em lote.
    • REGION: a região onde a sua carga de trabalho é executada.
    • BUCKET_PATH: a localização do contentor do Cloud Storage para carregar dependências da carga de trabalho. O prefixo de URI gs:// do contentor não é obrigatório. Pode especificar o caminho ou o nome do contentor, por exemplo, mybucketname1.

    Para mais informações sobre o envio de tarefas em lote do PySpark, consulte a referência do gcloud do PySpark.

O que se segue?