Kontinuierliche Abfragen erstellen

In diesem Dokument wird beschrieben, wie Sie eine kontinuierliche Abfrage in BigQuery ausführen.

Kontinuierliche BigQuery-Abfragen sind SQL-Anweisungen, die kontinuierlich ausgeführt werden. Mit kontinuierlichen Abfragen können Sie eingehende Daten in BigQuery in Echtzeit analysieren und die Ergebnisse entweder nach Bigtable oder Pub/Sub exportieren oder in eine BigQuery-Tabelle schreiben.

Kontotyp auswählen

Sie können einen Job für kontinuierliche Abfragen mit einem Nutzerkonto erstellen und ausführen. Sie haben auch die Möglichkeit, einen Job für kontinuierliche Abfragen mit einem Nutzerkonto zu erstellen und dann mit einem Dienstkonto auszuführen. Sie müssen ein Dienstkonto verwenden, um eine kontinuierliche Abfrage auszuführen, die Ergebnisse in ein Pub/Sub-Thema exportiert.

Bei Verwendung eines Nutzerkontos wird eine kontinuierliche Abfrage zwei Tage lang ausgeführt. Wenn Sie ein Dienstkonto verwenden, wird eine kontinuierliche Abfrage ausgeführt, bis sie explizit abgebrochen wird. Weitere Informationen finden Sie unter Autorisierung.

Erforderliche Berechtigungen

In diesem Abschnitt werden die Berechtigungen beschrieben, die Sie zum Erstellen und Ausführen einer kontinuierlichen Abfrage benötigen. Als Alternative zu den genannten IAM-Rollen (Identity and Access Management) können Sie die erforderlichen Berechtigungen über benutzerdefinierte Rollen erhalten.

Berechtigungen bei Verwendung eines Nutzerkontos

Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die zum Erstellen und Ausführen einer kontinuierlichen Abfrage mithilfe eines Nutzerkontos erforderlich sind.

Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create:

Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.export haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export:

Zum Aktualisieren von Daten in einer BigQuery-Tabelle muss das Nutzerkonto die IAM-Berechtigung bigquery.tables.updateData haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData:

Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall für kontinuierliche Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin) haben.

Berechtigungen bei Verwendung eines Dienstkontos

Dieser Abschnitt enthält Informationen zu den Rollen und Berechtigungen, die für das Nutzerkonto erforderlich sind, mit dem die kontinuierliche Abfrage erstellt wird, sowie für das Dienstkonto, das die kontinuierliche Abfrage ausführt.

Nutzerkontoberechtigungen

Zum Erstellen eines Jobs in BigQuery muss das Nutzerkonto die IAM-Berechtigung bigquery.jobs.create haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.jobs.create:

Zum Senden eines Jobs, der mit einem Dienstkonto ausgeführt wird, muss das Nutzerkonto die Rolle Dienstkontonutzer (roles/iam.serviceAccountUser) haben. Wenn Sie dasselbe Nutzerkonto zum Erstellen des Dienstkontos verwenden, muss das Nutzerkonto die Rolle Service Account Admin (roles/iam.serviceAccountAdmin) haben. Informationen zum Beschränken des Zugriffs eines Nutzers auf ein einzelnes Dienstkonto und nicht auf alle Dienstkonten innerhalb eines Projekts finden Sie unter Einzelne Rolle zuweisen.

Wenn das Nutzerkonto die APIs aktivieren muss, die für Ihren Anwendungsfall mit kontinuierlichen Abfragen erforderlich sind, muss das Nutzerkonto die Rolle Service Usage Admin (roles/serviceusage.serviceUsageAdmin) haben.

Dienstkontoberechtigungen

Zum Exportieren von Daten aus einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.export haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.export:

Zum Aktualisieren von Daten in einer BigQuery-Tabelle muss das Dienstkonto die IAM-Berechtigung bigquery.tables.updateData haben. Jede der folgenden IAM-Rollen gewährt die Berechtigung bigquery.tables.updateData:

Hinweise

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  3. BigQuery API aktivieren.

    Aktivieren Sie die API

Reservierung erstellen

Erstellen Sie eine Reservierung für Enterprise oder Enterprise Plus und erstellen Sie dann eine Reservierungszuweisung mit einem CONTINUOUS-Jobtyp.

Nach Pub/Sub exportieren

Zum Exportieren von Daten nach Pub/Sub sind zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen erforderlich. Weitere Informationen finden Sie unter Nach Pub/Sub exportieren.

Nach Bigtable exportieren

Zum Exportieren von Daten nach Bigtable sind zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen erforderlich. Weitere Informationen finden Sie unter Nach Bigtable exportieren.

