Acelera las cargas de trabajo por lotes de Dataproc Serverless con la ejecución de consultas nativas

En este documento, se describe cómo habilitar las cargas de trabajo por lotes de Dataproc Serverless que se ejecutan en el nivel de precios Premium para usar la ejecución de consultas nativa, una función que puede acelerar las cargas de trabajo de Spark y reducir los costos. Esta función es compatible con las APIs de Apache Spark y no requiere código de usuario adicional.

Cuándo usar la ejecución de consultas nativas

Usa la Ejecución de consultas nativas en las siguientes situaciones:

APIs de DataFrame de Spark y consultas de Spark SQL que leen datos de archivos Parquet y ORC
Cargas de trabajo recomendadas por la herramienta de calificación de ejecución de consultas nativas.

Cuándo no usar la ejecución de consultas nativas

No uses la Ejecución de consultas nativas en las siguientes situaciones, ya que hacerlo podría no lograr la reducción del tiempo de ejecución de la carga de trabajo por lotes y podría causar fallas o regresiones en la carga de trabajo:

Cargas de trabajo que no recomienda la herramienta de calificación de ejecución de consultas nativas.
Cargas de trabajo de RDD, UDF y AA de Spark
Cargas de trabajo de escritura
Formatos de archivo distintos de Parquet y ORC
Entradas de los siguientes tipos de datos:

  • Timestamp
  • TinyInt
  • Byte
  • Struct
  • Array
  • Map: ORC y Parquet
  • VarChar
  • Char: ORC
Consultas que contienen expresiones regulares
Cargas de trabajo que usan un bucket de pagos del solicitante
Parámetros de configuración de Cloud Storage no predeterminados La ejecución de consultas nativas usa muchos valores predeterminados, incluso cuando se anulan.

Cómo usar la ejecución de consultas nativas con precios del nivel premium

La ejecución de consultas nativas sin servidores de Dataproc solo está disponible con cargas de trabajo por lotes que se ejecutan en el nivel de precios premium de Dataproc Serverless. Los precios del nivel Premium tienen un costo más alto que los del nivel estándar, pero no se cobra ningún cargo adicional por la Ejecución de consultas nativas. Para obtener más información, consulta Precios de Dataproc Serverless.

Puedes habilitar la asignación de recursos y los precios del nivel premium para los recursos de lotes si configuras las siguientes propiedades del nivel de asignación de recursos en premium cuando envíes una carga de trabajo por lotes de Spark.

Puedes enviar una carga de trabajo por lotes sin servidores de Dataproc con la consola de Google Cloud, Google Cloud CLI o la API de Dataproc.

Console

  1. En la consola de Google Cloud:

    1. Ve a Lotes de Dataproc.
    2. Haz clic en Crear para abrir la página Crear lote.
  2. Selecciona y completa los siguientes campos para configurar el lote para la ejecución de consultas nativas:

  3. Completa, selecciona o confirma otros parámetros de configuración de las cargas de trabajo por lotes. Consulta Cómo enviar una carga de trabajo por lotes de Spark.

  4. Haz clic en ENVIAR para ejecutar la carga de trabajo por lotes de Spark.

gcloud

Establece las siguientes marcas de comando gcloud dataproc batches submit spark de gcloud CLI para configurar la carga de trabajo por lotes para la ejecución de consultas nativas:

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --version=VERSION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

Notas:

API

Establece los siguientes campos de la API de Dataproc para configurar la carga de trabajo por lotes para la ejecución de consultas nativas:

Limitaciones

Si habilitas la ejecución de consultas nativas en las siguientes situaciones, se pueden generar excepciones, generar incompatibilidades de Spark o hacer que la carga de trabajo recurra al motor de Spark predeterminado.

Resguardos

