Kontinuierliche Abfragen erstellen
In diesem Dokument wird beschrieben, wie Sie eine kontinuierliche Abfrage in BigQuery ausführen.
BigQuery-Continuous Queries sind SQL-Anweisungen, die kontinuierlich ausgeführt werden. Mit kontinuierlichen Abfragen können Sie eingehende Daten in BigQuery in Echtzeit analysieren und die Ergebnisse dann entweder nach Bigtable, Pub/Sub oder Spanner exportieren oder in eine BigQuery-Tabelle schreiben.
Kontotyp auswählen
Sie können einen Job für kontinuierliche Abfragen mit einem Nutzerkonto erstellen und ausführen. Sie haben auch die Möglichkeit, einen Job für kontinuierliche Abfragen mit einem Nutzerkonto zu erstellen und dann mit einem Dienstkonto auszuführen. Sie müssen ein Dienstkonto verwenden, um eine kontinuierliche Abfrage auszuführen, mit der Ergebnisse in ein Pub/Sub-Thema exportiert werden.
Wenn Sie ein Nutzerkonto verwenden, wird eine kontinuierliche Abfrage bis zu zwei Tage lang ausgeführt. Wenn Sie ein Dienstkonto verwenden, wird eine kontinuierliche Abfrage bis zu 150 Tage lang ausgeführt. Weitere Informationen finden Sie unter Autorisierung.
Erforderliche Berechtigungen
In diesem Abschnitt werden die Berechtigungen beschrieben, die Sie zum Erstellen und Ausführen einer kontinuierlichen Abfrage benötigen. Alternativ zu den erwähnten IAM-Rollen (Identity and Access Management) können Sie die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen erhalten.
Berechtigungen bei Verwendung eines Nutzerkontos
Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die zum Erstellen und Ausführen einer kontinuierlichen Abfrage mit einem Nutzerkonto erforderlich sind.
Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create
:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Jobnutzer(
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.export
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Zum Aktualisieren von Daten in einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.updateData
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall für kontinuierliche Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin
) haben.
Berechtigungen bei Verwendung eines Dienstkontos
Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die für das Nutzerkonto erforderlich sind, mit dem die kontinuierliche Abfrage erstellt wird, sowie zum Dienstkonto, das die kontinuierliche Abfrage ausführt.
Nutzerkontoberechtigungen
Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create
:
- BigQuery-Nutzer (
roles/bigquery.user
) - BigQuery-Jobnutzer(
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
Wenn Sie einen Job einreichen möchten, der mit einem Dienstkonto ausgeführt wird, muss das Nutzerkonto die Rolle Dienstkontonutzer (roles/iam.serviceAccountUser
) haben. Wenn Sie dasselbe Nutzerkonto zum Erstellen des Dienstkontos verwenden, muss das Nutzerkonto die Rolle Service Account Admin (roles/iam.serviceAccountAdmin
) haben. Informationen dazu, wie Sie den Zugriff eines Nutzers auf ein einzelnes Dienstkonto anstelle aller Dienstkonten in einem Projekt einschränken, finden Sie unter Einzelne Rolle zuweisen.
Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall mit kontinuierlichen Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin
) haben.
Dienstkontoberechtigungen
Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.export
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
bigquery.tables.updateData
haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Hinweise
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
Reservierung erstellen
Erstellen Sie eine Reservierung für die Enterprise- oder Enterprise Plus-Version und dann eine Reservierungszuweisung mit dem Jobtyp CONTINUOUS
. Für diese Reservierung kann Autoscaling verwendet werden.
Für die Reservierungszuweisung für kontinuierliche Abfragen gelten Reservierungsbeschränkungen.
Nach Pub/Sub exportieren
Für den Export von Daten nach Pub/Sub sind zusätzliche APIs, IAM-Berechtigungen und Google Cloud -Ressourcen erforderlich. Weitere Informationen finden Sie unter In Pub/Sub exportieren.
Benutzerdefinierte Attribute als Metadaten in Pub/Sub-Nachrichten einbetten
Mit Pub/Sub-Attributen können Sie zusätzliche Informationen zur Nachricht angeben, z. B. ihre Priorität, ihren Ursprung, ihr Ziel oder zusätzliche Metadaten. Sie können auch Attribute verwenden, um Nachrichten im Abo zu filtern.
Wenn eine Spalte in einem kontinuierlichen Abfrageergebnis den Namen _ATTRIBUTES
hat, werden ihre Werte in die Pub/Sub-Nachrichtenattribute kopiert.
Die angegebenen Felder in _ATTRIBUTES
werden als Attributschlüssel verwendet.
Die Spalte _ATTRIBUTES
muss vom Typ JSON
sein und das Format ARRAY<STRUCT<STRING, STRING>>
oder STRUCT<STRING>
haben.
Ein Beispiel finden Sie unter Daten in ein Pub/Sub-Thema exportieren.
Nach Bigtable exportieren
Für den Export von Daten nach Bigtable sind zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen erforderlich. Weitere Informationen finden Sie unter In Bigtable exportieren.
In Spanner exportieren
Für den Export von Daten nach Spanner sind zusätzliche APIs, IAM-Berechtigungen und Google CloudRessourcen erforderlich. Weitere Informationen finden Sie unter Daten nach Spanner exportieren (umgekehrte ETL).
Daten in eine BigQuery-Tabelle schreiben
Mit einer INSERT
-Anweisung können Sie Daten in eine BigQuery-Tabelle schreiben.
KI‑Funktionen verwenden
Für die Verwendung einer unterstützten KI-Funktion in einer kontinuierlichen Abfrage sind zusätzliche APIs, IAM-Berechtigungen und Google CloudRessourcen erforderlich. Weitere Informationen finden Sie in einem der folgenden Themen, je nach Anwendungsfall:
- Text mit der Funktion
ML.GENERATE_TEXT
generieren - Texteinbettungen mit der Funktion
ML.GENERATE_EMBEDDING
generieren - Text mit der Funktion
ML.UNDERSTAND_TEXT
verstehen - Text mit der Funktion
ML.TRANSLATE
übersetzen
Wenn Sie eine KI-Funktion in einer kontinuierlichen Abfrage verwenden, sollten Sie prüfen, ob die Ausgabe der Abfrage innerhalb des Kontingents für die Funktion bleibt. Wenn Sie das Kontingent überschreiten, müssen Sie die Datensätze, die nicht verarbeitet werden, möglicherweise separat verarbeiten.
Startpunkt angeben
Sie müssen die Funktion APPENDS
in der FROM
-Klausel einer kontinuierlichen Abfrage verwenden, um die frühesten zu verarbeitenden Daten anzugeben. Mit APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
wird BigQuery beispielsweise angewiesen, Daten zu verarbeiten, die der Tabelle my_table
höchstens 10 Minuten vor dem Start der kontinuierlichen Abfrage hinzugefügt wurden.
Daten, die my_table
hinzugefügt werden, werden bei Eingang verarbeitet. Es gibt keine Verzögerung bei der Datenverarbeitung.
Geben Sie kein end_timestamp
-Argument für die Funktion APPENDS
an, wenn Sie sie in einer kontinuierlichen Abfrage verwenden.
Das folgende Beispiel zeigt, wie Sie eine kontinuierliche Abfrage ab einem bestimmten Zeitpunkt mit der Funktion APPENDS
starten, wenn Sie eine BigQuery-Tabelle abfragen, die Informationen zu Streamingtaxifahrten empfängt:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute');
Geben Sie einen Startpunkt an, der vor der aktuellen Uhrzeit liegt.
Wenn Sie Daten aus der Zeit vor dem aktuellen Zeitpunkt verarbeiten möchten, können Sie mit der Funktion APPENDS
einen früheren Startpunkt für die Abfrage angeben. Der angegebene Startpunkt muss innerhalb des Zeitreisefensters für die Tabelle liegen, aus der Sie Daten auswählen. Das Zeitreisefenster umfasst standardmäßig die letzten sieben Tage.
Wenn Sie Daten einbeziehen möchten, die außerhalb des Zeitreisefensters liegen, verwenden Sie eine Standardabfrage, um Daten bis zu einem bestimmten Zeitpunkt einzufügen oder zu exportieren, und starten Sie dann eine kontinuierliche Abfrage ab diesem Zeitpunkt.
Beispiel
Das folgende Beispiel zeigt, wie ältere Daten aus einer BigQuery-Tabelle, die Informationen zu Streamingtaxifahrten bis zu einem bestimmten Zeitpunkt empfängt, in eine Tabelle geladen und dann eine kontinuierliche Abfrage ab dem Cutoff-Punkt für die älteren Daten gestartet wird.
So führen Sie eine Standardabfrage aus, um Daten bis zu einem bestimmten Zeitpunkt zu ergänzen:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` -- Include all data inserted into the table up to this point in time. -- This timestamp must be within the time travel window. FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC' WHERE ride_status = 'dropoff';
Führen Sie eine kontinuierliche Abfrage ab dem Zeitpunkt aus, an dem die Abfrage beendet wurde:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to start processing -- data right where the batch query left off. -- This timestamp must be within the time travel window. TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND) WHERE ride_status = 'dropoff';
Kontinuierliche Abfrage mit einem Nutzerkonto ausführen
In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mit einem Nutzerkonto ausführen. Nachdem die kontinuierliche Abfrage ausgeführt wird, können Sie die Google Cloud Konsole, das Terminalfenster oder die Anwendung schließen, ohne die Ausführung der Abfrage zu unterbrechen. Eine kontinuierliche Abfrage, die von einem Nutzerkonto ausgeführt wird, läuft maximal zwei Tage lang und wird dann automatisch beendet. Wenn Sie neue eingehende Daten weiterverarbeiten möchten, starten Sie eine neue kontinuierliche Abfrage und geben Sie einen Startpunkt an. Informationen zum Automatisieren dieses Prozesses finden Sie unter Fehlerhafte Abfragen wiederholen.
So führen Sie eine kontinuierliche Abfrage aus:
Console
Rufen Sie in der Google Cloud Console die Seite BigQuery auf.
Klicken Sie im Abfrageeditor auf
Mehr.- Wählen Sie im Bereich Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.
- Klicken Sie auf Bestätigen.
- Optional: Wenn Sie festlegen möchten, wie lange die Abfrage ausgeführt wird, klicken Sie auf Abfrageeinstellungen und legen Sie das Job-Zeitlimit in Millisekunden fest.
Geben Sie im Abfrageeditor die SQL-Anweisung für die Continuous Query ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
Klicken Sie auf Ausführen.
bq
-
In the Google Cloud console, activate Cloud Shell.
Führen Sie in Cloud Shell die fortlaufende Abfrage mit dem
bq query
-Befehl und dem Flag--continuous
aus:bq query --use_legacy_sql=false --continuous=true 'QUERY'
Ersetzen Sie
QUERY
durch die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten. Mit dem Flag--job_timeout_ms
können Sie festlegen, wie lange die Abfrage ausgeführt wird.PROJECT_ID
: Ihre Projekt-ID.QUERY
: Die SQL-Anweisung für die fortlaufende Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
API
Führen Sie die Continuous Query aus, indem Sie die Methode jobs.insert
aufrufen.
Sie müssen das Feld continuous
in der JobConfigurationQuery
der Job
-Ressource auf true
setzen, die Sie übergeben.
Optional können Sie die Laufzeit der Abfrage mit dem Feld jobTimeoutMs
festlegen.
curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \ --compressed
Ersetzen Sie Folgendes:
Kontinuierliche Abfrage mit einem Dienstkonto ausführen
In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mit einem Dienstkonto ausführen. Nachdem die kontinuierliche Abfrage ausgeführt wird, können Sie die Google Cloud Konsole, das Terminalfenster oder die Anwendung schließen, ohne die Ausführung der Abfrage zu unterbrechen. Eine Continuous Query, die mit einem Dienstkonto ausgeführt wird, kann bis zu 150 Tage lang ausgeführt werden und wird dann automatisch beendet. Wenn Sie neue eingehende Daten weiterverarbeiten möchten, starten Sie eine neue kontinuierliche Abfrage und geben Sie einen Startpunkt an. Informationen zum Automatisieren dieses Prozesses finden Sie unter Fehlerhafte Abfragen wiederholen.
So verwenden Sie ein Dienstkonto zum Ausführen einer Continuous Query:
Console
- Erstellen Sie ein Dienstkonto.
- Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
Rufen Sie in der Google Cloud Console die Seite BigQuery auf.
Klicken Sie im Abfrageeditor auf Mehr.
Wählen Sie im Bereich Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.
Klicken Sie auf Bestätigen.
Klicken Sie im Abfrageeditor auf Mehr > Abfrageeinstellungen.
Wählen Sie im Abschnitt Kontinuierliche Abfrage im Feld Dienstkonto das von Ihnen erstellte Dienstkonto aus.
Optional: Wenn Sie festlegen möchten, wie lange die Abfrage ausgeführt wird, legen Sie das Job-Zeitlimit in Millisekunden fest.
Klicken Sie auf Speichern.
Geben Sie im Abfrageeditor die SQL-Anweisung für die Continuous Query ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
Klicken Sie auf Ausführen.
bq
- Erstellen Sie ein Dienstkonto.
- Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
-
In the Google Cloud console, activate Cloud Shell.
Führen Sie die kontinuierliche Abfrage in der Befehlszeile mit dem Befehl
bq query
mit den folgenden Flags aus:- Setzen Sie das Flag
--continuous
auftrue
, um die Abfrage kontinuierlich auszuführen. - Verwenden Sie das Flag
--connection_property
, um ein zu verwendendes Dienstkonto anzugeben. - Optional: Legen Sie das Flag
--job_timeout_ms
fest, um die Laufzeit der Abfrage zu begrenzen.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.SERVICE_ACCOUNT_EMAIL
: die E-Mail-Adresse des Dienstkontos. Sie können die E‑Mail-Adresse des Dienstkontos auf der Seite Dienstkonten der Google Cloud Console abrufen.QUERY
: Die SQL-Anweisung für die fortlaufende Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
- Setzen Sie das Flag
- Erstellen Sie ein Dienstkonto.
- Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
Führen Sie die Continuous Query aus, indem Sie die Methode
jobs.insert
aufrufen. Legen Sie in derJobConfigurationQuery
-Ressource derJob
-Ressource, die Sie übergeben, die folgenden Felder fest:- Setzen Sie das Feld
continuous
auftrue
, um die Abfrage kontinuierlich zu machen. - Verwenden Sie das Feld
connectionProperties
, um ein zu verwendendes Dienstkonto anzugeben.
Optional können Sie die Laufzeit der Abfrage steuern, indem Sie das Feld
jobTimeoutMs
in der RessourceJobConfiguration
festlegen.curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \ --compressed
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.QUERY
: Die SQL-Anweisung für die fortlaufende Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.SERVICE_ACCOUNT_EMAIL
: die E-Mail-Adresse des Dienstkontos. Sie können die E‑Mail-Adresse des Dienstkontos in der Google Cloud Console auf der Seite Dienstkonten abrufen.
- Setzen Sie das Feld
API
Benutzerdefinierte Job-ID erstellen
Jedem Abfragejob wird eine Job-ID zugewiesen, mit der Sie nach dem Job suchen und ihn verwalten können. Job-IDs werden standardmäßig zufällig generiert. Um die Suche nach der Job-ID einer fortlaufenden Abfrage über den Jobverlauf oder den Job-Explorer zu vereinfachen, können Sie ein benutzerdefiniertes Job-ID-Präfix zuweisen:
Rufen Sie in der Google Cloud Console die Seite BigQuery auf.
Klicken Sie im Abfrageeditor auf Mehr.
Wählen Sie im Bereich Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.
Klicken Sie auf Bestätigen.
Klicken Sie im Abfrageeditor auf Mehr > Abfrageeinstellungen.
Geben Sie im Bereich Benutzerdefiniertes Job-ID-Präfix ein benutzerdefiniertes Namenspräfix ein.
Klicken Sie auf Speichern.
Beispiele
Die folgenden SQL-Beispiele zeigen gängige Anwendungsfälle für kontinuierliche Abfragen.
Daten in ein Pub/Sub-Thema exportieren
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten in Echtzeit mit Nachrichtenattributen in einem Pub/Sub-Thema veröffentlicht:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
Daten in eine Bigtable-Tabelle exportieren
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten in Echtzeit in eine Bigtable-Tabelle exportiert:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
Daten in eine Cloud Spanner-Tabelle exportieren
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten dann in Echtzeit in eine Spanner-Tabelle exportiert:
EXPORT DATA OPTIONS ( format = 'CLOUD_SPANNER', uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides', spanner_options ="""{ "table": "rides", -- To ensure data is written to Spanner in the correct sequence -- during a continuous export, use the change_timestamp_column -- option. This should be mapped to a timestamp column from your -- BigQuery data. If your source data lacks a timestamp, the -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function -- will be automatically mapped to the "change_timestamp" column. "change_timestamp_column": "change_timestamp" }""" ) AS ( SELECT ride_id, latitude, longitude, meter_reading, ride_status, passenger_count FROM APPENDS( TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
Daten in eine BigQuery-Tabelle schreiben
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert und transformiert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten dann in Echtzeit in eine andere BigQuery-Tabelle schreibt. So stehen die Daten für weitere nachgelagerte Analysen zur Verfügung.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'dropoff';
Daten mit einem Vertex AI-Modell verarbeiten
Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die ein Vertex AI-Modell verwendet, um eine Anzeige für Taxigäste anhand des aktuellen Breiten- und Längengrads zu generieren. Anschließend werden die Ergebnisse in ein Pub/Sub-Thema in Echtzeit exportiert:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you -- want to start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
SQL einer kontinuierlichen Abfrage ändern
Sie können die in einer Continuous Query verwendete SQL-Abfrage nicht aktualisieren, während der Continuous Query-Job ausgeführt wird. Sie müssen den Job für die kontinuierliche Abfrage abbrechen, den SQL-Code ändern und dann einen neuen Job für die kontinuierliche Abfrage ab dem Punkt starten, an dem Sie den ursprünglichen Job für die kontinuierliche Abfrage beendet haben.
So ändern Sie den in einer Continuous Query verwendeten SQL-Code:
- Rufen Sie die Jobdetails für den kontinuierlichen Abfragejob auf, den Sie aktualisieren möchten, und notieren Sie sich die Job-ID.
- Pausieren Sie nach Möglichkeit die Erhebung von Upstream-Daten. Wenn Sie das nicht tun können, kann es beim Neustart der kontinuierlichen Abfrage zu doppelten Daten kommen.
- Brechen Sie die Continuous Query ab, die Sie ändern möchten.
Rufen Sie den
end_time
-Wert für den ursprünglichen kontinuierlichen Abfragejob mit derJOBS
-AnsichtINFORMATION_SCHEMA
ab:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.REGION
: die Region, die von Ihrem Projekt verwendet wird.JOB_ID
: Die ID des Jobs für kontinuierliche Abfragen, die Sie in Schritt 1 ermittelt haben.
Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage so, dass die kontinuierliche Abfrage ab einem bestimmten Zeitpunkt gestartet wird. Verwenden Sie dabei den in Schritt 5 abgerufenen
end_time
-Wert als Ausgangspunkt.Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage entsprechend den erforderlichen Änderungen.
Führen Sie die geänderte Continuous Query aus.
Continuous Query abbrechen
Sie können einen kontinuierlichen Abfragejob wie jeden anderen Job abbrechen. Es kann bis zu einer Minute dauern, bis die Abfrage nach dem Abbrechen des Jobs beendet wird.
Wenn Sie eine Abfrage abbrechen und dann neu starten, verhält sich die neu gestartete Abfrage wie eine neue, unabhängige Abfrage. Die neu gestartete Abfrage beginnt nicht mit der Verarbeitung von Daten an der Stelle, an der der vorherige Job beendet wurde, und kann nicht auf die Ergebnisse der vorherigen Abfrage verweisen. Weitere Informationen finden Sie unter Kontinuierliche Abfrage ab einem bestimmten Zeitpunkt starten.
Abfragen überwachen und Fehler beheben
Eine kontinuierliche Abfrage kann durch Faktoren wie Dateninkonsistenzen, Schemaänderungen, vorübergehende Dienstunterbrechungen oder Wartungsarbeiten unterbrochen werden. BigQuery behebt zwar einige vorübergehende Fehler, aber die folgenden Best Practices können die Jobstabilität verbessern:
- Kontinuierliche Abfragen überwachen
- Benachrichtigung bei fehlgeschlagenen Abfragen:
- Fehlgeschlagene Abfragen wiederholen