Daten in eine BigQuery-Tabelle schreiben

Mit einer INSERT-Anweisung können Sie Daten in eine BigQuery-Tabelle schreiben.

KI-Funktionen verwenden

Es sind zusätzliche APIs, IAM-Berechtigungen und Google Cloud-Ressourcen erforderlich, um eine unterstützte KI-Funktion in einer kontinuierlichen Abfrage zu verwenden. Weitere Informationen finden Sie je nach Anwendungsfall in einem der folgenden Themen:

Wenn Sie eine KI-Funktion in einer kontinuierlichen Abfrage verwenden, sollten Sie prüfen, ob die Abfrageausgabe im Kontingent für die Funktion verbleibt. Wenn Sie das Kontingent überschreiten, müssen Sie möglicherweise die Einträge, die nicht verarbeitet werden, separat verarbeiten.

Kontinuierliche Abfrage mit einem Nutzerkonto ausführen

In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mit einem Nutzerkonto ausführen. Sobald die kontinuierliche Abfrage ausgeführt wurde, können Sie die Google Cloud Console, das Terminalfenster oder die Anwendung schließen, ohne die Abfrageausführung zu unterbrechen.

So führen Sie eine kontinuierliche Abfrage aus:

Console

  1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  2. Klicken Sie im Abfrageeditor auf Mehr.

  3. Wählen Sie im Abschnitt Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.

  4. Klicken Sie auf Bestätigen.

  5. Geben Sie im Abfrageeditor die SQL-Anweisung für die kontinuierliche Abfrage ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

  6. Klicken Sie auf Ausführen.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie in Cloud Shell die kontinuierliche Abfrage mit dem Befehl bq query und dem Flag --continuous aus:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'
    

    Ersetzen Sie QUERY durch die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

API

Führen Sie die kontinuierliche Abfrage aus, indem Sie die Methode jobs.insert aufrufen. Sie müssen das Feld continuous in der JobConfigurationQuery der Job-Ressource auf true setzen, die Sie übergeben.

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

Ersetzen Sie Folgendes:

  • PROJECT_ID: Ihre Projekt-ID.
  • QUERY: Die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

Kontinuierliche Abfrage mithilfe eines Dienstkontos ausführen

In diesem Abschnitt wird beschrieben, wie Sie eine kontinuierliche Abfrage mithilfe eines Dienstkontos ausführen. Sobald die kontinuierliche Abfrage ausgeführt wurde, können Sie die Google Cloud Console, das Terminalfenster oder die Anwendung schließen, ohne die Abfrageausführung zu unterbrechen.

So führen Sie über ein Dienstkonto eine kontinuierliche Abfrage aus:

Console

  1. Erstellen Sie ein Dienstkonto.
  2. Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
  3. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  4. Klicken Sie im Abfrageeditor auf Mehr.

  5. Wählen Sie im Abschnitt Abfragemodus auswählen die Option Kontinuierliche Abfrage aus.

  6. Klicken Sie auf Bestätigen.

  7. Klicken Sie im Abfrageeditor auf Mehr > Abfrageeinstellungen.

  8. Wählen Sie im Abschnitt Kontinuierliche Abfrage im Feld Dienstkonto das von Ihnen erstellte Dienstkonto aus.

  9. Klicken Sie auf Speichern.

  10. Geben Sie im Abfrageeditor die SQL-Anweisung für die kontinuierliche Abfrage ein. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

  11. Klicken Sie auf Ausführen.

bq

  1. Erstellen Sie ein Dienstkonto.
  2. Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. Führen Sie in der Befehlszeile die kontinuierliche Abfrage mit dem Befehl bq query und den folgenden Flags aus:

    • Setzen Sie das Flag --continuous auf true, um die Abfrage kontinuierlich zu machen.
    • Verwenden Sie das Flag --connection_property, um das zu verwendende Dienstkonto anzugeben.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • SERVICE_ACCOUNT_EMAIL: die E-Mail-Adresse des Dienstkontos. Sie können die E-Mail-Adresse des Dienstkontos auf der Seite Dienstkonten in der Google Cloud Console abrufen.
    • QUERY: Die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.

API

  1. Erstellen Sie ein Dienstkonto.
  2. Gewähren Sie die erforderlichen Berechtigungen für das Dienstkonto:
  3. Führen Sie die kontinuierliche Abfrage aus, indem Sie die Methode jobs.insert aufrufen. Legen Sie die folgenden Felder in der Ressource JobConfigurationQuery der Ressource Job fest, die Sie übergeben:

    • Setzen Sie das Feld continuous auf true, um die Abfrage kontinuierlich zu machen.
    • Verwenden Sie das Feld connection_property, um das zu verwendende Dienstkonto anzugeben.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • QUERY: Die SQL-Anweisung für die kontinuierliche Abfrage. Die SQL-Anweisung darf nur unterstützte Vorgänge enthalten.
    • SERVICE_ACCOUNT_EMAIL: die E-Mail-Adresse des Dienstkontos. Sie können die E-Mail-Adresse des Dienstkontos auf der Seite Dienstkonten in der Google Cloud Console abrufen.

