BigQuery-Metastore mit Dataproc Serverless verwenden

In diesem Dokument wird erläutert, wie Sie den BigQuery-Metastore mit Dataproc Serverless verwenden.

Hinweis

  1. Aktivieren Sie die Abrechnung für Ihr Google Cloud-Projekt. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.
  2. Aktivieren Sie die BigQuery- und Dataproc-APIs.

    APIs aktivieren

  3. Optional: Informationen zur Funktionsweise des BigQuery-Metaspeichers und zu den Vorteilen seiner Verwendung

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um Spark und Dataproc Serverless mit dem BigQuery-Metastore als Metadatenspeicher verwenden zu können:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Allgemeiner Workflow

So verwenden Sie BigQuery mit Dataproc Serverless:

  1. Erstellen Sie eine Datei mit den Befehlen, die Sie im BigQuery-Metastore ausführen möchten.
  2. Stellen Sie eine Verbindung zu Ihrer bevorzugten Open-Source-Software-Engine her.
  3. Reichen Sie einen Batchjob mit der gewünschten Methode ein, z. B. Spark SQL oder PySpark.

BigQuery-Metastore mit Spark verbinden

In der folgenden Anleitung erfahren Sie, wie Sie Dataproc Serverless mit dem BigQuery-Metastore verbinden:

SparkSQL

Führen Sie die folgenden Schritte aus, um einen Spark SQL-Batchjob einzureichen.

  1. Erstellen Sie eine SQL-Datei mit den Spark-SQL-Befehlen, die Sie im BigQuery-Metastore ausführen möchten. Mit diesem Befehl werden beispielsweise ein Namespace und eine Tabelle erstellt.

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Ersetzen Sie Folgendes:

    • CATALOG_NAME: Der Katalogname, der auf Ihre Spark-Tabelle verweist.
    • NAMESPACE_NAME: Der Name des Namespace, der auf Ihre Spark-Tabelle verweist.
    • TABLE_NAME: ein Tabellenname für Ihre Spark-Tabelle.
    • WAREHOUSE_DIRECTORY: Der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.
  2. Reichen Sie einen Spark SQL-Batchjob ein, indem Sie den folgenden gcloud dataproc batches submit spark-sql-Befehl in der gcloud CLI ausführen:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
    --project=PROJECT_ID \
    --region=REGION \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
    --deps-bucket=BUCKET_PATH \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Ersetzen Sie Folgendes:

    • SQL_SCRIPT_PATH: der Pfad zur SQL-Datei, die vom Batchjob verwendet wird.
    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem der Batchjob ausgeführt werden soll.
    • REGION: die Region, in der Ihre Arbeitslast ausgeführt wird.
    • SUBNET_NAME: Optional. Der Name eines VPC-Subnetzes in REGION, für das der private Google-Zugriff aktiviert ist und das die anderen Anforderungen an Sitzungssubnetze erfüllt.
    • LOCATION: Der Speicherort, an dem der Batchjob ausgeführt werden soll.
    • BUCKET_PATH: den Speicherort des Cloud Storage-Buckets, in den Arbeitslastabhängigkeiten hochgeladen werden sollen. Die WAREHOUSE_FOLDER befindet sich in diesem Bucket. Das gs://-URI-Präfix des Buckets ist nicht erforderlich. Sie können den Bucket-Pfad oder den Bucket-Namen angeben, z. B. mybucketname1.

    Weitere Informationen zum Einreichen von Spark-Batchjobs finden Sie unter Spark-Batcharbeitslast ausführen.

PySpark

Führen Sie die folgenden Schritte aus, um einen PySpark-Batchjob einzureichen.

  1. Erstellen Sie eine Python-Datei mit den PySpark-Befehlen, die Sie im BigQuery-Metastore ausführen möchten.

    Mit dem folgenden Befehl wird beispielsweise eine Spark-Umgebung eingerichtet, um mit Iceberg-Tabellen zu interagieren, die im BigQuery-Metastore gespeichert sind. Der Befehl erstellt dann einen neuen Namespace und eine Iceberg-Tabelle in diesem Namespace.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem der Batchjob ausgeführt werden soll.
    • LOCATION: den Speicherort der BigQuery-Ressourcen.
    • CATALOG_NAME: Der Katalogname, der auf Ihre Spark-Tabelle verweist.
    • TABLE_NAME: ein Tabellenname für Ihre Spark-Tabelle.
    • WAREHOUSE_DIRECTORY: Der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.
    • NAMESPACE_NAME: Der Name des Namespace, der auf Ihre Spark-Tabelle verweist.
  2. Senden Sie den Batchjob mit dem folgenden Befehl gcloud dataproc batches submit pyspark.

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
     --version=2.2 \
     --project=PROJECT_ID \
     --region=REGION \
     --deps-bucket=BUCKET_PATH \
     --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
     --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Ersetzen Sie Folgendes:

    • PYTHON_SCRIPT_PATH: Der Pfad zum Python-Script, das vom Batchjob verwendet wird.
    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem der Batchjob ausgeführt werden soll.
    • REGION: die Region, in der Ihre Arbeitslast ausgeführt wird.
    • BUCKET_PATH: den Speicherort des Cloud Storage-Buckets, in den Arbeitslastabhängigkeiten hochgeladen werden sollen. Das gs://-URI-Präfix des Buckets ist nicht erforderlich. Sie können den Bucket-Pfad oder den Bucket-Namen angeben, z. B. mybucketname1.

    Weitere Informationen zum Einreichen von PySpark-Batchjobs finden Sie in der PySpark-gcloud-Referenz.

Nächste Schritte