Kontinuierliche Abfragen erstellen

In diesem Dokument wird beschrieben, wie Sie eine kontinuierliche Abfrage in BigQuery ausführen.

BigQuery-Continuous Queries sind SQL-Anweisungen, die kontinuierlich ausgeführt werden. Mit kontinuierlichen Abfragen können Sie eingehende Daten in BigQuery in Echtzeit analysieren und die Ergebnisse dann entweder nach Bigtable, Pub/Sub oder Spanner exportieren oder in eine BigQuery-Tabelle schreiben.

Kontotyp auswählen

Sie können einen Job für kontinuierliche Abfragen mit einem Nutzerkonto erstellen und ausführen. Sie haben auch die Möglichkeit, einen Job für kontinuierliche Abfragen mit einem Nutzerkonto zu erstellen und dann mit einem Dienstkonto auszuführen. Sie müssen ein Dienstkonto verwenden, um eine kontinuierliche Abfrage auszuführen, mit der Ergebnisse in ein Pub/Sub-Thema exportiert werden.

Wenn Sie ein Nutzerkonto verwenden, wird eine kontinuierliche Abfrage bis zu zwei Tage lang ausgeführt. Wenn Sie ein Dienstkonto verwenden, wird eine kontinuierliche Abfrage bis zu 150 Tage lang ausgeführt. Weitere Informationen finden Sie unter Autorisierung.

Erforderliche Berechtigungen

In diesem Abschnitt werden die Berechtigungen beschrieben, die Sie zum Erstellen und Ausführen einer kontinuierlichen Abfrage benötigen. Alternativ zu den erwähnten IAM-Rollen (Identity and Access Management) können Sie die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen erhalten.

Berechtigungen bei Verwendung eines Nutzerkontos

Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die zum Erstellen und Ausführen einer kontinuierlichen Abfrage mit einem Nutzerkonto erforderlich sind.

Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create:

Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.export haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export:

Zum Aktualisieren von Daten in einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.updateData haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData:

Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall für kontinuierliche Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin) haben.

Berechtigungen bei Verwendung eines Dienstkontos

Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die für das Nutzerkonto erforderlich sind, mit dem die kontinuierliche Abfrage erstellt wird, sowie zum Dienstkonto, das die kontinuierliche Abfrage ausführt.

Nutzerkontoberechtigungen

Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create:

Wenn Sie einen Job einreichen möchten, der mit einem Dienstkonto ausgeführt wird, muss das Nutzerkonto die Rolle Dienstkontonutzer (roles/iam.serviceAccountUser) haben. Wenn Sie dasselbe Nutzerkonto zum Erstellen des Dienstkontos verwenden, muss das Nutzerkonto die Rolle Service Account Admin (roles/iam.serviceAccountAdmin) haben. Informationen dazu, wie Sie den Zugriff eines Nutzers auf ein einzelnes Dienstkonto anstelle aller Dienstkonten in einem Projekt einschränken, finden Sie unter Einzelne Rolle zuweisen.

Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall mit kontinuierlichen Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin) haben.

Dienstkontoberechtigungen

Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.export haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export:

Zum Aktualisieren von Daten in einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.updateData haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData:

Hinweise

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

Reservierung erstellen

Erstellen Sie eine Reservierung für die Enterprise- oder Enterprise Plus-Version und dann eine Reservierungszuweisung mit dem Jobtyp CONTINUOUS. Für diese Reservierung kann Autoscaling verwendet werden. Für die Reservierungszuweisung für kontinuierliche Abfragen gelten Reservierungsbeschränkungen.

Nach Pub/Sub exportieren

Für den Export von Daten nach Pub/Sub sind zusätzliche APIs, IAM-Berechtigungen und Google Cloud -Ressourcen erforderlich. Weitere Informationen finden Sie unter In Pub/Sub exportieren.

Benutzerdefinierte Attribute als Metadaten in Pub/Sub-Nachrichten einbetten

Mit Pub/Sub-Attributen können Sie zusätzliche Informationen zur Nachricht angeben, z. B. ihre Priorität, ihren Ursprung, ihr Ziel oder zusätzliche Metadaten. Sie können auch Attribute verwenden, um Nachrichten im Abo zu filtern.

