Supervisa consultas continuas

Puedes supervisar las consultas continuas de BigQuery con las siguientes herramientas de BigQuery:

Debido a la naturaleza de larga duración de una consulta continua de BigQuery, las métricas que se suelen generar cuando se completa una consulta de SQL pueden estar ausentes o imprecisas.

Usa las vistas de INFORMATION_SCHEMA

Puedes usar varias vistas INFORMATION_SCHEMA para supervisar las consultas continuas y las reservas de consultas continuas.

Ver detalles del trabajo

Puedes usar la vista JOBS para obtener metadatos de trabajos de consultas continuas.

La siguiente consulta devuelve los metadatos de todas las consultas continuas activas. Los metadatos incluyen la marca de agua de salida, que representa el punto hasta el cual la consulta continua procesó datos correctamente.

  1. En la consola de Google Cloud , ve a la página BigQuery.

    Ir a BigQuery

  2. En el Editor de consultas, ejecute la siguiente consulta:

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    Reemplaza lo siguiente:

Cómo ver los detalles de la asignación de la reserva

Puedes usar las vistas ASSIGNMENTS y RESERVATIONS para obtener detalles de asignación de reserva de consultas continuas.

Muestra los detalles de la asignación de reservas para las consultas continuas:

  1. En la consola de Google Cloud , ve a la página BigQuery.

    Ir a BigQuery

  2. En el Editor de consultas, ejecute la siguiente consulta:

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    Reemplaza lo siguiente:

    • ADMIN_PROJECT_ID por el ID del proyecto de administración que posee la reserva.
    • LOCATION: la ubicación de la reserva.
    • PROJECT_ID: el ID del proyecto que se asigna a la reserva. Solo se muestra información sobre las consultas continuas que se ejecutan en este proyecto.

Consulta la información de consumo de ranuras

Puedes usar las vistas ASSIGNMENTS, RESERVATIONS y JOBS_TIMELINE. para obtener información del consumo de ranuras de consulta continua.

Muestra información sobre el consumo de ranuras de consultas continuas:

  1. En la consola de Google Cloud , ve a la página BigQuery.

    Ir a BigQuery

  2. En el Editor de consultas, ejecute la siguiente consulta:

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    Reemplaza lo siguiente:

    • ADMIN_PROJECT_ID por el ID del proyecto de administración que posee la reserva.
    • LOCATION: la ubicación de la reserva.
    • PROJECT_ID: el ID del proyecto que se asigna a la reserva. Solo se muestra información sobre las consultas continuas que se ejecutan en este proyecto.

También puedes supervisar las reservas de consultas continuas con otras herramientas, como el Explorador de métricas y los gráficos de recursos administrativos. Para obtener más información, consulta Supervisa reservas de BigQuery.

Usa el gráfico de ejecución de consultas

Puedes usar el gráfico de ejecución de consultas para obtener estadísticas de rendimiento y estadísticas generales para una consulta continua. Para obtener más información, consulta Visualiza las estadísticas de rendimiento de las consultas.

Ver el historial de un trabajo

Puedes ver los detalles del trabajo de consulta continua en tu historial de trabajos personales o en el historial de trabajos del proyecto. Para obtener más información, consulta Visualiza los detalles del trabajo.

Ten en cuenta que la lista histórica de trabajos se ordena según la hora de inicio del trabajo, por lo que es posible que las consultas continuas que se han ejecutado por un tiempo no estén cerca del inicio de la lista.

Usa el explorador de trabajos administrativos

En el explorador de trabajos administrativos, filtra tus trabajos para mostrar las consultas continuas. Para ello, configura el filtro Categoría de trabajo en Consulta continua.

Use Cloud Monitoring

Puedes ver métricas específicas de las consultas continuas de BigQuery con Cloud Monitoring. Para obtener más información, consulta Crea paneles, gráficos y alertas y lee sobre las métricas disponibles para la visualización.

Alerta sobre consultas con errores

En lugar de verificar de forma rutinaria si fallaron tus consultas continuas, puede ser útil crear una alerta para que te notifique sobre las fallas. Una forma de hacerlo es crear una métrica basada en registros de Cloud Logging personalizada con un filtro para tus trabajos y una política de alertas de Cloud Monitoring basada en esa métrica:

  1. Cuando crees una consulta continua, usa un prefijo de ID de trabajo personalizado. Varias consultas continuas pueden compartir el mismo prefijo. Por ejemplo, podrías usar el prefijo prod- para indicar una búsqueda de producción.
  2. En la consola de Google Cloud , ve a la página Métricas basadas en registros.

    Ve a Métricas basadas en registros

  3. Haz clic en Crear métrica. Aparecerá el panel Crear métrica de registros.

  4. En Tipo de métrica, selecciona Contador.

  5. En la sección Detalles, asígnale un nombre a tu métrica. Por ejemplo, CUSTOM_JOB_ID_PREFIX-metric

  6. En la sección Selección de filtro, ingresa lo siguiente en el editor Crear filtro:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Reemplaza lo siguiente:

  7. Haz clic en Crear métrica.

  8. En el menú de navegación, haz clic en Métricas basadas en registros. La métrica que acabas de crear aparecerá en la lista de métricas definidas por el usuario.

  9. En la fila de tu métrica, haz clic en Más acciones y, luego, en Crear alerta a partir de métricas.

  10. Haz clic en Siguiente. No es necesario que cambies la configuración predeterminada en la página Modo de configuración de políticas.

  11. Haz clic en Siguiente. No es necesario que cambies la configuración predeterminada en la página Configurar activador de alerta.

  12. Selecciona tus canales de notificaciones y, luego, ingresa un nombre para la política de alertas.

  13. Haz clic en Crear política.

Para probar la alerta, ejecuta una consulta continua con el prefijo del ID de trabajo personalizado que seleccionaste y, luego, cancélala. Es posible que la alerta tarde unos minutos en llegar a tu canal de notificaciones.

Vuelve a intentar las búsquedas con errores

Volver a intentar una consulta continua fallida puede ayudar a evitar situaciones en las que una canalización continua no funciona durante un período prolongado o requiere intervención humana para reiniciarse. Estos son algunos aspectos importantes que debes tener en cuenta cuando vuelvas a intentar ejecutar una consulta continua fallida:

  • Indica si es tolerable volver a procesar una cierta cantidad de datos que procesó la consulta anterior antes de que fallara.
  • Cómo controlar los reintentos de limitación o usar la retirada exponencial

Un posible enfoque para automatizar el reintento de consultas es el siguiente:

  1. Crea un receptor de Cloud Logging basado en un filtro de inclusión que cumpla con los siguientes criterios para enrutar los registros a un tema de Pub/Sub:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Reemplaza lo siguiente:

  2. Crea una Cloud Run Function que se active en respuesta a los registros de recepción de Pub/Sub que coincidan con tu filtro.

    La función de Cloud Run podría aceptar la carga útil de datos del mensaje de Pub/Sub y tratar de iniciar una nueva consulta continua con la misma sintaxis de SQL que la consulta fallida, pero al principio, justo después de que se detuvo el trabajo anterior.

Por ejemplo, puedes usar una función similar a la siguiente:

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración para Python incluidas en la guía de inicio rápido de BigQuery sobre cómo usar bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de BigQuery para Python.

Para autenticarte en BigQuery, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para bibliotecas cliente.

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

¿Qué sigue?