Aceda a metadados no Apache Spark

Esta página descreve como criar um cluster do Dataproc que execute o Spark. Pode usar este cluster para trabalhar com metadados do Dataplex Universal Catalog para lagos, zonas e recursos.

Vista geral

Cria um cluster depois de a instância do serviço Dataproc Metastore estar associada ao lake do Dataplex Universal Catalog para garantir que o cluster pode confiar no ponto final do Hive Metastore para aceder aos metadados do Dataplex Universal Catalog.

Os metadados geridos no Dataplex Universal Catalog podem ser acedidos através de interfaces padrão, como o Hive Metastore, para ativar as consultas do Spark. As consultas são executadas no cluster do Dataproc.

Para dados Parquet, defina a propriedade do Spark spark.sql.hive.convertMetastoreParquet como false para evitar erros de execução. Mais detalhes.

Crie um cluster do Dataproc

Execute os seguintes comandos para criar um cluster do Dataproc, especificando o serviço Dataproc Metastore associado ao lake do catálogo universal do Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Explore os metadados

Execute consultas DQL para explorar os metadados e consultas Spark para consultar dados.

Antes de começar

  1. Abra uma sessão SSH no nó principal do cluster do Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. Na linha de comandos do nó principal, abra um novo REPL do Python.

    python3
    

Apresentar bases de dados

Cada zona do Dataplex Universal Catalog no lake é mapeada para uma base de dados de metastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Listar tabelas

Listar tabelas numa das zonas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Consultar dados

Consultar os dados numa das tabelas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Crie tabelas e partições nos metadados

Execute consultas DDL para criar tabelas e partições nos metadados do catálogo universal do Dataplex com o Apache Spark.

Para mais informações sobre os tipos de dados, os formatos de ficheiros e os formatos de linhas suportados, consulte o artigo Valores suportados.

Antes de começar

Antes de criar uma tabela, crie um recurso do catálogo universal do Dataplex que seja mapeado para o contentor do Cloud Storage que contém os dados subjacentes. Para mais informações, consulte o artigo Adicione um recurso.

Criar uma tabela

As tabelas Parquet, ORC, AVRO, CSV e JSON são suportadas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Altere uma tabela

O catálogo universal do Dataplex não lhe permite alterar a localização de uma tabela nem editar as colunas de partição de uma tabela. A alteração de uma tabela não define automaticamente userManaged como true.

No Spark SQL, pode mudar o nome de uma tabela, adicionar colunas e definir o formato de ficheiro de uma tabela.

Mude o nome de uma tabela

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Adicione colunas

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Defina o formato de ficheiro

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Elimine uma tabela

A eliminação de uma tabela da API de metadados do catálogo universal do Dataplex não elimina os dados subjacentes no Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Adicione uma partição

O catálogo universal do Dataplex não permite alterar uma partição depois de criada. No entanto, a partição pode ser eliminada.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

Pode adicionar várias partições da mesma chave de partição e valores de partição diferentes, conforme mostrado no exemplo anterior.

Elimine uma partição

Para eliminar uma partição, execute o seguinte comando:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Consulte tabelas Iceberg

Pode consultar tabelas Iceberg através do Apache Spark.

Antes de começar

Configure uma sessão do Spark SQL com o Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Crie uma tabela Iceberg

Para criar uma tabela Iceberg, execute o seguinte comando:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Explore o resumo e o histórico do Iceberg

Pode obter capturas de ecrã e o histórico de tabelas Iceberg através do Apache Spark.

Antes de começar

Configure uma sessão do PySpark com suporte do Iceberg:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Obtenha o histórico de tabelas Iceberg

Para obter o histórico de uma tabela Iceberg, execute o seguinte comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Obtenha instantâneos de tabelas Iceberg

Para obter uma captura instantânea de uma tabela Iceberg, execute o seguinte comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Tipos de dados e formatos de ficheiros suportados

Os tipos de dados suportados são definidos da seguinte forma:

Tipo de dados Valores
Primitivo
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Matriz ARRAY < DATA_TYPE >
Estrutura STRUCT < COLUMN : DATA_TYPE >

Seguem-se os formatos de ficheiros suportados:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Para mais informações sobre os formatos de ficheiros, consulte o artigo Formatos de armazenamento.

Seguem-se os formatos de linhas suportados:

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

O que se segue?