Kontinuierliche Abfragen erstellen

In diesem Dokument wird beschrieben, wie Sie eine kontinuierliche Abfrage in BigQuery ausführen.

BigQuery-kontinuierliche Abfragen sind SQL-Anweisungen, die kontinuierlich ausgeführt werden. Mit kontinuierlichen Abfragen können Sie eingehende Daten in BigQuery in Echtzeit analysieren und die Ergebnisse dann entweder nach Bigtable oder Pub/Sub exportieren oder in eine BigQuery-Tabelle schreiben.

Kontotyp auswählen

Sie können einen Job für kontinuierliche Abfragen mit einem Nutzerkonto erstellen und ausführen. Sie haben auch die Möglichkeit, einen Job für kontinuierliche Abfragen mit einem Nutzerkonto zu erstellen und dann mit einem Dienstkonto auszuführen. Sie müssen ein Dienstkonto verwenden, um eine kontinuierliche Abfrage auszuführen, die Ergebnisse in ein Pub/Sub-Thema exportiert.

Wenn Sie ein Nutzerkonto verwenden, wird eine kontinuierliche Abfrage zwei Tage lang ausgeführt. Wenn Sie ein Dienstkonto verwenden, wird eine kontinuierliche Abfrage so lange ausgeführt, bis sie explizit abgebrochen wird. Weitere Informationen finden Sie unter Autorisierung.

Erforderliche Berechtigungen

In diesem Abschnitt werden die Berechtigungen beschrieben, die Sie zum Erstellen und Ausführen einer kontinuierlichen Abfrage benötigen. Alternativ zu den genannten IAM-Rollen (Identity and Access Management) können Sie die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen erhalten.

Berechtigungen bei Verwendung eines Nutzerkontos

Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die zum Erstellen und Ausführen einer kontinuierlichen Abfrage mit einem Nutzerkonto erforderlich sind.

Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create:

Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.export haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export:

Zum Aktualisieren von Daten in einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.updateData haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData:

Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall für kontinuierliche Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin) haben.

Berechtigungen bei Verwendung eines Dienstkontos

Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die für das Nutzerkonto erforderlich sind, mit dem die kontinuierliche Abfrage erstellt wird, sowie zum Dienstkonto, das die kontinuierliche Abfrage ausführt.

Nutzerkontoberechtigungen

Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create:

Um einen Job einzureichen, der mit einem Dienstkonto ausgeführt wird, muss das Nutzerkonto die Rolle Dienstkontonutzerroles/iam.serviceAccountUser () haben. Wenn Sie dasselbe Nutzerkonto zum Erstellen des Dienstkontos verwenden, muss das Nutzerkonto die Rolle Service Account Admin (roles/iam.serviceAccountAdmin) haben. Informationen dazu, wie Sie den Zugriff eines Nutzers auf ein einzelnes Dienstkonto statt auf alle Dienstkonten in einem Projekt beschränken, finden Sie unter Eine Rolle zuweisen.

Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall mit kontinuierlichen Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin) haben.

Dienstkontoberechtigungen

Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.export haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export:

Zum Aktualisieren von Daten in einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.updateData haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData:

Hinweise

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

Reservierung erstellen

Erstellen Sie eine Reservierung für die Enterprise- oder Enterprise Plus-Version und dann eine Reservierungszuweisung mit dem Jobtyp CONTINUOUS.

Wenn Sie eine Reservierungszuweisung für eine kontinuierliche Abfrage erstellen, ist die zugehörige Reservierung auf maximal 500 Slots beschränkt und kann nicht für die Verwendung von Autoscaling konfiguriert werden.

Nach Pub/Sub exportieren

Zusätzliche APIs, IAM-Berechtigungen und Google Cloud -Ressourcen sind erforderlich, um Daten nach Pub/Sub zu exportieren. Weitere Informationen finden Sie unter In Pub/Sub exportieren.

Benutzerdefinierte Attribute als Metadaten in Pub/Sub-Nachrichten einbetten

Mit Pub/Sub-Attributen können Sie zusätzliche Informationen zur Nachricht angeben, z. B. ihre Priorität, ihren Ursprung, ihr Ziel oder zusätzliche Metadaten. Sie können auch Attribute verwenden, um Nachrichten im Abo zu filtern.

Wenn eine Spalte in einem kontinuierlichen Abfrageergebnis den Namen _ATTRIBUTES hat, werden ihre Werte in die Pub/Sub-Nachrichtenattribute kopiert. Die angegebenen Felder in _ATTRIBUTES werden als Attributschlüssel verwendet.