Wenn eine Spalte in einem kontinuierlichen Abfrageergebnis den Namen _ATTRIBUTES hat, werden ihre Werte in die Pub/Sub-Nachrichtenattribute kopiert. Die angegebenen Felder in _ATTRIBUTES werden als Attributschlüssel verwendet.

Die Spalte _ATTRIBUTES muss vom Typ JSON sein und das Format ARRAY<STRUCT<STRING, STRING>> oder STRUCT<STRING> haben.

Ein Beispiel finden Sie unter Daten in ein Pub/Sub-Thema exportieren.

Nach Bigtable exportieren

Für den Export von Daten nach Bigtable sind zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen erforderlich. Weitere Informationen finden Sie unter In Bigtable exportieren.

In Spanner exportieren

Für den Export von Daten nach Spanner sind zusätzliche APIs, IAM-Berechtigungen und Google CloudRessourcen erforderlich. Weitere Informationen finden Sie unter Daten nach Spanner exportieren (umgekehrte ETL).

Daten in eine BigQuery-Tabelle schreiben

Mit einer INSERT-Anweisung können Sie Daten in eine BigQuery-Tabelle schreiben.

KI‑Funktionen verwenden

Für die Verwendung einer unterstützten KI-Funktion in einer kontinuierlichen Abfrage sind zusätzliche APIs, IAM-Berechtigungen und Google CloudRessourcen erforderlich. Weitere Informationen finden Sie in einem der folgenden Themen, je nach Anwendungsfall:

Wenn Sie eine KI-Funktion in einer kontinuierlichen Abfrage verwenden, sollten Sie prüfen, ob die Ausgabe der Abfrage innerhalb des Kontingents für die Funktion bleibt. Wenn Sie das Kontingent überschreiten, müssen Sie die Datensätze, die nicht verarbeitet werden, möglicherweise separat verarbeiten.

Startpunkt angeben

Sie müssen die Funktion APPENDS in der FROM-Klausel einer kontinuierlichen Abfrage verwenden, um die frühesten zu verarbeitenden Daten anzugeben. Mit APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) wird BigQuery beispielsweise angewiesen, Daten zu verarbeiten, die der Tabelle my_table höchstens 10 Minuten vor dem Start der kontinuierlichen Abfrage hinzugefügt wurden. Daten, die my_table hinzugefügt werden, werden bei Eingang verarbeitet. Es gibt keine Verzögerung bei der Datenverarbeitung. Geben Sie kein end_timestamp-Argument für die Funktion APPENDS an, wenn Sie sie in einer kontinuierlichen Abfrage verwenden.

Das folgende Beispiel zeigt, wie Sie eine kontinuierliche Abfrage ab einem bestimmten Zeitpunkt mit der Funktion APPENDS starten, wenn Sie eine BigQuery-Tabelle abfragen, die Informationen zu Streamingtaxifahrten empfängt:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE
    ride_status = 'enroute');

Geben Sie einen Startpunkt an, der vor der aktuellen Uhrzeit liegt.

Wenn Sie Daten aus der Zeit vor dem aktuellen Zeitpunkt verarbeiten möchten, können Sie mit der Funktion APPENDS einen früheren Startpunkt für die Abfrage angeben. Der angegebene Startpunkt muss innerhalb des Zeitreisefensters für die Tabelle liegen, aus der Sie Daten auswählen. Das Zeitreisefenster umfasst standardmäßig die letzten sieben Tage.

Wenn Sie Daten einbeziehen möchten, die außerhalb des Zeitreisefensters liegen, verwenden Sie eine Standardabfrage, um Daten bis zu einem bestimmten Zeitpunkt einzufügen oder zu exportieren, und starten Sie dann eine kontinuierliche Abfrage ab diesem Zeitpunkt.

Beispiel