La ejecución de consultas nativas después de la ejecución puede generar un resguardo de la carga de trabajo en el motor de ejecución de Spark, lo que genera una regresión o una falla.

  • ANSI: Si el modo ANSI está habilitado, la ejecución se traslada a Spark.

  • Modo que distingue mayúsculas de minúsculas: La ejecución de consultas nativas solo admite el modo predeterminado de Spark que no distingue mayúsculas de minúsculas. Si el modo distingue mayúsculas de minúsculas está habilitado, pueden ocurrir resultados incorrectos.

  • RegExp functions: La ejecución de consultas nativas implementa funciones regexp, como rlike, regexp_extract, basadas en RE2. En Spark, se basan en java.util.regex.

    • Análisis de contexto: La ejecución de consultas nativas no admite el patrón de búsqueda anticipada ni de búsqueda hacia atrás de RE2.
    • Cuando se hace coincidir un espacio en blanco con el patrón "\s", a diferencia de java.util.regex, RE2 no trata "\v" ("\x0b") como espacio en blanco.
  • Análisis de tablas particionadas: La ejecución de consultas nativa admite el análisis de tabla particionada solo cuando la ruta de acceso contiene la información de partición. De lo contrario, la carga de trabajo recurre al motor de ejecución de Spark.

Comportamiento incompatible

Cuando se usa la ejecución de consultas nativas en los siguientes casos, se pueden producir comportamientos incompatibles o resultados incorrectos:

  • Funciones JSON: La ejecución de consultas nativas admite cadenas rodeadas de comillas dobles, no simples. Se producen resultados incorrectos con comillas simples. Si usas “*” en la ruta con la función get_json_object, se muestra NULL.

  • Configuración de lectura de Parquet:

    • La ejecución de consultas nativas trata spark.files.ignoreCorruptFiles como establecido en el valor predeterminado de false, incluso cuando se establece en true.
    • La ejecución de consultas nativas ignora spark.sql.parquet.datetimeRebaseModeInRead y solo muestra el contenido del archivo Parquet. No se consideran las diferencias entre el calendario híbrido heredado (Julian Gregorian) y el calendario Gregoriano proléptico. Los resultados de Spark pueden diferir.
  • NaN: No se admite. Pueden ocurrir resultados inesperados, por ejemplo, cuando se usa NaN en una comparación numérica.

  • Lectura de columnas de Spark: Puede producirse un error grave debido a que el vector de columnas de Spark no es compatible con la ejecución de consultas nativas.

  • Derrame: Cuando las particiones de shuffle se establecen en una cantidad grande, la función de volcado en el disco puede activar un OutOfMemoryException. Si esto ocurre, reducir la cantidad de particiones puede eliminar esta excepción.

Cómo ajustar tu carga de trabajo de ejecución de consultas nativas

La ejecución de consultas nativas sin servidor de Dataproc se puede ajustar aún más con las siguientes propiedades:

Propiedad de lote Cuándo usar
spark.driver.memory
spark.driver.memoryOverhead
To tune the memory provided to spark driver process
spark.executor.memory
spark.executor.memoryOverhead
spark.memory.offHeap.size
To tune the memory provided to onheap/offheap memory of executor process
spark.dataproc.driver.disk.tier
spark.dataproc.driver.disk.size
To configure premium disk tier and size for driver
spark.dataproc.executor.disk.tier
spark.dataproc.executor.disk.size
To configure premium disk tier and size for executor