Die Spalte _ATTRIBUTES muss vom Typ JSON sein und das Format ARRAY<STRUCT<STRING, STRING>> oder STRUCT<STRING> haben.

Ein Beispiel finden Sie unter Daten in ein Pub/Sub-Thema exportieren.

Nach Bigtable exportieren

Zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen sind erforderlich, um Daten nach Bigtable zu exportieren. Weitere Informationen finden Sie unter In Bigtable exportieren.

Daten in eine BigQuery-Tabelle schreiben

Mit einer INSERT-Anweisung können Sie Daten in eine BigQuery-Tabelle schreiben.

KI-Funktionen verwenden

Zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen sind erforderlich, um eine unterstützte KI-Funktion in einer kontinuierlichen Abfrage zu verwenden. Weitere Informationen finden Sie je nach Anwendungsfall in den folgenden Themen:

Wenn Sie eine KI-Funktion in einer kontinuierlichen Abfrage verwenden, sollten Sie prüfen, ob die Abfrageausgabe innerhalb des Kontingents für die Funktion bleibt. Wenn Sie das Kontingent überschreiten, müssen Sie die nicht verarbeiteten Daten möglicherweise separat verarbeiten.

Kontinuierliche Abfrage mit einem Nutzerkonto ausführen

In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mit einem Nutzerkonto ausführen. Nachdem die kontinuierliche Abfrage ausgeführt wird, können Sie die Google Cloud -Konsole, das Terminalfenster oder die Anwendung schließen, ohne die Abfrageausführung zu unterbrechen.

So führen Sie eine kontinuierliche Abfrage aus:

Console

  1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  2. Klicken Sie im Abfrageeditor auf das Dreipunkt-Menü.

  3. Wählen Sie im Bereich Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.

  4. Klicken Sie auf Bestätigen.

  5. Geben Sie im Abfrageeditor die SQL-Anweisung für die Continuous Query ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

  6. Klicken Sie auf Ausführen.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie in Cloud Shell die kontinuierliche Abfrage mit dem Befehl bq query und dem Flag --continuous aus:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    Ersetzen Sie QUERY durch die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

API

Führen Sie die kontinuierliche Abfrage durch Aufrufen der Methode jobs.insert aus. Sie müssen das Feld continuous in der JobConfigurationQuery der Job-Ressource auf true setzen, die Sie übergeben.

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

Ersetzen Sie Folgendes:

  • PROJECT_ID: Ihre Projekt-ID.
  • QUERY: Die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

Kontinuierliche Abfrage mit einem Dienstkonto ausführen

In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mit einem Dienstkonto ausführen. Nachdem die kontinuierliche Abfrage ausgeführt wird, können Sie die Google Cloud -Konsole, das Terminalfenster oder die Anwendung schließen, ohne die Abfrageausführung zu unterbrechen.

So führen Sie mit einem Dienstkonto eine kontinuierliche Abfrage aus:

Console

  1. Erstellen Sie ein Dienstkonto.
  2. Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
  3. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  4. Klicken Sie im Abfrageeditor auf das Dreipunkt-Menü.

  5. Wählen Sie im Bereich Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.

  6. Klicken Sie auf Bestätigen.

  7. Klicken Sie im Abfrageeditor auf Mehr > Abfrageeinstellungen.

  8. Wählen Sie im Bereich Kontinuierliche Abfrage im Feld Dienstkonto das von Ihnen erstellte Dienstkonto aus.

  9. Klicken Sie auf Speichern.

  10. Geben Sie im Abfrageeditor die SQL-Anweisung für die Continuous Query ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

  11. Klicken Sie auf Ausführen.

bq

  1. Erstellen Sie ein Dienstkonto.
  2. Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. Führen Sie die kontinuierliche Abfrage in der Befehlszeile mit dem Befehl bq query und den folgenden Flags aus:

    • Legen Sie das Flag --continuous auf true fest, um die Abfrage kontinuierlich auszuführen.
    • Verwenden Sie das Flag --connection_property, um ein Dienstkonto anzugeben.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • SERVICE_ACCOUNT_EMAIL: die E-Mail-Adresse des Dienstkontos. Sie finden die E-Mail-Adresse des Dienstkontos auf der Seite Dienstkonten der Google Cloud -Konsole.
    • QUERY: Die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

