Acessar metadados no Apache Spark

Nesta página, descrevemos como criar um bucket do Dataproc cluster que executa o Spark.

Visão geral

Você cria um cluster depois que a instância do serviço do metastore do Dataproc é associada ao lake do Dataplex para garantir que o cluster possa usar o endpoint do metastore do Hive para ter acesso aos metadados do Dataplex.

Os metadados gerenciados no Dataplex podem ser acessados usando interfaces de rede, como Hive Metastore, para realizar consultas Spark. As consultas são executadas no cluster do Dataproc.

Para dados Parquet, defina a propriedade spark.sql.hive.convertMetastoreParquet do Spark como false para evitar erros de execução. Mais detalhes.

Criar um cluster do Dataproc

Execute os comandos a seguir para criar um cluster do Dataproc: especificando o serviço do Dataproc Metastore associado com o lake do Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Explorar metadados

Execute consultas DQL para analisar os metadados e consultas do Spark para consultar dados.

Antes de começar

  1. Abra uma sessão SSH no nó principal do cluster do Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. No prompt de comando do nó principal, abra um novo REPL do Python.

    python3
    

Listar bancos de dados

Cada zona do Dataplex no lake é mapeada para um banco de dados de metastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Listar tabelas

Listar tabelas em uma das zonas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Consultar dados

Consultar os dados em uma das tabelas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Criar tabelas e partições em metadados

Execute consultas DDL para criar tabelas e partições nos metadados do Dataplex usando o Apache Spark.

Para mais informações sobre os tipos de dados, formatos de arquivo e formatos de linha compatíveis, consulte Valores compatíveis.

Antes de começar

Antes de criar uma tabela, crie um recurso do Dataplex que seja mapeado para o bucket do Cloud Storage que contém os dados. Para mais informações, consulte Adicionar um recurso.

Criar uma tabela

As tabelas Parquet, ORC, AVRO, CSV e JSON são compatíveis.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Alterar uma tabela

O Dataplex não permite alterar o local de uma tabela nem editar as colunas de partição dela. Alterar uma tabela não define automaticamente userManaged para true.

No Spark SQL, é possível renomear uma tabela, adicionar colunas, e defina o formato do arquivo de uma tabela.

Renomear uma tabela

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Adicionar colunas

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Definir o formato do arquivo

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Remover uma tabela

A exclusão de uma tabela da API de metadados do Dataplex não exclui os dados no Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Adicionar uma partição

O Dataplex não permite alterar uma partição depois de criada. No entanto, a partição pode ser descartada.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

É possível adicionar várias partições da mesma chave de partição e valores de partição diferentes, como mostrado no exemplo anterior.

Remover uma partição

Para descartar uma partição, execute o seguinte comando:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Consultar tabelas Iceberg

É possível consultar tabelas Iceberg usando o Apache Spark.

Antes de começar

Configurar uma sessão do Spark SQL com o Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Criar uma tabela Iceberg

Para criar uma tabela do Iceberg, execute o seguinte comando:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Explore a história e o instantâneo do Iceberg

É possível conseguir snapshots e histórico de tabelas do Iceberg usando o Apache Spark.

Antes de começar

Configurar uma sessão do PySpark com suporte do Iceberg.

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Conferir o histórico das tabelas Iceberg

Para conferir o histórico de uma tabela Iceberg, execute o seguinte comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Acessar snapshots de tabelas do Iceberg

Para conseguir um snapshot de uma tabela do Iceberg, execute o seguinte comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Tipos de dados e formatos de arquivo compatíveis

Os tipos de dados compatíveis são definidos da seguinte maneira:

Tipo de dado Valores
Primário
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Matriz ARRAY < DATA_TYPE >
Estrutura STRUCT < COLUMN : DATA_TYPE >

Os formatos de arquivo compatíveis são definidos da seguinte maneira:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Para mais informações sobre os formatos do arquivo, consulte Formatos de armazenamento.

Os formatos de linha compatíveis são definidos da seguinte maneira:

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

A seguir