Propiedades de la carga de trabajo por lotes de la ejecución de consultas nativas

  • spark.dataproc.runtimeEngine=native (Obligatorio): El motor de entorno de ejecución de la carga de trabajo se debe configurar como native para anular el motor de entorno de ejecución spark predeterminado.

  • version (obligatorio): La carga de trabajo debe usar la versión del entorno de ejecución de Spark 1.2.26 o posterior, 2.2.26 o posterior, o una versión principal posterior del entorno de ejecución.

  • Niveles de procesamiento premium (obligatorio): Las propiedades spark.dataproc.spark.driver.compute.tier y spark.dataproc.executor.compute.tier deben establecerse en premium.

  • Niveles de disco premium (opcional): Los niveles de disco premium usan el método de ordenamiento en columnas en lugar del método de ordenamiento basado en filas para proporcionar un mejor rendimiento. Para obtener una mejor capacidad de procesamiento de E/S de reproducción aleatoria, usa los niveles de disco premium del controlador y el ejecutor con un tamaño de disco lo suficientemente grande como para admitir archivos de reproducción aleatoria.

  • Memoria (opcional): Si configuraste el motor de ejecución de consultas nativas sin configurar la memoria fuera del montón (spark.memory.offHeap.size) y la memoria dentro del montón (spark.executor.memory), el motor de ejecución de consultas nativas toma una cantidad predeterminada de memoria 4g y la divide en una proporción 6:1 entre la memoria fuera del montón y la memoria dentro del montón.

    Si decides configurar la memoria cuando usas la Ejecución de consultas nativas, puedes hacerlo de las siguientes maneras:

    • Configura solo la memoria fuera del montón (spark.memory.offHeap.size) con un valor especificado. La ejecución de consultas nativas usará el valor especificado como memoria fuera del montón y usará un 1/7th adicional del valor de memoria fuera del montón como memoria dentro del montón.

    • Configura la memoria dentro del montón (spark.executor.memory) y fuera del montón (spark.memory.offHeap.size). La cantidad que asignes a la memoria fuera del montón debe ser mayor que la que asignes a la memoria dentro del montón. Recomendación: Asigna memoria fuera del montón a la memoria dentro del montón en una proporción de 6:1.

    Valores de muestra:

    Configuración de la memoria sin ejecución de consultas nativas Configuración de memoria recomendada con la ejecución de consultas nativas
    spark.executor.memory spark.memory.offHeap.size spark.executor.memory
    7 g 6G 1 g
    14 g 12 g 2G
    28 g 24 g 4G
    56 g 48 g 8 g

Herramienta de calificación de ejecución de consultas nativas

Puedes ejecutar la herramienta de calificación de ejecución de consultas nativas de Dataproc, run_qualification_tool.sh, para identificar cargas de trabajo que puedan lograr tiempos de ejecución más rápidos con la ejecución de consultas nativas. La herramienta analiza los archivos de eventos de Spark que generan las aplicaciones de cargas de trabajo por lotes y, luego, estima los posibles ahorros de tiempo de ejecución que cada aplicación de carga de trabajo puede obtener con la ejecución de consultas nativas.

Ejecuta la herramienta de calificación

Sigue estos pasos para ejecutar la herramienta en los archivos de eventos de carga de trabajo por lotes de Dataproc Serverless.

1.Copia run_qualification_tool.sh en un directorio local que contenga los archivos de eventos de Spark para analizar.

  1. Ejecuta la herramienta de calificación para analizar un archivo de evento o un conjunto de archivos de eventos que se encuentran en el directorio de la secuencia de comandos.

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    Marcas y valores:

    -f (obligatorio): Consulta Ubicaciones de archivos de eventos de Spark para encontrar los archivos de eventos de la carga de trabajo de Spark.

    • EVENT_FILE_PATH (obligatorio, a menos que se especifique EVENT_FILE_NAME): ruta de acceso del archivo de evento que se analizará Si no se proporciona, se supone que la ruta de acceso del archivo de evento es el directorio actual.

    • EVENT_FILE_NAME (obligatorio, a menos que se especifique EVENT_FILE_PATH): Es el nombre del archivo de evento que se analizará. Si no se proporciona, se analizan los archivos de eventos que se encuentran de forma recursiva en EVENT_FILE_PATH.

    -o(opcional): Si no se proporciona, la herramienta crea o usa un directorio output existente en el directorio actual para colocar los archivos de salida.

    • CUSTOM_OUTPUT_DIRECTORY_PATH: Es la ruta de acceso del directorio de salida a los archivos de salida.

    -k (opcional):

    -x (opcional):

    • MEMORY_ALLOCATED: Es la memoria en gigabytes que se asignará a la herramienta. De forma predeterminada, la herramienta usa el 80% de la memoria libre disponible en el sistema y todos los núcleos de la máquina disponibles.

    -t(opcional):

    • PARALLEL_THREADS_TO_RUN: Es N=cantidad de subprocesos paralelos que la herramienta debe ejecutar. De forma predeterminada, la herramienta ejecuta todos los núcleos.

    Ejemplo de uso del comando:

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    En este ejemplo, la herramienta de calificación recorre el directorio gs://dataproc-temp-us-east1-9779/spark-job-history y analiza los archivos de eventos de Spark contenidos en este directorio y sus subdirectorios. El acceso al directorio se proporciona a /keys/event-file-key. La herramienta usa 34 GB memory para la ejecución y ejecuta 5 subprocesos en paralelo.