Das folgende Beispiel zeigt, wie ältere Daten aus einer BigQuery-Tabelle, die Informationen zu Streamingtaxifahrten bis zu einem bestimmten Zeitpunkt empfängt, in eine Tabelle geladen und dann eine kontinuierliche Abfrage ab dem Cutoff-Punkt für die älteren Daten gestartet wird.

  1. So führen Sie eine Standardabfrage aus, um Daten bis zu einem bestimmten Zeitpunkt zu ergänzen:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM `myproject.real_time_taxi_streaming.taxirides`
      -- Include all data inserted into the table up to this point in time.
      -- This timestamp must be within the time travel window.
      FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
    WHERE
      ride_status = 'dropoff';
  2. Führen Sie eine kontinuierliche Abfrage ab dem Zeitpunkt aus, an dem die Abfrage beendet wurde:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM
      APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to start processing
        -- data right where the batch query left off.
        -- This timestamp must be within the time travel window.
        TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
    WHERE
      ride_status = 'dropoff';

Kontinuierliche Abfrage mit einem Nutzerkonto ausführen

In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mit einem Nutzerkonto ausführen. Nachdem die kontinuierliche Abfrage ausgeführt wird, können Sie die Google Cloud Konsole, das Terminalfenster oder die Anwendung schließen, ohne die Ausführung der Abfrage zu unterbrechen. Eine kontinuierliche Abfrage, die von einem Nutzerkonto ausgeführt wird, läuft maximal zwei Tage lang und wird dann automatisch beendet. Wenn Sie neue eingehende Daten weiterverarbeiten möchten, starten Sie eine neue kontinuierliche Abfrage und geben Sie einen Startpunkt an. Informationen zum Automatisieren dieses Prozesses finden Sie unter Fehlerhafte Abfragen wiederholen.

So führen Sie eine kontinuierliche Abfrage aus:

Console

  1. Rufen Sie in der Google Cloud Console die Seite BigQuery auf.

    BigQuery aufrufen

  2. Klicken Sie im Abfrageeditor auf  Mehr.

    1. Wählen Sie im Bereich Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.
    2. Klicken Sie auf Bestätigen.
    3. Optional: Wenn Sie festlegen möchten, wie lange die Abfrage ausgeführt wird, klicken Sie auf Abfrageeinstellungen und legen Sie das Job-Zeitlimit in Millisekunden fest.
  3. Geben Sie im Abfrageeditor die SQL-Anweisung für die Continuous Query ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

  4. Klicken Sie auf Ausführen.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. Führen Sie in Cloud Shell die fortlaufende Abfrage mit dem bq query-Befehl und dem Flag --continuous aus:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    Ersetzen Sie QUERY durch die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten. Mit dem Flag --job_timeout_ms können Sie festlegen, wie lange die Abfrage ausgeführt wird.

  3. API

    Führen Sie die Continuous Query aus, indem Sie die Methode jobs.insert aufrufen. Sie müssen das Feld continuous in der JobConfigurationQuery der Job-Ressource auf true setzen, die Sie übergeben. Optional können Sie die Laufzeit der Abfrage mit dem Feld jobTimeoutMs festlegen.

    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json; charset=utf-8" \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
      --compressed

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • QUERY: Die SQL-Anweisung für die fortlaufende Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

Kontinuierliche Abfrage mit einem Dienstkonto ausführen

In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mit einem Dienstkonto ausführen. Nachdem die kontinuierliche Abfrage ausgeführt wird, können Sie die Google Cloud Konsole, das Terminalfenster oder die Anwendung schließen, ohne die Ausführung der Abfrage zu unterbrechen. Eine Continuous Query, die mit einem Dienstkonto ausgeführt wird, kann bis zu 150 Tage lang ausgeführt werden und wird dann automatisch beendet. Wenn Sie neue eingehende Daten weiterverarbeiten möchten, starten Sie eine neue kontinuierliche Abfrage und geben Sie einen Startpunkt an. Informationen zum Automatisieren dieses Prozesses finden Sie unter Fehlerhafte Abfragen wiederholen.

So verwenden Sie ein Dienstkonto zum Ausführen einer Continuous Query:

Console

  1. Erstellen Sie ein Dienstkonto.
  2. Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
  3. Rufen Sie in der Google Cloud Console die Seite BigQuery auf.

    BigQuery aufrufen

  4. Klicken Sie im Abfrageeditor auf Mehr.

  5. Wählen Sie im Bereich Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.

  6. Klicken Sie auf Bestätigen.

  7. Klicken Sie im Abfrageeditor auf Mehr > Abfrageeinstellungen.

  8. Wählen Sie im Abschnitt Kontinuierliche Abfrage im Feld Dienstkonto das von Ihnen erstellte Dienstkonto aus.

  9. Optional: Wenn Sie festlegen möchten, wie lange die Abfrage ausgeführt wird, legen Sie das Job-Zeitlimit in Millisekunden fest.

  10. Klicken Sie auf Speichern.

  11. Geben Sie im Abfrageeditor die SQL-Anweisung für die Continuous Query ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

  12. Klicken Sie auf Ausführen.

