BigQuery-Metastore mit Spark in BigQuery Studio verwenden

In diesem Dokument wird erläutert, wie Sie den BigQuery-Metastore mit Spark in BigQuery Studio verwenden.

Sie können Spark in BigQuery Studio verwenden, um eine Iceberg-Tabelle mit Apache Spark in BigQuery Studio zu erstellen. Nachdem Sie die Tabelle erstellt haben, können Sie die Daten aus Spark abfragen. Sie können dieselben Daten auch über die BigQuery-Konsole mit SQL abfragen.

Hinweise

  1. Über das folgende Registrierungsformular können Sie Zugriff auf Spark in BigQuery Studio anfordern.
  2. Aktivieren Sie die Abrechnung für Ihr Google Cloud -Projekt. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist
  3. Aktivieren Sie die BigQuery- und Dataflow APIs.

    APIs aktivieren

  4. Optional: Informationen zur Funktionsweise des BigQuery-Metaspeichers und zu den Vorteilen seiner Verwendung

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Verwenden von Spark-Notebooks in BigQuery Studio benötigen:

  • BigQuery Studio-Metastore-Tabellen in Spark erstellen: BigQuery-Dateneditor (roles/bigquery.dataEditor) für das Projekt
  • Erstellen Sie eine Spark-Sitzung aus den Metastore-Tabellen des Notebooks in Spark: Dataproc-Worker (roles/dataproc.serverlessEditor) im Nutzerkonto

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Verbindung mit einem Notebook herstellen

Im folgenden Beispiel wird gezeigt, wie Sie ein Spark-Notebook für die Interaktion mit Iceberg-Tabellen konfigurieren, die im BigQuery-Metastore gespeichert sind.

In diesem Beispiel richten Sie eine Spark-Sitzung ein, erstellen einen Namensbereich und eine Tabelle, fügen der Tabelle einige Daten hinzu und fragen die Daten dann in BigQuery Studio ab.

  1. Erstellen Sie ein Spark-Notebook in BigQuery Studio.

  2. Fügen Sie dem Apache Spark-Notebook die erforderlichen Apache Spark-Importe hinzu:

    from dataproc_spark_session.session.spark.connect import DataprocSparkSession
    from google.cloud.dataproc_v1 import Session
    from pyspark.sql import SparkSession
  3. Definieren Sie einen Katalog, einen Namespace und ein Warehouse-Verzeichnis.

    catalog = "CATALOG_NAME"
    namespace = "NAMESPACE_NAME"
    warehouse_dir = "gs://WAREHOUSE_DIRECTORY"

    Ersetzen Sie Folgendes:

    • CATALOG_NAME: Katalogname, der auf Ihre Spark-Tabelle verweist.
    • NAMESPACE_NAME: ein Namespace-Label, um auf Ihre Spark-Tabelle zu verweisen.
    • WAREHOUSE_DIRECTORY: Der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.
  4. Initialisieren Sie eine Spark-Sitzung.

    session.environment_config.execution_config.network_uri = NETWORK_NAME
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir
    
    spark = (
     DataprocSparkSession.builder
     .appName("BigQuery metastore Iceberg table example")
     .dataprocConfig(session)
     .getOrCreate())

    Ersetzen Sie Folgendes:

    • NETWORK_NAME: der Name oder URI des Netzwerks, in dem der Spark-Code ausgeführt wird. Wenn nichts angegeben ist, wird das Netzwerk default verwendet.
    • PROJECT_ID: die ID des Google Cloud -Projekts, in dem der Spark-Code ausgeführt wird.
    • LOCATION: den Speicherort, an dem der Spark-Job ausgeführt werden soll.
  5. Erstellen Sie einen Katalog und einen Namespace.

    spark.sql(f"USE `CATALOG_NAME`;")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;")
    spark.sql(f"USE `NAMESPACE_NAME`;")
  6. Eine Tabelle erstellen

    spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME ;")

    Ersetzen Sie Folgendes:

    • TABLE_NAME: Name der Iceberg-Tabelle.
  7. DML-Anweisungen (Data Manipulation Language) über Spark ausführen

    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");")
    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  8. Eine DDL-Anweisung (Data Definition Language, Datendefinitionssprache) aus Spark ausführen

    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);")
    spark.sql("DESCRIBE TABLE_NAME ;")
  9. Fügen Sie Daten in die Tabelle ein.

    spark.sql("INSERT INTO TABLE_NAME  VALUES (1, \"It's a sunny day!\", 83);")
  10. Tabelle in Spark abfragen

    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  11. Stellen Sie eine Abfrage für die Tabelle aus der Google Cloud Console in einem neuen Datensatz.

    SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100

Nächste Schritte