Acessar metadados no Apache Spark

Nesta página, descrevemos como criar um cluster do Dataproc executando o Spark.

Visão geral

Crie um cluster depois que a instância de serviço do Metastore do Dataproc estiver associada ao lake do Dataplex para garantir que o cluster possa confiar no endpoint do Metastore do Hive para ter acesso aos metadados do Dataplex.

Os metadados gerenciados no Dataplex podem ser acessados usando interfaces padrão, como o Metastore do Hive, para potencializar as consultas do Spark. As consultas são executadas no cluster do Dataproc.

Para dados Parquet, defina a propriedade Spark spark.sql.hive.convertMetastoreParquet como false para evitar erros de execução. Mais detalhes.

Criar um cluster do Dataproc

Execute os seguintes comandos para criar um cluster do Dataproc, especificando o serviço Metastore do Dataproc associado ao lake do Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Explorar metadados

Executar consultas DQL para explorar os metadados e executar consultas Spark para consultar dados.

Antes de começar

  1. Abra uma sessão SSH no nó principal do cluster do Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. No prompt de comando do nó principal, abra um novo REPL do Python.

    python3
    

Listar bancos de dados

Cada zona do Dataplex no lake é mapeada para um banco de dados de metastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Listar tabelas

Listar tabelas em uma das zonas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Consultar dados

Consulte os dados em uma das tabelas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Criar tabelas e partições em metadados

Execute consultas DDL para criar tabelas e partições em metadados do Dataplex usando o Apache Spark.

Para mais informações sobre os tipos de dados, formatos de arquivo e formatos de linha compatíveis, consulte Valores compatíveis.

Antes de começar

Antes de criar uma tabela, crie um recurso do Dataplex que seja mapeado para o bucket do Cloud Storage que contém os dados. Para mais informações, consulte Adicionar um recurso.

Criar uma tabela

Tabelas Parquet, ORC, AVRO, CSV e JSON são compatíveis.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Alterar uma tabela

O Dataplex não permite alterar o local de uma tabela ou editar as colunas de partição de uma tabela. Alterar uma tabela não define automaticamente userManaged como true.

No Spark SQL, é possível renomear uma tabela, adicionar colunas e definir o formato do arquivo.

Renomear uma tabela

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Adicionar colunas

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Definir o formato do arquivo

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Remover uma tabela

Descartar uma tabela da API metadata do Dataplex não exclui os dados subjacentes no Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Adicionar uma partição

O Dataplex não permite alterar uma partição depois de criada. No entanto, a partição pode ser descartada.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

É possível adicionar várias partições da mesma chave e diferentes valores de partição, como mostrado no exemplo anterior.

Remover uma partição

Para descartar uma partição, execute o seguinte comando:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Consultar tabelas do Iceberg

É possível consultar tabelas do Iceberg usando o Apache Spark.

Antes de começar

Configurar uma sessão do Spark SQL com o Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Criar uma tabela Iceberg

Para criar uma tabela do Iceberg, execute o seguinte comando:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Conheça a história e o resumo do Iceberg

É possível acessar snapshots e o histórico das tabelas do Iceberg usando o Apache Spark.

Antes de começar

Configurar uma sessão do PySpark com o suporte do Iceberg.

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Acessar histórico das tabelas do Iceberg

Para ver o histórico de uma tabela Iceberg, execute o seguinte comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Gerar snapshots de tabelas do Iceberg

Para gerar um snapshot de uma tabela do Iceberg, execute o seguinte comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Tipos de dados e formatos de arquivo aceitos

Os tipos de dados compatíveis são definidos da seguinte maneira:

Tipo de dados Valores
Primário
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Matriz ARRAY < DATA_TYPE >
Estrutura STRUCT < COLUMN : DATA_TYPE >

Os formatos de arquivo compatíveis são definidos da seguinte maneira:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Para mais informações sobre os formatos de arquivo, consulte Formatos de armazenamento.

Os formatos de linha compatíveis são definidos da seguinte maneira:

  • DELIMITADO [CAMPOS TERMINADOS POR CHAR]
  • SERDE SERDE_NAME [COM SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

A seguir