bq

  1. Erstellen Sie ein Dienstkonto.
  2. Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  4. Führen Sie die kontinuierliche Abfrage in der Befehlszeile mit dem Befehl bq query mit den folgenden Flags aus:

    • Setzen Sie das Flag --continuous auf true, um die Abfrage kontinuierlich auszuführen.
    • Verwenden Sie das Flag --connection_property, um ein zu verwendendes Dienstkonto anzugeben.
    • Optional: Legen Sie das Flag --job_timeout_ms fest, um die Laufzeit der Abfrage zu begrenzen.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • SERVICE_ACCOUNT_EMAIL: die E-Mail-Adresse des Dienstkontos. Sie können die E‑Mail-Adresse des Dienstkontos auf der Seite Dienstkonten der Google Cloud Console abrufen.
    • QUERY: Die SQL-Anweisung für die fortlaufende Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
  5. API

    1. Erstellen Sie ein Dienstkonto.
    2. Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
    3. Führen Sie die Continuous Query aus, indem Sie die Methode jobs.insert aufrufen. Legen Sie in der JobConfigurationQuery-Ressource der Job-Ressource, die Sie übergeben, die folgenden Felder fest:

      • Setzen Sie das Feld continuous auf true, um die Abfrage kontinuierlich zu machen.
      • Verwenden Sie das Feld connectionProperties, um ein zu verwendendes Dienstkonto anzugeben.

      Optional können Sie die Laufzeit der Abfrage steuern, indem Sie das Feld jobTimeoutMs in der Ressource JobConfiguration festlegen.

      curl --request POST \
        "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
        --header "Authorization: Bearer $(gcloud auth print-access-token)" \
        --header "Content-Type: application/json; charset=utf-8" \
        --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
        --compressed

      Ersetzen Sie Folgendes:

      • PROJECT_ID: Ihre Projekt-ID.
      • QUERY: Die SQL-Anweisung für die fortlaufende Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
      • SERVICE_ACCOUNT_EMAIL: die E-Mail-Adresse des Dienstkontos. Sie können die E‑Mail-Adresse des Dienstkontos in der Google Cloud Console auf der Seite Dienstkonten abrufen.

Benutzerdefinierte Job-ID erstellen

Jedem Abfragejob wird eine Job-ID zugewiesen, mit der Sie nach dem Job suchen und ihn verwalten können. Job-IDs werden standardmäßig zufällig generiert. Um die Suche nach der Job-ID einer fortlaufenden Abfrage über den Jobverlauf oder den Job-Explorer zu vereinfachen, können Sie ein benutzerdefiniertes Job-ID-Präfix zuweisen:

  1. Rufen Sie in der Google Cloud Console die Seite BigQuery auf.

    BigQuery aufrufen

  2. Klicken Sie im Abfrageeditor auf Mehr.

  3. Wählen Sie im Bereich Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.

  4. Klicken Sie auf Bestätigen.

  5. Klicken Sie im Abfrageeditor auf Mehr > Abfrageeinstellungen.

  6. Geben Sie im Bereich Benutzerdefiniertes Job-ID-Präfix ein benutzerdefiniertes Namenspräfix ein.

  7. Klicken Sie auf Speichern.

Beispiele

Die folgenden SQL-Beispiele zeigen gängige Anwendungsfälle für kontinuierliche Abfragen.

Daten in ein Pub/Sub-Thema exportieren

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten in Echtzeit mit Nachrichtenattributen in einem Pub/Sub-Thema veröffentlicht:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

Daten in eine Bigtable-Tabelle exportieren

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten in Echtzeit in eine Bigtable-Tabelle exportiert:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

Daten in eine Cloud Spanner-Tabelle exportieren

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten dann in Echtzeit in eine Spanner-Tabelle exportiert:

