Use o metastore do BigLake com o Dataproc
Este documento explica como usar o metastore do BigLake com o Dataproc no Compute Engine. Esta ligação oferece-lhe um único metastore partilhado que funciona em motores de software de código aberto, como o Apache Spark ou o Apache Flink.
Antes de começar
- Ative a faturação para o seu Google Cloud projeto. Saiba como verificar se a faturação está ativada num projeto.
Ative as APIs BigQuery e Dataproc.
Opcional: compreenda como funciona o metastore do BigLake e por que motivo o deve usar.
Funções necessárias
Para receber as autorizações de que precisa para usar o Spark ou o Flink e o Dataproc com o metastore do BigLake como uma loja de metadados, peça ao seu administrador para lhe conceder as seguintes funções do IAM:
-
Crie um cluster do Dataproc:
Trabalhador do Dataproc (
roles/dataproc.worker
) na conta de serviço predefinida do Compute Engine no projeto -
Crie tabelas do metastore do BigLake no Spark ou no Flink:
-
Trabalhador do Dataproc (
roles/dataproc.worker
) na conta de serviço da VM do Dataproc no projeto -
Editor de dados do BigQuery (
roles/bigquery.dataEditor
) na conta de serviço da VM do Dataproc no projeto -
Administrador de objetos de armazenamento (
roles/storage.objectAdmin
) na conta de serviço da VM do Dataproc no projeto
-
Trabalhador do Dataproc (
-
Consultar tabelas do metastore do BigLake no BigQuery:
-
Visualizador de dados do BigQuery (
roles/bigquery.dataViewer
) no projeto -
Utilizador do BigQuery (
roles/bigquery.user
) no projeto -
Visualizador de objetos do Storage (
roles/storage.objectViewer
) no projeto
-
Visualizador de dados do BigQuery (
Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.
Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.
Fluxo de trabalho geral
Para usar o Dataproc no Compute Engine com o metastore do BigLake, siga estes passos gerais:
- Crie um cluster do Dataproc ou configure um cluster existente.
- Estabeleça ligação ao seu motor de software de código aberto preferido, como o Spark ou o Flink.
- Use um ficheiro JAR para instalar o plug-in do catálogo do Apache Iceberg no cluster.
- Crie e faça a gestão dos recursos do metastore do BigLake conforme necessário, consoante o motor de software de código aberto que estiver a usar.
- No BigQuery, aceda aos recursos do metastore do BigLake e use-os.
Associe o metastore do BigLake ao Spark
As instruções seguintes mostram como ligar o Dataproc ao metastore do BigLake através do Spark SQL interativo.
Transfira o plugin do catálogo Iceberg
Para associar o metastore do BigLake ao Dataproc e ao Spark, tem de usar o ficheiro JAR do plug-in do catálogo Iceberg do metastore do BigLake.
Este ficheiro está incluído por predefinição na versão 2.2 da imagem do Dataproc. Se os seus clusters do Dataproc não tiverem acesso direto à Internet, tem de transferir o plug-in e carregá-lo para um contentor do Cloud Storage ao qual o seu cluster do Dataproc possa aceder.
Transfira o plug-in do catálogo Iceberg da metastore do BigLake.
Configure um cluster do Dataproc
Antes de estabelecer ligação ao metastore do BigLake, tem de configurar um cluster do Dataproc.
Para tal, pode criar um novo cluster ou usar um cluster existente. Depois, usa este cluster para executar o Spark SQL interativo e gerir os recursos do metastore do BigLake.
A sub-rede na região onde o cluster é criado tem de ter o acesso privado à Google (PGA) ativado. Por predefinição, as VMs do cluster do Dataproc, criadas com uma versão de imagem 2.2 (predefinição) ou posterior, têm apenas endereços IP internos. Para permitir que as VMs do cluster comuniquem com as APIs Google, ative o acesso privado à Google na sub-rede da rede
default
(ou no nome da rede especificado pelo utilizador, se aplicável) na região onde o cluster é criado.Se quiser executar o exemplo da interface Web do Zeppelin neste guia, tem de usar ou criar um cluster do Dataproc com o componente opcional do Zeppelin ativado.
Novo cluster
Para criar um novo cluster do Dataproc, execute o seguinte comando gcloud
dataproc clusters create
. Esta configuração contém as definições que tem de usar para o metastore do BigLake.
gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --optional-components=ZEPPELIN \ --enable-component-gateway \ --single-node
Substitua o seguinte:
CLUSTER_NAME
: um nome para o seu cluster do Dataproc.PROJECT_ID
: o ID do Google Cloud projeto onde está a criar o cluster.LOCATION
: a Google Cloud região onde está a criar o cluster.
Cluster existente
Para configurar um cluster existente, adicione o seguinte tempo de execução do Iceberg Spark ao seu cluster.
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1
Pode adicionar o tempo de execução com uma das seguintes opções:
Script de inicialização. Adicione a dependência de tempo de execução a umscript de inicialização personalizado que é executado quando o é criado.
Depois de adicionar a dependência de tempo de execução ao script, siga as instruções para criar, recriar e atualizar um cluster.
Instalação manual. Adicione manualmente o ficheiro JAR do plugin do catálogo Iceberg e configure as propriedades do Spark para incluir o tempo de execução no seu cluster.
Envie uma tarefa do Spark
Para enviar uma tarefa do Spark, use um dos seguintes métodos:
CLI gcloud
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region==REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Substitua o seguinte:
PROJECT_ID
: o ID do Google Cloud projeto que contém o cluster do Dataproc.CLUSTER_NAME
: o nome do cluster do Dataproc que está a usar para executar a tarefa do Spark SQL.REGION
: a região do Compute Engine onde o cluster está localizado.LOCATION
: a localização dos recursos do BigQuery.CATALOG_NAME
: o nome do catálogo do Spark que está a usar com a sua tarefa de SQL.WAREHOUSE_DIRECTORY
: a pasta de armazenamento na nuvem que contém o seu armazém de dados. Este valor começa comgs://
.SPARK_SQL_COMMAND
: a consulta SQL do Spark que quer executar. Esta consulta inclui os comandos para criar os seus recursos. Por exemplo, para criar um espaço de nomes e uma tabela.
Spark interativo
Ligue-se ao Spark e instale o plug-in do catálogo
Para instalar o plug-in de catálogo para o metastore do BigLake, estabeleça ligação ao cluster do Dataproc através de SSH.
- Na Google Cloud consola, aceda à página Instâncias de VM.
Para estabelecer ligação a uma instância de VM do Dataproc, clique em SSH na lista de instâncias de máquinas virtuais. O resultado é semelhante ao seguinte:
Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
No terminal, execute o seguinte comando de inicialização do metastore do BigLake:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Substitua o seguinte:
CATALOG_NAME
: o nome do catálogo do Spark que está a usar com a sua tarefa de SQL.PROJECT_ID
: o ID do projeto do catálogo do metastore do BigLake ao qual o seu catálogo do Spark está associado. Google CloudLOCATION
: a Google Cloud localização do metastore do BigLake.WAREHOUSE_DIRECTORY
: a pasta de armazenamento na nuvem que contém o seu armazém de dados. Este valor começa comgs://
.
Depois de se ligar com êxito a um cluster, o terminal do Spark apresenta o comando
spark-sql
.spark-sql (default)>
Faça a gestão dos recursos do metastore do BigLake
Já tem ligação ao metastore do BigLake. Pode ver os recursos existentes ou criar novos recursos com base nos metadados armazenados no metastore do BigLake.
Por exemplo, experimente executar os seguintes comandos na sessão interativa do Spark SQL para criar um espaço de nomes e uma tabela do Iceberg.
Use o catálogo Iceberg personalizado:
USE `CATALOG_NAME`;
Crie um espaço de nomes:
CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
Use o espaço de nomes criado:
USE NAMESPACE_NAME;
Crie uma tabela Iceberg:
CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
Inserir uma linha da tabela:
INSERT INTO TABLE_NAME VALUES (1, "first row");
Adicione uma coluna de tabela:
ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
Ver metadados da tabela:
DESCRIBE EXTENDED TABLE_NAME;
Listar tabelas no espaço de nomes:
SHOW TABLES;
Bloco de notas do Zeppelin
Na Google Cloud consola, aceda à página Clusters do Dataproc.
Clique no nome do cluster que quer usar.
É apresentada a página Detalhes do cluster.
No menu de navegação, clique em Interfaces Web.
Em Gateway de componentes, clique em Zeppelin. É aberta a página do notebook do Zeppelin.
No menu de navegação, clique em Bloco de notas e, de seguida, clique em +Criar nova nota.
Na caixa de diálogo, introduza um nome para o bloco de notas. Deixe o Spark selecionado como o intérprete predefinido.
Clique em Criar. É criado um novo bloco de notas.
No bloco de notas, clique no menu de definições e, de seguida, clique em Intérprete.
No campo Pesquisar intérpretes, pesquise Spark.
Clique em Edit.
No campo Spark.jars, introduza o URI do JAR do Spark.
https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
Clique em Guardar.
Clique em OK.
Copie o código PySpark seguinte para o seu bloco de notas do Zeppelin.
%pyspark from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("select version()").show() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME;").show()
Substitua o seguinte:
CATALOG_NAME
: o nome do catálogo do Spark a usar para a tarefa de SQL.PROJECT_ID
: o ID do Google Cloud projeto que contém o cluster do Dataproc.WAREHOUSE_DIRECTORY
: a pasta de armazenamento na nuvem que contém o seu armazém de dados. Este valor começa comgs://
.NAMESPACE_NAME
: o nome do espaço de nomes que faz referência à sua tabela do Spark.WAREHOUSE_DIRECTORY
: o URI da pasta do Cloud Storage onde o seu data warehouse está armazenado.TABLE_NAME
: um nome de tabela para a sua tabela do Spark.
Clique no ícone de execução ou prima
Shift-Enter
para executar o código. Quando a tarefa estiver concluída, a mensagem de estado mostra "Spark Job Finished" e a saída apresenta o conteúdo da tabela:
Associe o metastore do BigLake ao Flink
As instruções seguintes mostram como ligar o Dataproc ao metastore do BigLake através do cliente Flink SQL.
Instale o plug-in de catálogo e estabeleça ligação a uma sessão do Flink
Para ligar o metastore do BigLake ao Flink, faça o seguinte:
- Crie um cluster do Dataproc com o componente Flink opcional ativado e certifique-se de que está a usar o Dataproc 2.2 ou posterior.
Na Google Cloud consola, aceda à página Instâncias de VM.
Na lista de instâncias de máquinas virtuais, clique em SSH para estabelecer ligação a uma instância de VM do Dataproc.
Configure o plug-in de catálogo personalizado do Iceberg para o metastore do BigLake:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
Inicie a sessão do Flink no YARN:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Crie um catálogo no Flink:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
Substitua o seguinte:
CATALOG_NAME
: o identificador do catálogo do Flink, que está associado a um catálogo do metastore do BigLake.WAREHOUSE_DIRECTORY
: o caminho base para o diretório do data warehouse (a pasta do Cloud Storage onde o Flink cria ficheiros). Este valor começa comgs://
.PROJECT_ID
: o ID do projeto do catálogo do metastore do BigLake ao qual o catálogo do Flink está associado.LOCATION
: a localização dos recursos do BigQuery.
A sua sessão do Flink está agora associada ao metastore do BigLake e pode executar comandos SQL do Flink.
Faça a gestão dos recursos do metastore do BigLake
Agora que tem ligação ao metastore do BigLake, pode criar e ver recursos com base nos metadados armazenados no metastore do BigLake.
Por exemplo, experimente executar os seguintes comandos na sua sessão interativa do Flink SQL para criar uma base de dados e uma tabela do Iceberg.
Use o catálogo Iceberg personalizado:
USE CATALOG CATALOG_NAME;
Substitua
CATALOG_NAME
pelo identificador do catálogo do Flink.Crie uma base de dados, o que cria um conjunto de dados no BigQuery:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Substitua
DATABASE_NAME
pelo nome da sua nova base de dados.Use a base de dados que criou:
USE DATABASE_NAME;
Crie uma tabela Iceberg. O seguinte cria uma tabela de vendas de exemplo:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Substitua
ICEBERG_TABLE_NAME
por um nome para a nova tabela.Ver metadados da tabela:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Listar tabelas na base de dados:
SHOW TABLES;
Carregue dados para a tabela
Depois de criar uma tabela Iceberg na secção anterior, pode usar o Flink DataGen como uma origem de dados para carregar dados em tempo real para a sua tabela. Os passos seguintes são um exemplo deste fluxo de trabalho:
Crie uma tabela temporária com o DataGen:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Substitua o seguinte:
DATABASE_NAME
: o nome da base de dados para armazenar a tabela temporária.TEMP_TABLE_NAME
: um nome para a sua tabela temporária.ICEBERG_TABLE_NAME
: o nome da tabela Iceberg que criou na secção anterior.
Defina o paralelismo para 1:
SET 'parallelism.default' = '1';
Defina o intervalo do controlo de segurança:
SET 'execution.checkpointing.interval' = '10second';
Defina o controlo de segurança:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Inicie a tarefa de streaming em tempo real:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
O resultado é semelhante ao seguinte:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
Para verificar o estado da tarefa de streaming, faça o seguinte:
Na Google Cloud consola, aceda à página Clusters.
Selecione o cluster.
Clique no separador Interfaces Web.
Clique no link YARN ResourceManager.
Na interface do YARN ResourceManager, encontre a sua sessão do Flink e clique no link ApplicationMaster em Tracking UI.
Na coluna Estado, confirme que o estado da tarefa é Em execução.
Consultar dados de streaming no cliente SQL do Flink:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Consultar dados de streaming no BigQuery:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Termine a tarefa de streaming no cliente SQL do Flink:
STOP JOB 'JOB_ID';
Substitua
JOB_ID
pelo ID da tarefa apresentado no resultado quando criou a tarefa de streaming.
O que se segue?
- Configure funcionalidades opcionais da metastore do BigLake.
- Ver e consultar tabelas do Spark no BigQuery.