BigLake Metastore konfigurieren
In diesem Dokument wird beschrieben, wie Sie den BigLake-Metastore mit Dataproc oder Google Cloud Serverless for Apache Spark konfigurieren, um einen einzelnen, freigegebenen Metastore zu erstellen, der für Open-Source-Engines wie Apache Spark oder Apache Flink funktioniert.
Hinweise
- Aktivieren Sie die Abrechnung für Ihr Google Cloud -Projekt. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.
Aktivieren Sie die BigQuery- und Dataproc-APIs.
Optional: Funktionsweise von BigLake Metastore und Gründe für die Verwendung.
Erforderliche Rollen
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Konfigurieren des BigLake-Metastores benötigen:
-
Dataproc-Cluster erstellen:
Dataproc-Worker (
roles/dataproc.worker
) für das Compute Engine-Standarddienstkonto im Projekt -
BigLake Metastore-Tabellen erstellen:
-
Dataproc-Worker (
roles/dataproc.worker
) für das Dataproc-VM-Dienstkonto im Projekt -
BigQuery Data Editor (
roles/bigquery.dataEditor
) für das Dataproc-VM-Dienstkonto im Projekt -
Storage-Objekt-Administrator (
roles/storage.objectAdmin
) für das Dataproc-VM-Dienstkonto im Projekt
-
Dataproc-Worker (
-
BigLake Metastore-Tabellen abfragen:
-
BigQuery Data Viewer (
roles/bigquery.dataViewer
) für das Projekt -
BigQuery-Nutzer (
roles/bigquery.user
) für das Projekt -
Storage-Objekt-Betrachter (
roles/storage.objectViewer
) für das Projekt
-
BigQuery Data Viewer (
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
Metastore mit Dataproc konfigurieren
Sie können BigLake Metastore mit Dataproc konfigurieren, indem Sie entweder Spark oder Flink verwenden:
Spark
Konfigurieren Sie einen neuen Cluster. Führen Sie zum Erstellen eines neuen Dataproc-Clusters den folgenden
gcloud dataproc clusters create
-Befehl aus, der die Einstellungen enthält, die Sie für die Verwendung von BigLake Metastore benötigen:gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
Ersetzen Sie Folgendes:
CLUSTER_NAME
: ein Name für Ihren Dataproc-Cluster.PROJECT_ID
: die ID des Google Cloud Projekts, in dem Sie den Cluster erstellen.LOCATION
: die Compute Engine-Region, in der Sie den Cluster erstellen.
Sie haben folgende Möglichkeiten, einen Spark-Job zu senden:
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Ersetzen Sie Folgendes:
PROJECT_ID
: die ID des Google Cloud Projekts, das den Dataproc-Cluster enthält.CLUSTER_NAME
: der Name des Dataproc-Clusters, den Sie zum Ausführen des Spark SQL-Jobs verwenden.REGION
: die Compute Engine-Region, in der sich Ihr Cluster befindet.LOCATION
: der Standort der BigQuery-Ressourcen.CATALOG_NAME
: Der Name des Spark-Katalogs, der für Ihren SQL-Job verwendet werden soll.WAREHOUSE_DIRECTORY
: Der Cloud Storage-Ordner, der Ihr Data Warehouse enthält. Dieser Wert beginnt mitgs://
.SPARK_SQL_COMMAND
: Die Spark SQL-Abfrage, die Sie ausführen möchten. Diese Abfrage enthält die Befehle zum Erstellen Ihrer Ressourcen. So erstellen Sie beispielsweise einen Namespace und eine Tabelle.
spark-sql-Befehlszeile
Rufen Sie in der Google Cloud Console die Seite VM-Instanzen auf.
Wenn Sie eine Verbindung zu einer Dataproc-VM-Instanz herstellen möchten, klicken Sie in der Zeile mit dem Namen der primären VM-Instanz des Dataproc-Clusters auf SSH. Der Name der primären VM-Instanz des Clusters besteht aus dem Clusternamen, gefolgt vom Suffix
-m
. Die Ausgabe sieht etwa so aus:Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
Führen Sie im Terminal den folgenden Befehl zur Initialisierung des BigLake-Metastores aus:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Ersetzen Sie Folgendes:
CATALOG_NAME
: der Name des Spark-Katalogs, den Sie mit Ihrem SQL-Job verwenden.PROJECT_ID
: Die Google Cloud Projekt-ID des BigLake Metastore-Katalogs, mit dem Ihr Spark-Katalog verknüpft ist.LOCATION
: Der Google Cloud Standort des BigLake-Metastores.WAREHOUSE_DIRECTORY
: Der Cloud Storage-Ordner, der Ihr Data Warehouse enthält. Dieser Wert beginnt mitgs://
.
Nachdem Sie erfolgreich eine Verbindung zum Cluster hergestellt haben, wird im Spark-Terminal der Prompt
spark-sql
angezeigt, über den Sie Spark-Jobs senden können.spark-sql (default)>
Flink
- Erstellen Sie einen Dataproc-Cluster mit aktivierter optionaler Flink-Komponente und achten Sie darauf, dass Sie Dataproc
2.2
oder höher verwenden. Rufen Sie in der Google Cloud Console die Seite VM-Instanzen auf:
Klicken Sie in der Liste der VM-Instanzen auf SSH, um eine Verbindung zur Haupt-VM-Instanz des Dataproc-Clusters herzustellen. Diese wird als Clustername gefolgt vom Suffix
-m
aufgeführt.Konfigurieren Sie das benutzerdefinierte Iceberg-Katalog-Plug-in für BigLake Metastore:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
Starten Sie die Flink-Sitzung in YARN:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Katalog in Flink erstellen:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
Ersetzen Sie Folgendes:
CATALOG_NAME
: Die Flink-Katalogkennung, die mit einem BigLake Metastore-Katalog verknüpft ist.WAREHOUSE_DIRECTORY
: Der Basispfad für das Warehouse-Verzeichnis (der Cloud Storage-Ordner, in dem Flink Dateien erstellt). Dieser Wert beginnt mitgs://
.PROJECT_ID
: Die Projekt-ID des BigLake Metastore-Katalogs, mit dem der Flink-Katalog verknüpft ist.LOCATION
: Der Standort der BigQuery-Ressourcen.
Ihre Flink-Sitzung ist jetzt mit BigLake Metastore verbunden und Sie können Flink SQL-Befehle ausführen.
BigLake-Metastore-Ressourcen verwalten
Nachdem Sie eine Verbindung zu BigLake Metastore hergestellt haben, können Sie Ressourcen basierend auf den in BigLake Metastore gespeicherten Metadaten erstellen und ansehen.
Führen Sie beispielsweise die folgenden Befehle in Ihrer interaktiven Flink SQL-Sitzung aus, um eine Iceberg-Datenbank und -Tabelle zu erstellen.
Benutzerdefinierten Iceberg-Katalog verwenden:
USE CATALOG CATALOG_NAME;
Ersetzen Sie
CATALOG_NAME
durch Ihre Flink-Katalog-ID.Erstellen Sie eine Datenbank. Dadurch wird ein Dataset in BigQuery erstellt:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Ersetzen Sie
DATABASE_NAME
durch den Namen Ihrer neuen Datenbank.Verwenden Sie die Datenbank, die Sie erstellt haben:
USE DATABASE_NAME;
Iceberg-Tabelle erstellen Im Folgenden wird eine Beispielverkaufstabelle erstellt:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Ersetzen Sie
ICEBERG_TABLE_NAME
durch einen Namen für die neue Tabelle.Tabellenmetadaten ansehen:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Tabellen in der Datenbank auflisten:
SHOW TABLES;
Daten in die Tabelle aufnehmen
Nachdem Sie im vorherigen Abschnitt eine Iceberg-Tabelle erstellt haben, können Sie Flink DataGen als Datenquelle verwenden, um Echtzeitdaten in Ihre Tabelle aufzunehmen. Die folgenden Schritte sind ein Beispiel für diesen Workflow:
So erstellen Sie eine temporäre Tabelle mit DataGen:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Ersetzen Sie Folgendes:
DATABASE_NAME
: der Name der Datenbank, in der die temporäre Tabelle gespeichert werden soll.TEMP_TABLE_NAME
: Ein Name für die temporäre Tabelle.ICEBERG_TABLE_NAME
: Der Name der Iceberg-Tabelle, die Sie im vorherigen Abschnitt erstellt haben.
Legen Sie die Parallelität auf 1 fest:
SET 'parallelism.default' = '1';
Legen Sie das Prüfpunktintervall fest:
SET 'execution.checkpointing.interval' = '10second';
Prüfpunkt festlegen:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Starten Sie den Echtzeit-Streamingjob:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
Die Ausgabe sieht etwa so aus:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
So prüfen Sie den Status des Streamingjobs:
Rufen Sie in der Google Cloud Console die Seite Cluster auf.
Wählen Sie Ihren Cluster aus.
Klicken Sie auf den Tab Weboberflächen.
Klicken Sie auf den Link YARN ResourceManager.
Suchen Sie in der YARN ResourceManager-Oberfläche nach Ihrer Flink-Sitzung und klicken Sie unter Tracking UI auf den Link ApplicationMaster.
Prüfen Sie in der Spalte Status, ob der Jobstatus Wird ausgeführt lautet.
Streamingdaten im Flink SQL-Client abfragen:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Streamingdaten in BigQuery abfragen:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Beenden Sie den Streamingjob im Flink SQL-Client:
STOP JOB 'JOB_ID';
Ersetzen Sie
JOB_ID
durch die Job-ID, die in der Ausgabe angezeigt wurde, als Sie den Streamingjob erstellt haben.
Metastore mit Serverless for Apache Spark konfigurieren
Sie können den BigLake-Metastore mit Serverless for Apache Spark entweder mit Spark SQL oder PySpark konfigurieren.
Spark SQL
Erstellen Sie eine SQL-Datei mit den Spark SQL-Befehlen, die Sie im BigLake Metastore ausführen möchten. Mit diesem Befehl werden beispielsweise ein Namespace und eine Tabelle erstellt:
CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
Ersetzen Sie Folgendes:
CATALOG_NAME
: Der Katalogname, der auf Ihre Spark-Tabelle verweist.NAMESPACE_NAME
: Der Namespace-Name, der auf Ihre Spark-Tabelle verweist.TABLE_NAME
: ein Tabellenname für Ihre Spark-Tabelle.WAREHOUSE_DIRECTORY
: Der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.
Senden Sie einen Spark SQL-Batchjob, indem Sie den folgenden
gcloud dataproc batches submit spark-sql
-Befehl ausführen:gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
Ersetzen Sie Folgendes:
SQL_SCRIPT_PATH
: der Pfad zur SQL-Datei, die vom Batchjob verwendet wird.PROJECT_ID
: Die ID des Google Cloud Projekts, in dem der Batchjob ausgeführt werden soll.REGION
: die Region, in der Ihre Arbeitslast ausgeführt wird.SUBNET_NAME
(optional): der Name eines VPC-Subnetzes inREGION
, das die Anforderungen an das Sitzungssubnetz erfüllt.BUCKET_PATH
: Der Speicherort des Cloud Storage-Bucket zum Hochladen von Arbeitslastabhängigkeiten. Die DateiWAREHOUSE_DIRECTORY
befindet sich in diesem Bucket. Dasgs://
-URI-Präfix des Buckets ist nicht erforderlich. Sie können den Bucket-Pfad oder den Bucket-Namen angeben, z. B.mybucketname1
.LOCATION
: Der Standort, an dem der Batchjob ausgeführt werden soll.
Weitere Informationen zum Einreichen von Spark-Batchjobs finden Sie unter Spark-Batcharbeitslast ausführen.
PySpark
Erstellen Sie eine Python-Datei mit den PySpark-Befehlen, die Sie im BigLake Metastore ausführen möchten.
Mit dem folgenden Befehl wird beispielsweise eine Spark-Umgebung für die Interaktion mit Iceberg-Tabellen eingerichtet, die im BigLake Metastore gespeichert sind. Mit dem Befehl wird dann ein neuer Namespace und eine Iceberg-Tabelle in diesem Namespace erstellt.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
Ersetzen Sie Folgendes:
PROJECT_ID
: Die ID des Google Cloud Projekts, in dem der Batchjob ausgeführt werden soll.LOCATION
: Der Standort, an dem sich die BigQuery-Ressourcen befinden.CATALOG_NAME
: Der Katalogname, der auf Ihre Spark-Tabelle verweist.TABLE_NAME
: ein Tabellenname für Ihre Spark-Tabelle.WAREHOUSE_DIRECTORY
: Der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.NAMESPACE_NAME
: Der Namespace-Name, der auf Ihre Spark-Tabelle verweist.
Senden Sie den Batchjob mit dem folgenden
gcloud dataproc batches submit pyspark
-Befehl:gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
Ersetzen Sie Folgendes:
PYTHON_SCRIPT_PATH
: Der Pfad zum Python-Skript, das vom Batchjob verwendet wird.PROJECT_ID
: Die ID des Google Cloud Projekts, in dem der Batchjob ausgeführt werden soll.REGION
: die Region, in der Ihre Arbeitslast ausgeführt wird.BUCKET_PATH
: Der Speicherort des Cloud Storage-Bucket zum Hochladen von Arbeitslastabhängigkeiten. Dasgs://
-URI-Präfix des Buckets ist nicht erforderlich. Sie können den Bucket-Pfad oder den Bucket-Namen angeben, z. B.mybucketname1
.
Weitere Informationen zum Senden von PySpark-Batchjobs finden Sie in der gcloud-Referenz für PySpark.