EXPORT DATA
 OPTIONS (
   format = 'CLOUD_SPANNER',
   uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides',
   spanner_options ="""{
      "table": "rides",
      -- To ensure data is written to Spanner in the correct sequence
      -- during a continuous export, use the change_timestamp_column
      -- option. This should be mapped to a timestamp column from your
      -- BigQuery data. If your source data lacks a timestamp, the 
      -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function 
      -- will be automatically mapped to the "change_timestamp" column.
      "change_timestamp_column": "change_timestamp"
   }"""
  )
  AS (
  SELECT
    ride_id,
    latitude,
    longitude,
    meter_reading,
    ride_status,
    passenger_count
  FROM APPENDS(
        TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to specify when you want to
        -- start processing data using your continuous query.
        -- This example starts processing at 10 minutes before the current time.
        CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
  );

Daten in eine BigQuery-Tabelle schreiben

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert und transformiert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten dann in Echtzeit in eine andere BigQuery-Tabelle schreibt. So stehen die Daten für weitere nachgelagerte Analysen zur Verfügung.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE
  ride_status = 'dropoff';

Daten mit einem Vertex AI-Modell verarbeiten

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die ein Vertex AI-Modell verwendet, um eine Anzeige für Taxigäste anhand des aktuellen Breiten- und Längengrads zu generieren. Anschließend werden die Ergebnisse in ein Pub/Sub-Thema in Echtzeit exportiert:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

SQL einer kontinuierlichen Abfrage ändern

Sie können die in einer Continuous Query verwendete SQL-Abfrage nicht aktualisieren, während der Continuous Query-Job ausgeführt wird. Sie müssen den Job für die kontinuierliche Abfrage abbrechen, den SQL-Code ändern und dann einen neuen Job für die kontinuierliche Abfrage ab dem Punkt starten, an dem Sie den ursprünglichen Job für die kontinuierliche Abfrage beendet haben.

So ändern Sie den in einer Continuous Query verwendeten SQL-Code:

  1. Rufen Sie die Jobdetails für den kontinuierlichen Abfragejob auf, den Sie aktualisieren möchten, und notieren Sie sich die Job-ID.
  2. Pausieren Sie nach Möglichkeit die Erhebung von Upstream-Daten. Wenn Sie das nicht tun können, kann es beim Neustart der kontinuierlichen Abfrage zu doppelten Daten kommen.
  3. Brechen Sie die Continuous Query ab, die Sie ändern möchten.
  4. Rufen Sie den end_time-Wert für den ursprünglichen kontinuierlichen Abfragejob mit der JOBS-Ansicht INFORMATION_SCHEMA ab:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • REGION: die Region, die von Ihrem Projekt verwendet wird.
    • JOB_ID: Die ID des Jobs für kontinuierliche Abfragen, die Sie in Schritt 1 ermittelt haben.
  5. Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage so, dass die kontinuierliche Abfrage ab einem bestimmten Zeitpunkt gestartet wird. Verwenden Sie dabei den in Schritt 5 abgerufenen end_time-Wert als Ausgangspunkt.

  6. Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage entsprechend den erforderlichen Änderungen.

  7. Führen Sie die geänderte Continuous Query aus.

Continuous Query abbrechen

Sie können einen kontinuierlichen Abfragejob wie jeden anderen Job abbrechen. Es kann bis zu einer Minute dauern, bis die Abfrage nach dem Abbrechen des Jobs beendet wird.

Wenn Sie eine Abfrage abbrechen und dann neu starten, verhält sich die neu gestartete Abfrage wie eine neue, unabhängige Abfrage. Die neu gestartete Abfrage beginnt nicht mit der Verarbeitung von Daten an der Stelle, an der der vorherige Job beendet wurde, und kann nicht auf die Ergebnisse der vorherigen Abfrage verweisen. Weitere Informationen finden Sie unter Kontinuierliche Abfrage ab einem bestimmten Zeitpunkt starten.

Abfragen überwachen und Fehler beheben

Eine kontinuierliche Abfrage kann durch Faktoren wie Dateninkonsistenzen, Schemaänderungen, vorübergehende Dienstunterbrechungen oder Wartungsarbeiten unterbrochen werden. BigQuery behebt zwar einige vorübergehende Fehler, aber die folgenden Best Practices können die Jobstabilität verbessern:

Nächste Schritte