Archivos de salida de la herramienta de calificación

Una vez que se completa el análisis, la herramienta de calificación coloca los siguientes archivos de salida en un directorio perfboost-output en el directorio actual:

  • AppsRecommendedForBoost.tsv: Es una lista separada por tabulaciones de las aplicaciones recomendadas para usar con la Ejecución de consultas nativas.

  • UnsupportedOperators.tsv: Es una lista separada por tabulaciones de aplicaciones que no se recomiendan para usar con la Ejecución de consultas nativas.

Archivo de salida AppsRecommendedForBoost.tsv

En la siguiente tabla, se muestra el contenido de un archivo de salida AppsRecommendedForBoost.tsv de muestra. Contiene una fila para cada aplicación analizada.

Archivo de salida AppsRecommendedForBoost.tsv de muestra:

applicationId applicationName rddPercentage unsupportedSqlPercentage totalTaskTime supportedTaskTime supportedSqlPercentage recommendedForBoost expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 projects/example.com:dev/locations/us-central1
6b4d6cae140f883c0
11c8e
0.00% 0.00% 548924253 548924253 100.00% TRUE 30.00%
app-2024081/batches/60381cab738021457-000 projects/example.com:dev/locations/us-central1
474113a1462b426bf
b3aeb
0.00% 0.00% 514401703 514401703 100.00% TRUE 30.00%

Descripciones de las columnas:

  • applicationId: El ApplicationID de la aplicación de Spark. Úsalo para identificar la carga de trabajo por lotes correspondiente.

  • applicationName: Es el nombre de la aplicación de Spark.

  • rddPercentage: Es el porcentaje de operaciones de RDD en la aplicación. La ejecución de consultas nativas no admite operaciones de RDD.

  • unsupportedSqlPercentage: Porcentaje de operaciones de SQL que no admite la ejecución de consultas nativas.

  • totalTaskTime: Es el tiempo de tarea acumulado de todas las tareas que se ejecutaron durante la ejecución de la aplicación.

  • supportedTaskTime: Es el tiempo total de la tarea que admite la ejecución de consultas nativas.

Las siguientes columnas proporcionan la información importante para ayudarte a determinar si la ejecución de consultas nativas puede beneficiar a tu carga de trabajo por lotes:

  • supportedSqlPercentage: Es el porcentaje de operaciones de SQL que admite la ejecución de consultas nativas. Cuanto mayor sea el porcentaje, mayor será la reducción del entorno de ejecución que se puede lograr ejecutando la aplicación con la ejecución de consultas nativas.

  • recommendedForBoost: Si es TRUE, se recomienda ejecutar la aplicación con la ejecución de consultas nativa. Si recommendedForBoost es FALSE, no uses la Ejecución de consultas nativas en la carga de trabajo por lotes.

  • expectedRuntimeReduction: Es la reducción porcentual esperada en el tiempo de ejecución de la aplicación cuando la ejecutas con la ejecución de consultas nativas.

Archivo de salida UnsupportedOperators.tsv.

El archivo de salida UnsupportedOperators.tsv contiene una lista de operadores que se usan en las aplicaciones de cargas de trabajo que no son compatibles con la ejecución de consultas nativas. Cada fila del archivo de salida muestra un operador no compatible.

Descripciones de las columnas:

  • unsupportedOperator: Es el nombre del operador que no es compatible con la ejecución de consultas nativas.

  • cumulativeCpuMs: Es la cantidad de milisegundos de CPU que se consumen durante la ejecución del operador. Este valor refleja la importancia relativa del operador en la aplicación.

  • count: Es la cantidad de veces que se usa el operador en la aplicación.

Ejecuta la herramienta de calificación en todos los proyectos

En esta sección, se proporcionan instrucciones para ejecutar una secuencia de comandos que ejecute la herramienta de calificación para analizar cargas de trabajo por lotes de archivos de eventos de Spark de varios proyectos.

