Kontinuierliche Abfragen erstellen
In diesem Dokument wird beschrieben, wie Sie eine kontinuierliche Abfrage in BigQuery ausführen.
BigQuery-kontinuierliche Abfragen sind SQL-Anweisungen, die kontinuierlich ausgeführt werden. Mit kontinuierlichen Abfragen können Sie eingehende Daten in BigQuery in Echtzeit analysieren und die Ergebnisse dann entweder nach Bigtable oder Pub/Sub exportieren oder in eine BigQuery-Tabelle schreiben.
Kontotyp auswählen
Sie können einen Job für kontinuierliche Abfragen mit einem Nutzerkonto erstellen und ausführen. Sie haben auch die Möglichkeit, einen Job für kontinuierliche Abfragen mit einem Nutzerkonto zu erstellen und dann mit einem Dienstkonto auszuführen. Sie müssen ein Dienstkonto verwenden, um eine kontinuierliche Abfrage auszuführen, die Ergebnisse in ein Pub/Sub-Thema exportiert.
Wenn Sie ein Nutzerkonto verwenden, wird eine kontinuierliche Abfrage zwei Tage lang ausgeführt. Wenn Sie ein Dienstkonto verwenden, wird eine kontinuierliche Abfrage so lange ausgeführt, bis sie explizit abgebrochen wird. Weitere Informationen finden Sie unter Autorisierung.
Erforderliche Berechtigungen
In diesem Abschnitt werden die Berechtigungen beschrieben, die Sie zum Erstellen und Ausführen einer kontinuierlichen Abfrage benötigen. Alternativ zu den genannten IAM-Rollen (Identity and Access Management) können Sie die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen erhalten.
Berechtigungen bei Verwendung eines Nutzerkontos
Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die zum Erstellen und Ausführen einer kontinuierlichen Abfrage mit einem Nutzerkonto erforderlich sind.
Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create
:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Jobnutzer(
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.export
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Zum Aktualisieren von Daten in einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.updateData
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall für kontinuierliche Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin
) haben.
Berechtigungen bei Verwendung eines Dienstkontos
Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die für das Nutzerkonto erforderlich sind, mit dem die kontinuierliche Abfrage erstellt wird, sowie zum Dienstkonto, das die kontinuierliche Abfrage ausführt.
Nutzerkontoberechtigungen
Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create
:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Jobnutzer(
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
Um einen Job einzureichen, der mit einem Dienstkonto ausgeführt wird, muss das Nutzerkonto die Rolle Dienstkontonutzerroles/iam.serviceAccountUser
() haben. Wenn Sie dasselbe Nutzerkonto zum Erstellen des Dienstkontos verwenden, muss das Nutzerkonto die Rolle Service Account Admin (roles/iam.serviceAccountAdmin
) haben. Informationen dazu, wie Sie den Zugriff eines Nutzers auf ein einzelnes Dienstkonto statt auf alle Dienstkonten in einem Projekt beschränken, finden Sie unter Eine Rolle zuweisen.
Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall mit kontinuierlichen Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin
) haben.
Dienstkontoberechtigungen
Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.export
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
bigquery.tables.updateData
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Hinweise
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
Reservierung erstellen
Erstellen Sie eine Reservierung für die Enterprise- oder Enterprise Plus-Version und dann eine Reservierungszuweisung mit dem Jobtyp CONTINUOUS
.
Wenn Sie eine Reservierungszuweisung für eine kontinuierliche Abfrage erstellen, ist die zugehörige Reservierung auf maximal 500 Slots beschränkt und kann nicht für die Verwendung von Autoscaling konfiguriert werden.
Nach Pub/Sub exportieren
Zusätzliche APIs, IAM-Berechtigungen und Google Cloud -Ressourcen sind erforderlich, um Daten nach Pub/Sub zu exportieren. Weitere Informationen finden Sie unter In Pub/Sub exportieren.
Benutzerdefinierte Attribute als Metadaten in Pub/Sub-Nachrichten einbetten
Mit Pub/Sub-Attributen können Sie zusätzliche Informationen zur Nachricht angeben, z. B. ihre Priorität, ihren Ursprung, ihr Ziel oder zusätzliche Metadaten. Sie können auch Attribute verwenden, um Nachrichten im Abo zu filtern.
Wenn eine Spalte in einem kontinuierlichen Abfrageergebnis den Namen _ATTRIBUTES
hat, werden ihre Werte in die Pub/Sub-Nachrichtenattribute kopiert.
Die angegebenen Felder in _ATTRIBUTES
werden als Attributschlüssel verwendet.
Die Spalte _ATTRIBUTES
muss vom Typ JSON
sein und das Format ARRAY<STRUCT<STRING, STRING>>
oder STRUCT<STRING>
haben.
Ein Beispiel finden Sie unter Daten in ein Pub/Sub-Thema exportieren.
Nach Bigtable exportieren
Zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen sind erforderlich, um Daten nach Bigtable zu exportieren. Weitere Informationen finden Sie unter In Bigtable exportieren.
Daten in eine BigQuery-Tabelle schreiben
Mit einer INSERT
-Anweisung können Sie Daten in eine BigQuery-Tabelle schreiben.
KI-Funktionen verwenden
Zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen sind erforderlich, um eine unterstützte KI-Funktion in einer kontinuierlichen Abfrage zu verwenden. Weitere Informationen finden Sie je nach Anwendungsfall in den folgenden Themen:
- Text mit der Funktion
ML.GENERATE_TEXT
generieren - Text-Embeddings mit der Funktion
ML.GENERATE_EMBEDDING
generieren - Text mit der Funktion
ML.UNDERSTAND_TEXT
verstehen - Text mit der Funktion
ML.TRANSLATE
übersetzen
Wenn Sie eine KI-Funktion in einer kontinuierlichen Abfrage verwenden, sollten Sie prüfen, ob die Abfrageausgabe innerhalb des Kontingents für die Funktion bleibt. Wenn Sie das Kontingent überschreiten, müssen Sie die nicht verarbeiteten Daten möglicherweise separat verarbeiten.
Kontinuierliche Abfrage mit einem Nutzerkonto ausführen
In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mit einem Nutzerkonto ausführen. Nachdem die kontinuierliche Abfrage ausgeführt wird, können Sie die Google Cloud -Konsole, das Terminalfenster oder die Anwendung schließen, ohne die Abfrageausführung zu unterbrechen.
So führen Sie eine kontinuierliche Abfrage aus:
Console
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Klicken Sie im Abfrageeditor auf das Dreipunkt-Menü.
Wählen Sie im Bereich Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.
Klicken Sie auf Bestätigen.
Geben Sie im Abfrageeditor die SQL-Anweisung für die Continuous Query ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
Klicken Sie auf Ausführen.
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Führen Sie in Cloud Shell die kontinuierliche Abfrage mit dem Befehl
bq query
und dem Flag--continuous
aus:bq query --use_legacy_sql=false --continuous=true 'QUERY'
Ersetzen Sie
QUERY
durch die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
API
Führen Sie die kontinuierliche Abfrage durch Aufrufen der Methode jobs.insert
aus.
Sie müssen das Feld continuous
in der JobConfigurationQuery
der Job
-Ressource auf true
setzen, die Sie übergeben.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))' --compressed
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.QUERY
: Die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
Kontinuierliche Abfrage mit einem Dienstkonto ausführen
In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mit einem Dienstkonto ausführen. Nachdem die kontinuierliche Abfrage ausgeführt wird, können Sie die Google Cloud -Konsole, das Terminalfenster oder die Anwendung schließen, ohne die Abfrageausführung zu unterbrechen.
So führen Sie mit einem Dienstkonto eine kontinuierliche Abfrage aus:
Console
- Erstellen Sie ein Dienstkonto.
- Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Klicken Sie im Abfrageeditor auf das Dreipunkt-Menü.
Wählen Sie im Bereich Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.
Klicken Sie auf Bestätigen.
Klicken Sie im Abfrageeditor auf Mehr > Abfrageeinstellungen.
Wählen Sie im Bereich Kontinuierliche Abfrage im Feld Dienstkonto das von Ihnen erstellte Dienstkonto aus.
Klicken Sie auf Speichern.
Geben Sie im Abfrageeditor die SQL-Anweisung für die Continuous Query ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
Klicken Sie auf Ausführen.
bq
- Erstellen Sie ein Dienstkonto.
- Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Führen Sie die kontinuierliche Abfrage in der Befehlszeile mit dem Befehl
bq query
und den folgenden Flags aus:- Legen Sie das Flag
--continuous
auftrue
fest, um die Abfrage kontinuierlich auszuführen. - Verwenden Sie das Flag
--connection_property
, um ein Dienstkonto anzugeben.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.SERVICE_ACCOUNT_EMAIL
: die E-Mail-Adresse des Dienstkontos. Sie finden die E-Mail-Adresse des Dienstkontos auf der Seite Dienstkonten der Google Cloud -Konsole.QUERY
: Die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
- Legen Sie das Flag
API
- Erstellen Sie ein Dienstkonto.
- Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
Führen Sie die kontinuierliche Abfrage durch Aufrufen der Methode
jobs.insert
aus. Legen Sie die folgenden Felder in derJobConfigurationQuery
-Ressource der übergebenenJob
-Ressource fest:- Legen Sie das Feld
continuous
auftrue
fest, um die Abfrage fortlaufend auszuführen. - Geben Sie im Feld
connection_property
ein Dienstkonto an, das verwendet werden soll.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.QUERY
: die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.SERVICE_ACCOUNT_EMAIL
: die E-Mail-Adresse des Dienstkontos. Sie finden die E-Mail-Adresse des Dienstkontos auf der Seite Dienstkonten der Google Cloud -Konsole.
- Legen Sie das Feld
Beispiele
Die folgenden SQL-Beispiele zeigen gängige Anwendungsfälle für kontinuierliche Abfragen.
Daten in ein Pub/Sub-Thema exportieren
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten dann in Echtzeit mit Nachrichtenattributen in einem Pub/Sub-Thema veröffentlicht:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
Daten in eine Bigtable-Tabelle exportieren
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten in Echtzeit in eine Bigtable-Tabelle exportiert:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
Daten in eine BigQuery-Tabelle schreiben
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert und transformiert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten dann in Echtzeit in eine andere BigQuery-Tabelle schreibt. So stehen die Daten für weitere nachgelagerte Analysen zur Verfügung.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
Daten mit einem Vertex AI-Modell verarbeiten
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die ein Vertex AI-Modell verwendet, um eine Anzeige für Taxigäste anhand des aktuellen Breiten- und Längengrads zu generieren. Anschließend werden die Ergebnisse in ein Pub/Sub-Thema in Echtzeit exportiert:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
Kontinuierliche Abfrage ab einem bestimmten Zeitpunkt starten
Wenn Sie eine kontinuierliche Abfrage starten, werden alle Zeilen in der Tabelle verarbeitet, aus der Sie auswählen, und neue Zeilen verarbeitet, sobald sie eingehen. Wenn Sie die Verarbeitung einiger oder aller vorhandenen Daten überspringen möchten, können Sie die Änderungsverlaufsfunktion APPENDS
verwenden, um die Verarbeitung ab einem bestimmten Zeitpunkt zu starten.
Im folgenden Beispiel wird veranschaulicht, wie Sie mithilfe der Funktion APPENDS
eine kontinuierliche Abfrage ab einem bestimmten Zeitpunkt starten:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
SQL-Code einer kontinuierlichen Abfrage ändern
Sie können die in einer Continuous Query verwendete SQL-Abfrage nicht aktualisieren, während der Continuous Query-Job ausgeführt wird. Sie müssen den Job für die kontinuierliche Abfrage abbrechen, die SQL-Anweisung ändern und dann einen neuen Job für die kontinuierliche Abfrage an dem Punkt starten, an dem Sie den ursprünglichen Job für die kontinuierliche Abfrage angehalten haben.
So ändern Sie die in einer Continuous Query verwendete SQL-Abfrage:
- Rufen Sie die Jobdetails für den Job mit fortlaufender Abfrage auf, den Sie aktualisieren möchten, und notieren Sie sich die Job-ID.
- Pausieren Sie nach Möglichkeit die Erhebung von Upstream-Daten. Wenn Sie das nicht tun, kann es beim Neustart der kontinuierlichen Abfrage zu Datenduplikationen kommen.
- Brechen Sie die kontinuierliche Abfrage ab, die Sie ändern möchten.
Rufen Sie den
end_time
-Wert für den ursprünglichen kontinuierlichen Abfragejob mithilfe der AnsichtINFORMATION_SCHEMA
JOBS
ab:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.REGION
: die Region, die für Ihr Projekt verwendet wird.JOB_ID
: Die ID des Jobs für kontinuierliche Abfragen, die Sie in Schritt 1 ermittelt haben.
Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage so, dass die kontinuierliche Abfrage ab einem bestimmten Zeitpunkt gestartet wird. Verwenden Sie dabei den in Schritt 5 abgerufenen
end_time
-Wert als Ausgangspunkt.Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage entsprechend den erforderlichen Änderungen.
Führen Sie die geänderte kontinuierliche Abfrage aus.
Kontinuierliche Abfrage abbrechen
Sie können einen Job für kontinuierliche Abfragen wie jeden anderen Job abbrechen. Es kann bis zu einer Minute dauern, bis die Abfrage nach dem Abbruch des Jobs beendet wird.