Monitorare le query continue

Puoi monitorare le query continue di BigQuery utilizzando i seguenti strumenti BigQuery:

A causa della natura di lunga durata di una query continua BigQuery, le metriche che vengono solitamente generate al completamento di una query SQL potrebbero essere assenti o imprecise.

Usa INFORMATION_SCHEMA visualizzazioni

Puoi utilizzare diverse visualizzazioni INFORMATION_SCHEMA per monitorare le query continue e le prenotazioni di query continue.

Visualizza i dettagli del job

Puoi utilizzare la visualizzazione JOBS per ottenere i metadati del job di query continua.

La seguente query restituisce i metadati per tutte le query continue attive. I metadati includono il timestamp della filigrana di output, che rappresenta il punto fino al quale la query continua ha elaborato correttamente i dati.

  1. Nella console Google Cloud , vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor di query, esegui la seguente query:

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    Sostituisci quanto segue:

Visualizzare i dettagli dell'assegnazione della prenotazione

Puoi utilizzare le visualizzazioni ASSIGNMENTS e RESERVATIONS per ottenere i dettagli sull'assegnazione delle prenotazioni di query continue.

Restituisci i dettagli dell'assegnazione della prenotazione per le query continue:

  1. Nella console Google Cloud , vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor di query, esegui la seguente query:

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    Sostituisci quanto segue:

    • ADMIN_PROJECT_ID: l'ID del progetto di amministrazione proprietario della prenotazione.
    • LOCATION: la posizione della prenotazione.
    • PROJECT_ID: l'ID del progetto assegnato alla prenotazione. Vengono restituite solo le informazioni sulle query continue in esecuzione in questo progetto.

Visualizzare le informazioni sul consumo di slot

Puoi utilizzare le visualizzazioni ASSIGNMENTS, RESERVATIONS e JOBS_TIMELINE per ottenere informazioni sul consumo continuo di slot di query.

Informazioni sul consumo degli slot di ritorno per le query continue:

  1. Nella console Google Cloud , vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor di query, esegui la seguente query:

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    Sostituisci quanto segue:

    • ADMIN_PROJECT_ID: l'ID del progetto di amministrazione proprietario della prenotazione.
    • LOCATION: la posizione della prenotazione.
    • PROJECT_ID: l'ID del progetto assegnato alla prenotazione. Vengono restituite solo le informazioni sulle query continue in esecuzione in questo progetto.

Puoi anche monitorare le prenotazioni di query continue utilizzando altri strumenti come Esplora metriche e grafici delle risorse amministrative. Per ulteriori informazioni, vedi Monitorare le prenotazioni BigQuery.

Utilizzare il grafico di esecuzione delle query

Puoi utilizzare il grafico di esecuzione delle query per ottenere informazioni sulle prestazioni e statistiche generali per una query continua. Per ulteriori informazioni, vedi Visualizzare gli approfondimenti sul rendimento delle query.

Visualizzare la cronologia dei job

Puoi visualizzare i dettagli dei job di query continua nella cronologia dei job personali o in quella del progetto. Per saperne di più, vedi Visualizzare i dettagli del job.

Tieni presente che l'elenco storico dei job è ordinato in base all'ora di inizio del job, pertanto le query continue in esecuzione da un po' di tempo potrebbero non trovarsi all'inizio dell'elenco.

Utilizzare lo strumento di esplorazione dei job amministrativi

In Esplora job amministrativi, filtra i job per mostrare le query continue impostando il filtro Categoria job su Query continua.

Usa Cloud Monitoring

Puoi visualizzare le metriche specifiche per le query continue di BigQuery utilizzando Cloud Monitoring. Per saperne di più, consulta Creare dashboard, grafici e avvisi e leggi le informazioni sulle metriche disponibili per la visualizzazione.

Avviso relativo alle query non riuscite

Anziché controllare regolarmente se le query continue non sono riuscite, può essere utile creare un avviso per ricevere una notifica in caso di errore. Un modo per farlo è creare una metrica basata su log di Cloud Logging personalizzata con un filtro per i tuoi job e una policy di avviso di Cloud Monitoring basata su questa metrica:

  1. Quando crei una query continua, utilizza un prefisso ID job personalizzato. Più query continue possono condividere lo stesso prefisso. Ad esempio, potresti utilizzare il prefisso prod- per indicare una query di produzione.
  2. Nella console Google Cloud , vai alla pagina Metriche basate su log.

    Vai a Metriche basate su log

  3. Fai clic su Crea metrica. Viene visualizzato il riquadro Crea una metrica.

  4. Per Tipo di metrica, seleziona Contatore.

  5. Nella sezione Dettagli, assegna un nome alla metrica. Ad esempio, CUSTOM_JOB_ID_PREFIX-metric.

  6. Nella sezione Selezione filtro, inserisci quanto segue nell'editor Crea filtro:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Sostituisci quanto segue:

  7. Fai clic su Crea metrica.

  8. Nel menu di navigazione, fai clic su Metriche basate sui log. La metrica che hai appena creato viene visualizzata nell'elenco delle metriche definite dall'utente.

  9. Nella riga della metrica, fai clic su Altre azioni e poi su Crea avviso da metrica.

  10. Fai clic su Avanti. Non è necessario modificare le impostazioni predefinite nella pagina Modalità di configurazione dei criteri.

  11. Fai clic su Avanti. Non è necessario modificare le impostazioni predefinite nella pagina Configura attivatore di avvisi.

  12. Seleziona i canali di notifica e inserisci un nome per la policy di avviso.

  13. Fai clic su Crea criterio.

Puoi testare l'avviso eseguendo una query continua con il prefisso dell'ID job personalizzato che hai selezionato e poi annullandola. Potrebbero essere necessari alcuni minuti prima che l'avviso raggiunga il canale di notifica.

Ritenta le query non riuscite

Il nuovo tentativo di esecuzione di una query continua non riuscita può aiutare a evitare situazioni in cui una pipeline continua è inattiva per un periodo di tempo prolungato o richiede l'intervento umano per essere riavviata. Di seguito sono riportati alcuni aspetti importanti da considerare quando riprovi una query continua non riuscita:

  • Se è tollerabile rielaborare una certa quantità di dati elaborati dalla query precedente prima che non sia riuscita.
  • Come gestire la limitazione dei tentativi o l'utilizzo del backoff esponenziale.

Un possibile approccio per automatizzare il nuovo tentativo di query è il seguente:

  1. Crea un sink Cloud Logging basato su un filtro di inclusione che corrisponda ai seguenti criteri per indirizzare i log a un argomento Pub/Sub:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Sostituisci quanto segue:

  2. Crea una funzione Cloud Run che viene attivata in risposta ai log di ricezione di Pub/Sub corrispondenti al tuo filtro.

    La funzione Cloud Run potrebbe accettare il payload di dati dal messaggio Pub/Sub e tentare di avviare una nuova query continua utilizzando la stessa sintassi SQL della query non riuscita, ma all'inizio subito dopo l'interruzione del job precedente.

Ad esempio, puoi utilizzare una funzione simile alla seguente:

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida di BigQuery per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API BigQuery Python.

Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

Passaggi successivi