Esta página descreve como criar um cluster do Dataproc que executa o Spark.
Visão geral
Você cria um cluster depois que a instância do serviço do metastore do Dataproc é associada ao lake do Dataplex para garantir que o cluster possa usar o endpoint do metastore do Hive para ter acesso aos metadados do Dataplex.
Os metadados gerenciados no Dataplex podem ser acessados usando interfaces padrão, como o metastore do Hive, para ativar consultas do Spark. As consultas são executadas no cluster do Dataproc.
Para dados Parquet, defina a propriedade spark.sql.hive.convertMetastoreParquet
do Spark como
false
para evitar erros de execução. Mais detalhes.
Criar um cluster do Dataproc
Execute os comandos abaixo para criar um cluster do Dataproc, especificando o serviço do Dataproc Metastore associado ao lago do Dataplex:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Analisar metadados
Execute consultas DQL para analisar os metadados e consultas do Spark para consultar dados.
Antes de começar
Abra uma sessão SSH no nó principal do cluster do Dataproc.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
No prompt de comando do nó principal, abra um novo REPL do Python.
python3
Listar bancos de dados
Cada zona do Dataplex no lake é mapeada para um banco de dados de metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Listar tabelas
Listar tabelas em uma das zonas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Consultar dados
Consultar os dados em uma das tabelas.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Criar tabelas e partições em metadados
Execute consultas DDL para criar tabelas e partições nos metadados do Dataplex usando o Apache Spark.
Para mais informações sobre os tipos de dados, formatos de arquivo e formatos de linha aceitos, consulte Valores aceitos.
Antes de começar
Antes de criar uma tabela, crie um recurso do Dataplex que seja mapeado para o bucket do Cloud Storage que contém os dados. Para mais informações, consulte Adicionar um recurso.
Criar uma tabela
As tabelas Parquet, ORC, AVRO, CSV e JSON são compatíveis.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Alterar uma tabela
O Dataplex não permite alterar o local de uma tabela nem editar as colunas de partição dela. Alterar uma tabela não define automaticamente
userManaged
como true
.
No Spark SQL, é possível renomear uma tabela, adicionar colunas e definir o formato de arquivo de uma tabela.
Renomear uma tabela
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Adicionar colunas
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Definir o formato do arquivo
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Remover uma tabela
A exclusão de uma tabela da API de metadados do Dataplex não exclui os dados no Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Adicionar uma partição
O Dataplex não permite alterar uma partição depois que ela é criada. No entanto, a partição pode ser descartada.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
É possível adicionar várias partições com a mesma chave e valores diferentes, conforme mostrado no exemplo anterior.
Excluir uma partição
Para excluir uma partição, execute o seguinte comando:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Consultar tabelas Iceberg
É possível consultar tabelas Iceberg usando o Apache Spark.
Antes de começar
Configure uma sessão do Spark SQL com o Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Criar uma tabela Iceberg
Para criar uma tabela Iceberg, execute o seguinte comando:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Conhecer o histórico e o snapshot do Iceberg
É possível acessar snapshots e o histórico das tabelas do Iceberg usando o Apache Spark.
Antes de começar
Configure uma sessão do PySpark com suporte ao Iceberg:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Acessar o histórico das tabelas Iceberg
Para conferir o histórico de uma tabela Iceberg, execute o seguinte comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Receber snapshots de tabelas Iceberg
Para conferir um snapshot de uma tabela Iceberg, execute o seguinte comando:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Tipos de dados e formatos de arquivo aceitos
Os tipos de dados compatíveis são definidos da seguinte maneira:
Tipo de dado | Valores |
---|---|
Primário |
|
Matriz | ARRAY < DATA_TYPE > |
Estrutura | STRUCT < COLUMN : DATA_TYPE > |
Estes são os formatos de arquivo compatíveis:
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
Para mais informações sobre os formatos de arquivo, consulte Formatos de armazenamento.
Estes são os formatos de linha compatíveis:
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
A seguir
- Saiba mais sobre como gerenciar metadados.