Accéder aux métadonnées dans Apache Spark

Cette page explique comment créer un Dataproc un cluster exécutant Spark.

Présentation

Vous créez un cluster après le service Dataproc Metastore est associée au lac Dataplex pour garantir que cluster peut s'appuyer sur le point de terminaison Hive Metastore pour accéder Métadonnées Dataplex.

Les métadonnées gérées dans Dataplex sont accessibles via des (telles que Hive Metastore) qui permettent d'alimenter les requêtes Spark. Les requêtes s'exécutent sur la cluster Dataproc.

Pour les données Parquet, définissez la propriété Spark spark.sql.hive.convertMetastoreParquet sur false pour éviter les erreurs d'exécution. En savoir plus

Créer un cluster Dataproc

Exécutez les commandes suivantes pour créer un cluster Dataproc, en spécifiant le service Dataproc Metastore associé avec le lac Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Explorer les métadonnées

Exécuter des requêtes DQL pour explorer les métadonnées et des requêtes Spark pour interroger des données

Avant de commencer

  1. Ouvrez une session SSH sur le nœud principal du cluster Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. Lorsque l'invite de commande du nœud principal s'affiche, ouvrez un nouveau REPL Python.

    python3
    

Répertorier des bases de données

Chaque zone Dataplex dans le lac est mappée à une base de données de métastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Répertorier des tables

Répertorier les tables de l'une des zones.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Interroger les données

Interrogez les données de l'une des tables.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Créer des tables et des partitions dans les métadonnées

Exécuter des requêtes LDD pour créer des tables et des partitions dans les métadonnées Dataplex à l'aide d'Apache Spark

Pour en savoir plus sur les types de données, les formats de fichiers et les formats de lignes compatibles, consultez Valeurs acceptées.

Avant de commencer

Avant de créer une table, créez un élément Dataplex correspondant au bucket Cloud Storage. contenant les données sous-jacentes. Pour en savoir plus, consultez Ajouter un asset.

Créer une table

Les tables Parquet, ORC, AVRO, CSV et JSON sont acceptées.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Modifier un tableau

Dataplex ne vous permet pas de modifier l'emplacement d'une table ni la colonnes de partition d'une table. La modification d'un tableau n'est pas automatiquement définie userManaged à true.

Dans Spark SQL, vous pouvez renommer une table, ajouter des colonnes, et définir le format de fichier d'une table.

Renommer une table

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Ajouter des colonnes

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Définir le format de fichier

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Déposer une table

Supprimer une table de l'API de métadonnées de Dataplex ne supprime pas les données sous-jacentes dans Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Ajouter une partition

Dataplex ne permet pas de modifier une partition après sa création. Toutefois, la partition peut être supprimée.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

Vous pouvez ajouter plusieurs partitions de la même clé de partition et de valeurs de partition différentes, comme indiqué dans l'exemple précédent.

Supprimer une partition

Pour supprimer une partition, exécutez la commande suivante:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Interroger les tables Iceberg

Vous pouvez interroger les tables Iceberg à l'aide d'Apache Spark.

Avant de commencer

Configurez une session Spark SQL avec Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Créer une table Iceberg

Pour créer une table Iceberg, exécutez la commande suivante:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Découvrez l'histoire et l'instantané d'Iceberg

Vous pouvez obtenir des instantanés et l'historique des tables Iceberg à l'aide d'Apache Spark.

Avant de commencer

Configurez une session PySpark avec l'assistance d'Iceberg.

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Obtenir l'historique des tables d'Iceberg

Pour obtenir l'historique d'une table Iceberg, exécutez la commande suivante:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Obtenir des instantanés des tables Iceberg

Pour obtenir un instantané d'une table Iceberg, exécutez la commande suivante:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Types de données et formats de fichiers compatibles

Les types de données acceptés sont définis comme suit:

Type de données Valeurs
Primitif
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Tableau ARRAY < DATA_TYPE >
Structure STRUCT < COLUMN : DATA_TYPE >

Les formats de fichiers compatibles sont définis comme suit:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Pour en savoir plus sur les formats de fichiers, consultez Formats de stockage.

Les formats de lignes acceptés sont définis comme suit:

  • LIMITÉE [CHAMPS TERMINÉS PAR CHAR]
  • SERDE SERDE_NAME [AVEC SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

Étape suivante