Beispiele

Die folgenden SQL-Beispiele zeigen gängige Anwendungsfälle für kontinuierliche Abfragen.

Daten in ein Pub/Sub-Thema exportieren

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die gestreamte Taxifahrteninformationen empfängt, und diese Daten in Echtzeit in einem Pub/Sub-Thema veröffentlicht:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

Daten in eine Bigtable-Tabelle exportieren

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert, die Streaminginformationen zu Taxifahrten empfängt, und die Daten in Echtzeit in die Bigtable-Tabelle exportiert:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

Daten in eine BigQuery-Tabelle schreiben

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die Daten aus einer BigQuery-Tabelle filtert und transformiert, die Informationen zu Streamingtaxifahrten empfängt, und die Daten dann in Echtzeit in eine andere BigQuery-Tabelle schreibt. Dadurch stehen die Daten für weitere nachgelagerte Analysen zur Verfügung.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Daten mit einem Vertex AI-Modell verarbeiten

Das folgende Beispiel zeigt eine kontinuierliche Abfrage, die ein Vertex AI-Modell verwendet, um eine Anzeige für Taxigäste anhand des aktuellen Breiten- und Längengrads zu generieren. Anschließend werden die Ergebnisse in ein Pub/Sub-Thema in Echtzeit exportiert:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

Kontinuierliche Abfrage ab einem bestimmten Zeitpunkt starten

Wenn Sie eine kontinuierliche Abfrage starten, werden alle Zeilen in der Tabelle verarbeitet, aus der Sie auswählen, und neue Zeilen verarbeitet, sobald sie eingehen. Wenn Sie die Verarbeitung einiger oder aller vorhandenen Daten überspringen möchten, können Sie die Änderungsverlaufsfunktion APPENDS verwenden, um die Verarbeitung ab einem bestimmten Zeitpunkt zu starten.

Das folgende Beispiel zeigt, wie eine kontinuierliche Abfrage mit der Funktion APPENDS zu einem bestimmten Zeitpunkt gestartet wird:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

SQL einer kontinuierlichen Abfrage ändern

Sie können den SQL-Code, der in einer kontinuierlichen Abfrage verwendet wird, nicht aktualisieren, während der kontinuierliche Abfragejob ausgeführt wird. Sie müssen den kontinuierlichen Abfragejob abbrechen, den SQL-Code ändern und dann einen neuen kontinuierlichen Abfragejob ab dem Punkt starten, an dem Sie den ursprünglichen kontinuierlichen Abfragejob beendet haben.

So ändern Sie den SQL-Code, der in einer kontinuierlichen Abfrage verwendet wird:

  1. Rufen Sie die Jobdetails für den kontinuierlichen Abfragejob auf, den Sie aktualisieren möchten, und notieren Sie sich die Job-ID.
  2. Pausieren Sie nach Möglichkeit die Erfassung von vorgelagerten Daten. Wenn dies nicht möglich ist, erhalten Sie beim Neustart der kontinuierlichen Abfrage möglicherweise eine Datenduplizierung.
  3. Brechen Sie die kontinuierliche Abfrage ab, die Sie ändern möchten.
  4. Rufen Sie den Wert end_time für den ursprünglichen kontinuierlichen Abfragejob mit der Ansicht JOBS INFORMATION_SCHEMA ab:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • REGION ist die Region, die von Ihrem Projekt verwendet wird.
    • JOB_ID: Die Job-ID der kontinuierlichen Abfrage, die Sie in Schritt 1 identifiziert haben.
  5. Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage so, dass die kontinuierliche Abfrage ab einem bestimmten Zeitpunkt gestartet wird. Verwenden Sie dabei den in Schritt 5 abgerufenen end_time-Wert als Ausgangspunkt.

  6. Ändern Sie die SQL-Anweisung für die kontinuierliche Abfrage entsprechend Ihren erforderlichen Änderungen.

  7. Führen Sie die geänderte kontinuierliche Abfrage aus.

Kontinuierliche Abfrage abbrechen

Sie können einen kontinuierlichen Abfragejob wie jeden anderen Job abbrechen. Nach dem Abbruch des Jobs kann es bis zu einer Minute dauern, bis die Abfrage nicht mehr ausgeführt wird.