Nesta página, descrevemos como criar um cluster do Dataproc executando o Spark.
Visão geral
Crie um cluster depois que a instância de serviço do Metastore do Dataproc estiver associada ao lake do Dataplex para garantir que o cluster possa confiar no endpoint do Metastore do Hive para ter acesso aos metadados do Dataplex.
Os metadados gerenciados no Dataplex podem ser acessados usando interfaces padrão, como o Metastore do Hive, para potencializar as consultas do Spark. As consultas são executadas no cluster do Dataproc.
Para dados Parquet, defina a propriedade Spark spark.sql.hive.convertMetastoreParquet
como false
para evitar erros de execução. Mais detalhes.
Criar um cluster do Dataproc
Execute os seguintes comandos para criar um cluster do Dataproc, especificando o serviço Metastore do Dataproc associado ao lake do Dataplex:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Explorar metadados
Executar consultas DQL para explorar os metadados e executar consultas Spark para consultar dados.
Antes de começar
Abra uma sessão SSH no nó principal do cluster do Dataproc.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
No prompt de comando do nó principal, abra um novo REPL do Python.
python3
Listar bancos de dados
Cada zona do Dataplex no lake é mapeada para um banco de dados de metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Listar tabelas
Listar tabelas em uma das zonas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Consultar dados
Consulte os dados em uma das tabelas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Criar tabelas e partições em metadados
Execute consultas DDL para criar tabelas e partições em metadados do Dataplex usando o Apache Spark.
Para mais informações sobre os tipos de dados, formatos de arquivo e formatos de linha compatíveis, consulte Valores compatíveis.
Antes de começar
Antes de criar uma tabela, crie um recurso do Dataplex que seja mapeado para o bucket do Cloud Storage que contém os dados. Para mais informações, consulte Adicionar um recurso.
Criar uma tabela
Tabelas Parquet, ORC, AVRO, CSV e JSON são compatíveis.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Alterar uma tabela
O Dataplex não permite alterar o local de uma tabela ou editar as
colunas de partição de uma tabela. Alterar uma tabela não define automaticamente userManaged como true
.
No Spark SQL, é possível renomear uma tabela, adicionar colunas e definir o formato do arquivo.
Renomear uma tabela
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Adicionar colunas
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Definir o formato do arquivo
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Remover uma tabela
Descartar uma tabela da API metadata do Dataplex não exclui os dados subjacentes no Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Adicionar uma partição
O Dataplex não permite alterar uma partição depois de criada. No entanto, a partição pode ser descartada.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
É possível adicionar várias partições da mesma chave e diferentes valores de partição, como mostrado no exemplo anterior.
Remover uma partição
Para descartar uma partição, execute o seguinte comando:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Consultar tabelas do Iceberg
É possível consultar tabelas do Iceberg usando o Apache Spark.
Antes de começar
Configurar uma sessão do Spark SQL com o Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Criar uma tabela Iceberg
Para criar uma tabela do Iceberg, execute o seguinte comando:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Conheça a história e o resumo do Iceberg
É possível acessar snapshots e o histórico das tabelas do Iceberg usando o Apache Spark.
Antes de começar
Configurar uma sessão do PySpark com o suporte do Iceberg.
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Acessar histórico das tabelas do Iceberg
Para ver o histórico de uma tabela Iceberg, execute o seguinte comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Gerar snapshots de tabelas do Iceberg
Para gerar um snapshot de uma tabela do Iceberg, execute o seguinte comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Tipos de dados e formatos de arquivo aceitos
Os tipos de dados compatíveis são definidos da seguinte maneira:
Tipo de dados | Valores |
---|---|
Primário |
|
Matriz | ARRAY < DATA_TYPE > |
Estrutura | STRUCT < COLUMN : DATA_TYPE > |
Os formatos de arquivo compatíveis são definidos da seguinte maneira:
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
Para mais informações sobre os formatos de arquivo, consulte Formatos de armazenamento.
Os formatos de linha compatíveis são definidos da seguinte maneira:
- DELIMITADO [CAMPOS TERMINADOS POR CHAR]
- SERDE SERDE_NAME [COM SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
A seguir
- Saiba mais sobre como gerenciar metadados.