Esta página descreve como criar um cluster do Dataproc que execute o Spark. Pode usar este cluster para trabalhar com metadados do Dataplex Universal Catalog para lagos, zonas e recursos.
Vista geral
Cria um cluster depois de a instância do serviço Dataproc Metastore estar associada ao lake do Dataplex Universal Catalog para garantir que o cluster pode confiar no ponto final do Hive Metastore para aceder aos metadados do Dataplex Universal Catalog.
Os metadados geridos no Dataplex Universal Catalog podem ser acedidos através de interfaces padrão, como o Hive Metastore, para ativar as consultas do Spark. As consultas são executadas no cluster do Dataproc.
Para dados Parquet, defina a propriedade do Spark spark.sql.hive.convertMetastoreParquet
como
false
para evitar erros de execução. Mais detalhes.
Crie um cluster do Dataproc
Execute os seguintes comandos para criar um cluster do Dataproc, especificando o serviço Dataproc Metastore associado ao lake do catálogo universal do Dataplex:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Explore os metadados
Execute consultas DQL para explorar os metadados e consultas Spark para consultar dados.
Antes de começar
Abra uma sessão SSH no nó principal do cluster do Dataproc.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
Na linha de comandos do nó principal, abra um novo REPL do Python.
python3
Apresentar bases de dados
Cada zona do Dataplex Universal Catalog no lake é mapeada para uma base de dados de metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Listar tabelas
Listar tabelas numa das zonas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Consultar dados
Consultar os dados numa das tabelas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Crie tabelas e partições nos metadados
Execute consultas DDL para criar tabelas e partições nos metadados do catálogo universal do Dataplex com o Apache Spark.
Para mais informações sobre os tipos de dados, os formatos de ficheiros e os formatos de linhas suportados, consulte o artigo Valores suportados.
Antes de começar
Antes de criar uma tabela, crie um recurso do catálogo universal do Dataplex que seja mapeado para o contentor do Cloud Storage que contém os dados subjacentes. Para mais informações, consulte o artigo Adicione um recurso.
Criar uma tabela
As tabelas Parquet, ORC, AVRO, CSV e JSON são suportadas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Altere uma tabela
O catálogo universal do Dataplex não lhe permite alterar a localização de uma tabela nem editar as colunas de partição de uma tabela. A alteração de uma tabela não define automaticamente
userManaged
como true
.
No Spark SQL, pode mudar o nome de uma tabela, adicionar colunas e definir o formato de ficheiro de uma tabela.
Mude o nome de uma tabela
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Adicione colunas
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Defina o formato de ficheiro
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Elimine uma tabela
A eliminação de uma tabela da API de metadados do catálogo universal do Dataplex não elimina os dados subjacentes no Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Adicione uma partição
O catálogo universal do Dataplex não permite alterar uma partição depois de criada. No entanto, a partição pode ser eliminada.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
Pode adicionar várias partições da mesma chave de partição e valores de partição diferentes, conforme mostrado no exemplo anterior.
Elimine uma partição
Para eliminar uma partição, execute o seguinte comando:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Consulte tabelas Iceberg
Pode consultar tabelas Iceberg através do Apache Spark.
Antes de começar
Configure uma sessão do Spark SQL com o Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Crie uma tabela Iceberg
Para criar uma tabela Iceberg, execute o seguinte comando:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Explore o resumo e o histórico do Iceberg
Pode obter capturas de ecrã e o histórico de tabelas Iceberg através do Apache Spark.
Antes de começar
Configure uma sessão do PySpark com suporte do Iceberg:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Obtenha o histórico de tabelas Iceberg
Para obter o histórico de uma tabela Iceberg, execute o seguinte comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Obtenha instantâneos de tabelas Iceberg
Para obter uma captura instantânea de uma tabela Iceberg, execute o seguinte comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Tipos de dados e formatos de ficheiros suportados
Os tipos de dados suportados são definidos da seguinte forma:
Tipo de dados | Valores |
---|---|
Primitivo |
|
Matriz | ARRAY < DATA_TYPE > |
Estrutura | STRUCT < COLUMN : DATA_TYPE > |
Seguem-se os formatos de ficheiros suportados:
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
Para mais informações sobre os formatos de ficheiros, consulte o artigo Formatos de armazenamento.
Seguem-se os formatos de linhas suportados:
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
O que se segue?
- Saiba mais sobre a gestão de metadados para lagos, zonas e recursos.