Requisitos y limitaciones de la secuencia de comandos:

  • Ejecuta la secuencia de comandos en máquinas de Linux:
    • La versión >=11 de Java debe estar instalada como la versión predeterminada.
  • Dado que los registros de Cloud Logging tienen un TTL de 30 días, no se pueden analizar los archivos de eventos de Spark de cargas de trabajo por lotes que se ejecutaron hace más de 30 días.

Para ejecutar la herramienta de calificación en todos los proyectos, sigue estos pasos.

  1. Descarga la secuencia de comandos list-batches-and-run-qt.sh y cópiala en tu máquina local.

  2. Cambia los permisos de la secuencia de comandos.

    chmod +x list-batches-and-run-qt.sh
    
  3. Prepara una lista de archivos de entrada del proyecto para pasarla a la secuencia de comandos para su análisis. Para crear el archivo de texto, agrega una fila en el siguiente formato para cada proyecto y región con archivos de eventos de Spark de carga de trabajo por lotes que deseas analizar.

    -r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
    

    Notas:

    -r (obligatorio):

    • REGION: Es la región a la que se envían los lotes del proyecto.

    -s (obligatorio): Formato: yyyy-mm-dd. Puedes agregar un segmento de tiempo 00:00:00 opcional.

    • START_DATE: Solo se analizan las cargas de trabajo por lotes creadas después de la fecha de inicio. Los lotes se analizan en orden descendente según el momento de su creación, y los más recientes se analizan primero.

    -e (opcional): Formato: yyyy-mm-dd. Puedes agregar un segmento de tiempo 00:00:00 opcional.

    • END_DATE: Si especificas esta opción, solo se analizarán las cargas de trabajo por lotes creadas antes de la fecha de finalización o en la fecha de finalización. Si no se especifica, se analizan todos los lotes creados después de START_DATE. Los lotes se analizan en orden descendente según la hora de creación del lote; los lotes más recientes se analizan primero.

    -l (opcional):

    • LIMIT_MAX_BATCHES: Es la cantidad máxima de lotes que se deben analizar. Puedes usar esta opción en combinación con START-DATE y END-DATE para analizar una cantidad limitada de lotes creados entre las fechas especificadas.

      Si no se especifica -l, se analiza la cantidad predeterminada de hasta 100 lotes.

    -k (opcional):

    • KEY_PATH: Es una ruta de acceso local que contiene las claves de acceso de Cloud Storage para los archivos de eventos de Spark de la carga de trabajo.

    Ejemplo de archivo de entrada:

    -r us-central1 -s 2024-08-21 -p project1 -k key1
    -r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
    

    Notas:

    • Filas 1: Se analizarán hasta los 100 archivos de eventos de Spark (predeterminado) más recientes en project1 en la región us-central1 con una hora de creación posterior a 2024-08-21 00:00:00 AM. key1 permite el acceso a los archivos de Cloud Storage.

    • Filas 2: Se analizarán hasta los 50 archivos de eventos de Spark más recientes en project2 en la región us-eastl1 con una hora de creación posterior a 2024-08-21 00:00:00 AM y anterior o el 23/08/2024 11:59:59 p.m. key2 permite el acceso a los archivos de eventos en Cloud Storage.

  4. Ejecuta la secuencia de comandos list-batches-and-run-qt.sh: Los análisis se muestran en los archivos AppsRecommendedForBoost.tsv y UnsupportedOperators.tsv.

    ./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \
        -x MEMORY_ALLOCATED \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -t PARALLEL_THREADS_TO_RUN
    

    Notas:

Ubicaciones de archivos de eventos de Spark

Realiza cualquiera de los siguientes pasos para encontrar los archivos de eventos de Spark para las cargas de trabajo por lotes de Dataproc Serverless:

  1. En Cloud Storage, busca el spark.eventLog.dir de la carga de trabajo y, luego, descárgalo.

    1. Si no encuentras el spark.eventLog.dir, configúralo en una ubicación de Cloud Storage y, luego, vuelve a ejecutar la carga de trabajo y descarga el spark.eventLog.dir.spark.eventLog.dir
  2. Si configuraste el servidor de historial de Spark para el trabajo por lotes, haz lo siguiente:

    1. Ve al servidor de historial de Spark y, luego, selecciona la carga de trabajo.
    2. Haz clic en Descargar en la columna Registro de eventos.