Acessar metadados no Apache Spark

Esta página descreve como criar um cluster do Dataproc que executa o Spark.

Visão geral

Você cria um cluster depois que a instância do serviço do metastore do Dataproc é associada ao lake do Dataplex para garantir que o cluster possa usar o endpoint do metastore do Hive para ter acesso aos metadados do Dataplex.

Os metadados gerenciados no Dataplex podem ser acessados usando interfaces padrão, como o metastore do Hive, para ativar consultas do Spark. As consultas são executadas no cluster do Dataproc.

Para dados Parquet, defina a propriedade spark.sql.hive.convertMetastoreParquet do Spark como false para evitar erros de execução. Mais detalhes.

Criar um cluster do Dataproc

Execute os comandos abaixo para criar um cluster do Dataproc, especificando o serviço do Dataproc Metastore associado ao lago do Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Analisar metadados

Execute consultas DQL para analisar os metadados e consultas do Spark para consultar dados.

Antes de começar

  1. Abra uma sessão SSH no nó principal do cluster do Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. No prompt de comando do nó principal, abra um novo REPL do Python.

    python3
    

Listar bancos de dados

Cada zona do Dataplex no lake é mapeada para um banco de dados de metastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Listar tabelas

Listar tabelas em uma das zonas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Consultar dados

Consultar os dados em uma das tabelas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Criar tabelas e partições em metadados

Execute consultas DDL para criar tabelas e partições nos metadados do Dataplex usando o Apache Spark.

Para mais informações sobre os tipos de dados, formatos de arquivo e formatos de linha aceitos, consulte Valores aceitos.

Antes de começar

Antes de criar uma tabela, crie um recurso do Dataplex que seja mapeado para o bucket do Cloud Storage que contém os dados. Para mais informações, consulte Adicionar um recurso.

Criar uma tabela

As tabelas Parquet, ORC, AVRO, CSV e JSON são compatíveis.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Alterar uma tabela

O Dataplex não permite alterar o local de uma tabela nem editar as colunas de partição dela. Alterar uma tabela não define automaticamente userManaged como true.

No Spark SQL, é possível renomear uma tabela, adicionar colunas e definir o formato de arquivo de uma tabela.

Renomear uma tabela

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Adicionar colunas

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Definir o formato do arquivo

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Remover uma tabela

A exclusão de uma tabela da API de metadados do Dataplex não exclui os dados no Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Adicionar uma partição

O Dataplex não permite alterar uma partição depois que ela é criada. No entanto, a partição pode ser descartada.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

É possível adicionar várias partições com a mesma chave e valores diferentes, conforme mostrado no exemplo anterior.

Excluir uma partição

Para excluir uma partição, execute o seguinte comando:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Consultar tabelas Iceberg

É possível consultar tabelas Iceberg usando o Apache Spark.

Antes de começar

Configure uma sessão do Spark SQL com o Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Criar uma tabela Iceberg

Para criar uma tabela Iceberg, execute o seguinte comando:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Conhecer o histórico e o snapshot do Iceberg

É possível acessar snapshots e o histórico das tabelas do Iceberg usando o Apache Spark.

Antes de começar

Configure uma sessão do PySpark com suporte ao Iceberg:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Acessar o histórico das tabelas Iceberg

Para conferir o histórico de uma tabela Iceberg, execute o seguinte comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Receber snapshots de tabelas Iceberg

Para conferir um snapshot de uma tabela Iceberg, execute o seguinte comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Tipos de dados e formatos de arquivo aceitos

Os tipos de dados compatíveis são definidos da seguinte maneira:

Tipo de dado Valores
Primário
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Matriz ARRAY < DATA_TYPE >
Estrutura STRUCT < COLUMN : DATA_TYPE >

Estes são os formatos de arquivo compatíveis:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Para mais informações sobre os formatos de arquivo, consulte Formatos de armazenamento.

Estes são os formatos de linha compatíveis:

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

A seguir