API

  1. Erstellen Sie ein Dienstkonto.
  2. Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
  3. Führen Sie die kontinuierliche Abfrage durch Aufrufen der Methode jobs.insert aus. Legen Sie die folgenden Felder in der JobConfigurationQuery-Ressource der übergebenen Job-Ressource fest:

    • Legen Sie das Feld continuous auf true fest, um die Abfrage fortlaufend auszuführen.
    • Geben Sie im Feld connection_property ein Dienstkonto an, das verwendet werden soll.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • QUERY: die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
    • SERVICE_ACCOUNT_EMAIL: die E-Mail-Adresse des Dienstkontos. Sie finden die E-Mail-Adresse des Dienstkontos auf der Seite Dienstkonten der Google Cloud -Konsole.

Beispiele

Die folgenden SQL-Beispiele zeigen gängige Anwendungsfälle für kontinuierliche Abfragen.

Daten in ein Pub/Sub-Thema exportieren

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten dann in Echtzeit mit Nachrichtenattributen in einem Pub/Sub-Thema veröffentlicht:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

Daten in eine Bigtable-Tabelle exportieren

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten in Echtzeit in eine Bigtable-Tabelle exportiert:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

Daten in eine BigQuery-Tabelle schreiben

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert und transformiert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten dann in Echtzeit in eine andere BigQuery-Tabelle schreibt. So stehen die Daten für weitere nachgelagerte Analysen zur Verfügung.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Daten mit einem Vertex AI-Modell verarbeiten

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die ein Vertex AI-Modell verwendet, um eine Anzeige für Taxigäste anhand des aktuellen Breiten- und Längengrads zu generieren. Anschließend werden die Ergebnisse in ein Pub/Sub-Thema in Echtzeit exportiert:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

Kontinuierliche Abfrage ab einem bestimmten Zeitpunkt starten

Wenn Sie eine kontinuierliche Abfrage starten, werden alle Zeilen in der Tabelle verarbeitet, aus der Sie auswählen, und neue Zeilen verarbeitet, sobald sie eingehen. Wenn Sie die Verarbeitung einiger oder aller vorhandenen Daten überspringen möchten, können Sie die Änderungsverlaufsfunktion APPENDS verwenden, um die Verarbeitung ab einem bestimmten Zeitpunkt zu starten.

Im folgenden Beispiel wird veranschaulicht, wie Sie mithilfe der Funktion APPENDS eine kontinuierliche Abfrage ab einem bestimmten Zeitpunkt starten:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

SQL-Code einer kontinuierlichen Abfrage ändern

Sie können die in einer Continuous Query verwendete SQL-Abfrage nicht aktualisieren, während der Continuous Query-Job ausgeführt wird. Sie müssen den Job für die kontinuierliche Abfrage abbrechen, die SQL-Anweisung ändern und dann einen neuen Job für die kontinuierliche Abfrage an dem Punkt starten, an dem Sie den ursprünglichen Job für die kontinuierliche Abfrage angehalten haben.

So ändern Sie die in einer Continuous Query verwendete SQL-Abfrage:

  1. Rufen Sie die Jobdetails für den Job mit fortlaufender Abfrage auf, den Sie aktualisieren möchten, und notieren Sie sich die Job-ID.
  2. Pausieren Sie nach Möglichkeit die Erhebung von Upstream-Daten. Wenn Sie das nicht tun, kann es beim Neustart der kontinuierlichen Abfrage zu Datenduplikationen kommen.
  3. Brechen Sie die kontinuierliche Abfrage ab, die Sie ändern möchten.
  4. Rufen Sie den end_time-Wert für den ursprünglichen kontinuierlichen Abfragejob mithilfe der Ansicht INFORMATION_SCHEMA JOBS ab:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • REGION: die Region, die für Ihr Projekt verwendet wird.
    • JOB_ID: Die ID des Jobs für kontinuierliche Abfragen, die Sie in Schritt 1 ermittelt haben.
  5. Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage so, dass die kontinuierliche Abfrage ab einem bestimmten Zeitpunkt gestartet wird. Verwenden Sie dabei den in Schritt 5 abgerufenen end_time-Wert als Ausgangspunkt.

  6. Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage entsprechend den erforderlichen Änderungen.

  7. Führen Sie die geänderte kontinuierliche Abfrage aus.

Kontinuierliche Abfrage abbrechen

Sie können einen Job für kontinuierliche Abfragen wie jeden anderen Job abbrechen. Es kann bis zu einer Minute dauern, bis die Abfrage nach dem Abbruch des Jobs beendet wird.

Nächste Schritte