Kontinuierliche Abfragen erstellen
In diesem Dokument wird beschrieben, wie Sie eine kontinuierliche Abfrage in BigQuery ausführen.
Kontinuierliche BigQuery-Abfragen sind SQL-Anweisungen, die kontinuierlich ausgeführt werden. Mit kontinuierlichen Abfragen können Sie eingehende Daten in BigQuery in Echtzeit analysieren und die Ergebnisse entweder nach Bigtable oder Pub/Sub exportieren oder in eine BigQuery-Tabelle schreiben.
Kontotyp auswählen
Sie können einen Job für kontinuierliche Abfragen mit einem Nutzerkonto erstellen und ausführen. Sie haben auch die Möglichkeit, einen Job für kontinuierliche Abfragen mit einem Nutzerkonto zu erstellen und dann mit einem Dienstkonto auszuführen. Sie müssen ein Dienstkonto verwenden, um eine kontinuierliche Abfrage auszuführen, die Ergebnisse in ein Pub/Sub-Thema exportiert.
Bei Verwendung eines Nutzerkontos wird eine kontinuierliche Abfrage zwei Tage lang ausgeführt. Wenn Sie ein Dienstkonto verwenden, wird eine kontinuierliche Abfrage ausgeführt, bis sie explizit abgebrochen wird. Weitere Informationen finden Sie unter Autorisierung.
Erforderliche Berechtigungen
In diesem Abschnitt werden die Berechtigungen beschrieben, die Sie zum Erstellen und Ausführen einer kontinuierlichen Abfrage benötigen. Als Alternative zu den genannten IAM-Rollen (Identity and Access Management) können Sie die erforderlichen Berechtigungen über benutzerdefinierte Rollen erhalten.
Berechtigungen bei Verwendung eines Nutzerkontos
Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die zum Erstellen und Ausführen einer kontinuierlichen Abfrage mithilfe eines Nutzerkontos erforderlich sind.
Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create
:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Jobnutzer(
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.export
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Zum Aktualisieren von Daten in einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.updateData
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall für kontinuierliche Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin
) haben.
Berechtigungen bei Verwendung eines Dienstkontos
Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die für das Nutzerkonto erforderlich sind, mit dem die kontinuierliche Abfrage erstellt wird, sowie für das Dienstkonto, das die kontinuierliche Abfrage ausführt.
Nutzerkontoberechtigungen
Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create
:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Jobnutzer(
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
Zum Senden eines Jobs, der mit einem Dienstkonto ausgeführt wird, muss das Nutzerkonto die Rolle Dienstkontonutzer (roles/iam.serviceAccountUser
) haben. Wenn Sie dasselbe Nutzerkonto zum Erstellen des Dienstkontos verwenden, muss das Nutzerkonto die Rolle Service Account Admin (roles/iam.serviceAccountAdmin
) haben. Informationen zum Beschränken des Zugriffs eines Nutzers auf ein einzelnes Dienstkonto und nicht auf alle Dienstkonten innerhalb eines Projekts finden Sie unter Einzelne Rolle zuweisen.
Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall mit kontinuierlichen Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin
) haben.
Dienstkontoberechtigungen
Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.export
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
bigquery.tables.updateData
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Hinweise
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
Reservierung erstellen
Erstellen Sie eine Reservierung für Enterprise oder Enterprise Plus und erstellen Sie dann eine Reservierungszuweisung mit einem CONTINUOUS
-Jobtyp.
Nach Pub/Sub exportieren
Zum Exportieren von Daten nach Pub/Sub sind zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen erforderlich. Weitere Informationen finden Sie unter Nach Pub/Sub exportieren.
Nach Bigtable exportieren
Zum Exportieren von Daten nach Bigtable sind zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen erforderlich. Weitere Informationen finden Sie unter Nach Bigtable exportieren.
Daten in eine BigQuery-Tabelle schreiben
Mit einer INSERT
-Anweisung können Sie Daten in eine BigQuery-Tabelle schreiben.
KI-Funktionen verwenden
Es sind zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen erforderlich, um eine unterstützte KI-Funktion in einer kontinuierlichen Abfrage zu verwenden. Weitere Informationen finden Sie je nach Anwendungsfall in einem der folgenden Themen:
- Text mit der Funktion
ML.GENERATE_TEXT
generieren - Texteinbettungen mit der Funktion
ML.GENERATE_EMBEDDING
generieren - Text mit der Funktion
ML.UNDERSTAND_TEXT
verstehen - Text mit der Funktion
ML.TRANSLATE
übersetzen
Wenn Sie eine KI-Funktion in einer kontinuierlichen Abfrage verwenden, sollten Sie prüfen, ob die Abfrageausgabe im Kontingent für die Funktion verbleibt. Wenn Sie das Kontingent überschreiten, müssen Sie möglicherweise die Einträge, die nicht verarbeitet werden, separat verarbeiten.
Kontinuierliche Abfrage mit einem Nutzerkonto ausführen
In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mit einem Nutzerkonto ausführen. Sobald die kontinuierliche Abfrage ausgeführt wurde, können Sie die Google Cloud Console, das Terminalfenster oder die Anwendung schließen, ohne die Abfrageausführung zu unterbrechen.
So führen Sie eine kontinuierliche Abfrage aus:
Console
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Klicken Sie im Abfrageeditor auf Mehr.
Wählen Sie im Abschnitt Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.
Klicken Sie auf Bestätigen.
Geben Sie im Abfrageeditor die SQL-Anweisung für die kontinuierliche Abfrage ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
Klicken Sie auf Ausführen.
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Führen Sie in Cloud Shell die kontinuierliche Abfrage mit dem Befehl
bq query
und dem Flag--continuous
aus:bq query --use_legacy_sql=false --continuous=true 'QUERY'
Ersetzen Sie
QUERY
durch die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
API
Führen Sie die kontinuierliche Abfrage aus, indem Sie die Methode jobs.insert
aufrufen.
Sie müssen das Feld continuous
in der JobConfigurationQuery
der Job
-Ressource auf true
setzen, die Sie übergeben.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))' --compressed
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.QUERY
: Die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
Kontinuierliche Abfrage mithilfe eines Dienstkontos ausführen
In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mithilfe eines Dienstkontos ausführen. Sobald die kontinuierliche Abfrage ausgeführt wurde, können Sie die Google Cloud Console, das Terminalfenster oder die Anwendung schließen, ohne die Abfrageausführung zu unterbrechen.
So führen Sie über ein Dienstkonto eine kontinuierliche Abfrage aus:
Console
- Erstellen Sie ein Dienstkonto.
- Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Klicken Sie im Abfrageeditor auf Mehr.
Wählen Sie im Abschnitt Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.
Klicken Sie auf Bestätigen.
Klicken Sie im Abfrageeditor auf Mehr > Abfrageeinstellungen.
Wählen Sie im Abschnitt Kontinuierliche Abfrage im Feld Dienstkonto das von Ihnen erstellte Dienstkonto aus.
Klicken Sie auf Speichern.
Geben Sie im Abfrageeditor die SQL-Anweisung für die kontinuierliche Abfrage ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
Klicken Sie auf Ausführen.
bq
- Erstellen Sie ein Dienstkonto.
- Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Führen Sie in der Befehlszeile die kontinuierliche Abfrage mit dem Befehl
bq query
und den folgenden Flags aus:- Setzen Sie das Flag
--continuous
auftrue
, um die Abfrage kontinuierlich zu machen. - Verwenden Sie das Flag
--connection_property
, um das zu verwendende Dienstkonto anzugeben.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.SERVICE_ACCOUNT_EMAIL
: die E-Mail-Adresse des Dienstkontos. Sie können die E-Mail-Adresse des Dienstkontos auf der Seite Dienstkonten in der Google Cloud Console abrufen.QUERY
: Die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
- Setzen Sie das Flag
API
- Erstellen Sie ein Dienstkonto.
- Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
Führen Sie die kontinuierliche Abfrage aus, indem Sie die Methode
jobs.insert
aufrufen. Legen Sie die folgenden Felder in der RessourceJobConfigurationQuery
der RessourceJob
fest, die Sie übergeben:- Setzen Sie das Feld
continuous
auftrue
, um die Abfrage kontinuierlich zu machen. - Verwenden Sie das Feld
connection_property
, um das zu verwendende Dienstkonto anzugeben.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.QUERY
: Die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.SERVICE_ACCOUNT_EMAIL
: die E-Mail-Adresse des Dienstkontos. Sie können die E-Mail-Adresse des Dienstkontos auf der Seite Dienstkonten in der Google Cloud Console abrufen.
- Setzen Sie das Feld
Beispiele
Die folgenden SQL-Beispiele zeigen gängige Anwendungsfälle für kontinuierliche Abfragen.
Daten in ein Pub/Sub-Thema exportieren
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die gestreamte Taxifahrteninformationen empfängt, und diese Daten in Echtzeit in einem Pub/Sub-Thema veröffentlicht:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
Daten in eine Bigtable-Tabelle exportieren
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Streaminginformationen zu Taxifahrten empfängt, und die Daten in Echtzeit in die Bigtable-Tabelle exportiert:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
Daten in eine BigQuery-Tabelle schreiben
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert und transformiert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten dann in Echtzeit in eine andere BigQuery-Tabelle schreibt. Dadurch stehen die Daten für weitere nachgelagerte Analysen zur Verfügung.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
Daten mit einem Vertex AI-Modell verarbeiten
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die ein Vertex AI-Modell verwendet, um eine Anzeige für Taxigäste anhand des aktuellen Breiten- und Längengrads zu generieren. Anschließend werden die Ergebnisse in ein Pub/Sub-Thema in Echtzeit exportiert:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
Kontinuierliche Abfrage ab einem bestimmten Zeitpunkt starten
Wenn Sie eine kontinuierliche Abfrage starten, werden alle Zeilen in der Tabelle verarbeitet, aus der Sie auswählen, und neue Zeilen verarbeitet, sobald sie eingehen. Wenn Sie die Verarbeitung einiger oder aller vorhandenen Daten überspringen möchten, können Sie die Änderungsverlaufsfunktion APPENDS
verwenden, um die Verarbeitung ab einem bestimmten Zeitpunkt zu starten.
Das folgende Beispiel zeigt, wie eine kontinuierliche Abfrage mit der Funktion APPENDS
zu einem bestimmten Zeitpunkt gestartet wird:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
SQL einer kontinuierlichen Abfrage ändern
Sie können den SQL-Code, der in einer kontinuierlichen Abfrage verwendet wird, nicht aktualisieren, während der kontinuierliche Abfragejob ausgeführt wird. Sie müssen den kontinuierlichen Abfragejob abbrechen, den SQL-Code ändern und dann einen neuen kontinuierlichen Abfragejob ab dem Punkt starten, an dem Sie den ursprünglichen kontinuierlichen Abfragejob beendet haben.
So ändern Sie den SQL-Code, der in einer kontinuierlichen Abfrage verwendet wird:
- Rufen Sie die Jobdetails für den kontinuierlichen Abfragejob auf, den Sie aktualisieren möchten, und notieren Sie sich die Job-ID.
- Pausieren Sie nach Möglichkeit die Erfassung von vorgelagerten Daten. Wenn dies nicht möglich ist, erhalten Sie beim Neustart der kontinuierlichen Abfrage möglicherweise eine Datenduplizierung.
- Brechen Sie die kontinuierliche Abfrage ab, die Sie ändern möchten.
Rufen Sie den Wert
end_time
für den ursprünglichen kontinuierlichen Abfragejob mit der AnsichtJOBS
INFORMATION_SCHEMA
ab:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.REGION
ist die Region, die von Ihrem Projekt verwendet wird.JOB_ID
: Die Job-ID der kontinuierlichen Abfrage, die Sie in Schritt 1 identifiziert haben.
Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage so, dass die kontinuierliche Abfrage ab einem bestimmten Zeitpunkt gestartet wird. Verwenden Sie dabei den in Schritt 5 abgerufenen
end_time
-Wert als Ausgangspunkt.Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage entsprechend Ihren erforderlichen Änderungen.
Führen Sie die geänderte kontinuierliche Abfrage aus.
Kontinuierliche Abfrage abbrechen
Sie können einen kontinuierlichen Abfragejob wie jeden anderen Job abbrechen. Nach dem Abbruch des Jobs kann es bis zu einer Minute dauern, bis die Abfrage nicht mehr ausgeführt wird.