En este documento, se describe cómo habilitar las cargas de trabajo por lotes de Dataproc Serverless que se ejecutan en el nivel de precios Premium para usar la ejecución de consultas nativa, una función que puede acelerar las cargas de trabajo de Spark y reducir los costos. Esta función es compatible con las APIs de Apache Spark y no requiere código de usuario adicional.
Cuándo usar la ejecución de consultas nativas
Usa la Ejecución de consultas nativas en las siguientes situaciones:
APIs de DataFrame de Spark y consultas de Spark SQL que leen datos de archivos Parquet y ORC
Cargas de trabajo recomendadas por la
herramienta de calificación de ejecución de consultas nativas.
Cuándo no usar la ejecución de consultas nativas
No uses la Ejecución de consultas nativas en las siguientes situaciones, ya que hacerlo podría no lograr la reducción del tiempo de ejecución de la carga de trabajo por lotes y podría causar fallas o regresiones en la carga de trabajo:
Cargas de trabajo que no recomienda la herramienta de calificación de ejecución de consultas nativas.
Cargas de trabajo de RDD, UDF y AA de Spark
Cargas de trabajo de escritura
Formatos de archivo distintos de Parquet y ORC
Entradas de los siguientes tipos de datos:
Timestamp
TinyInt
Byte
Struct
Array
Map
: ORC y ParquetVarChar
Char
: ORC
Cargas de trabajo que usan un bucket de pagos del solicitante
Parámetros de configuración de Cloud Storage no predeterminados La ejecución de consultas nativas usa muchos valores predeterminados, incluso cuando se anulan.
Cómo usar la ejecución de consultas nativas con precios del nivel premium
La ejecución de consultas nativas sin servidores de Dataproc solo está disponible con cargas de trabajo por lotes que se ejecutan en el nivel de precios premium de Dataproc Serverless. Los precios del nivel Premium tienen un costo más alto que los del nivel estándar, pero no se cobra ningún cargo adicional por la Ejecución de consultas nativas. Para obtener más información, consulta Precios de Dataproc Serverless.
Puedes habilitar la asignación de recursos y los precios del nivel premium para los recursos de lotes si configuras las siguientes propiedades del nivel de asignación de recursos en premium
cuando envíes una carga de trabajo por lotes de Spark.
Puedes enviar una carga de trabajo por lotes sin servidores de Dataproc con la consola de Google Cloud, Google Cloud CLI o la API de Dataproc.
Console
En la consola de Google Cloud:
- Ve a Lotes de Dataproc.
- Haz clic en Crear para abrir la página Crear lote.
Selecciona y completa los siguientes campos para configurar el lote para la ejecución de consultas nativas:
- Contenedor:
- Versión del entorno de ejecución: Selecciona
1.2
,2.2
o, si está disponible, un número de versiónmajor.minor
superior. Consulta Versiones del entorno de ejecución de Dataproc sin servidores para Spark compatibles.
- Versión del entorno de ejecución: Selecciona
- Configuración del nivel de ejecutor y controlador:
- Selecciona
Premium
para todos los niveles (Nivel de procesamiento del controlador, Nivel de procesamiento del ejecutor).
- Selecciona
- Propiedades: Ingresa los siguientes pares
Key
(nombre de la propiedad) yValue
para las siguientes propiedades de ejecución de consultas nativas:Clave Valor spark.dataproc.runtimeEngine
nativo/nativa/indígena/aborigen
- Contenedor:
Completa, selecciona o confirma otros parámetros de configuración de las cargas de trabajo por lotes. Consulta Cómo enviar una carga de trabajo por lotes de Spark.
Haz clic en ENVIAR para ejecutar la carga de trabajo por lotes de Spark.
gcloud
Establece las siguientes marcas de comando gcloud dataproc batches submit spark
de gcloud CLI para configurar la carga de trabajo por lotes para la ejecución de consultas nativas:
gcloud dataproc batches submit spark \ --project=PROJECT_ID \ --region=REGION \ --version=VERSION \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.SparkPi \ --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \ OTHER_FLAGS_AS_NEEDED
Notas:
- PROJECT_ID es el ID del proyecto de Google Cloud. Los IDs de los proyectos se enumeran en la sección Información del proyecto en el panel de la consola de Google Cloud.
- REGION: Una región de Compute Engine disponible para ejecutar la carga de trabajo.
- VERSION: Especifica
1.2
,2.2
o, si está disponible, un número de versiónmajor.minor
superior. Consulta Versiones del entorno de ejecución de Spark compatibles con Dataproc sin servidor. - OTHER_FLAGS_AS_NEEDED: Consulta Cómo enviar una carga de trabajo por lotes de Spark.
API
Establece los siguientes campos de la API de Dataproc para configurar la carga de trabajo por lotes para la ejecución de consultas nativas:
- RuntimeConfig.version:
Especifica
1.2
,2.2
o, si está disponible, un número de versiónmajor.minor
superior. Consulta Versiones del entorno de ejecución de Dataproc sin servidores para Spark compatibles. RuntimeConfig.properties: Establece las siguientes propiedades de ejecución de consultas nativas:
"spark.dataproc.runtimeEngine":"native" "spark.dataproc.driver.compute.tier":"premium" "spark.dataproc.executor.compute".tier:"premium"
Notas:
- Consulta Cómo enviar una carga de trabajo por lotes de Spark para configurar otros campos de la API de la carga de trabajo por lotes.
Limitaciones
Si habilitas la ejecución de consultas nativas en las siguientes situaciones, se pueden generar excepciones, generar incompatibilidades de Spark o hacer que la carga de trabajo recurra al motor de Spark predeterminado.
Resguardos
La ejecución de consultas nativas después de la ejecución puede generar un resguardo de la carga de trabajo en el motor de ejecución de Spark, lo que genera una regresión o una falla.
ANSI: Si el modo ANSI está habilitado, la ejecución se traslada a Spark.
Modo que distingue mayúsculas de minúsculas: La ejecución de consultas nativas solo admite el modo predeterminado de Spark que no distingue mayúsculas de minúsculas. Si el modo distingue mayúsculas de minúsculas está habilitado, pueden ocurrir resultados incorrectos.
RegExp functions
: La ejecución de consultas nativas implementa funcionesregexp
, comorlike
,regexp_extract
, basadas enRE2
. En Spark, se basan enjava.util.regex
.- Análisis de contexto: La ejecución de consultas nativas no admite el patrón de búsqueda anticipada ni de búsqueda hacia atrás de RE2.
- Cuando se hace coincidir un espacio en blanco con el patrón "\s", a diferencia de
java.util.regex
, RE2 no trata "\v" ("\x0b") como espacio en blanco.
Análisis de tablas particionadas: La ejecución de consultas nativa admite el análisis de tabla particionada solo cuando la ruta de acceso contiene la información de partición. De lo contrario, la carga de trabajo recurre al motor de ejecución de Spark.
Comportamiento incompatible
Cuando se usa la ejecución de consultas nativas en los siguientes casos, se pueden producir comportamientos incompatibles o resultados incorrectos:
Funciones JSON: La ejecución de consultas nativas admite cadenas rodeadas de comillas dobles, no simples. Se producen resultados incorrectos con comillas simples. Si usas “*” en la ruta con la función
get_json_object
, se muestraNULL
.Configuración de lectura de Parquet:
- La ejecución de consultas nativas trata
spark.files.ignoreCorruptFiles
como establecido en el valor predeterminado defalse
, incluso cuando se establece entrue
. - La ejecución de consultas nativas ignora
spark.sql.parquet.datetimeRebaseModeInRead
y solo muestra el contenido del archivo Parquet. No se consideran las diferencias entre el calendario híbrido heredado (Julian Gregorian) y el calendario Gregoriano proléptico. Los resultados de Spark pueden diferir.
- La ejecución de consultas nativas trata
NaN
: No se admite. Pueden ocurrir resultados inesperados, por ejemplo, cuando se usaNaN
en una comparación numérica.Lectura de columnas de Spark: Puede producirse un error grave debido a que el vector de columnas de Spark no es compatible con la ejecución de consultas nativas.
Derrame: Cuando las particiones de shuffle se establecen en una cantidad grande, la función de volcado en el disco puede activar un
OutOfMemoryException
. Si esto ocurre, reducir la cantidad de particiones puede eliminar esta excepción.
Cómo ajustar tu carga de trabajo de ejecución de consultas nativas
La ejecución de consultas nativas sin servidor de Dataproc se puede ajustar aún más con las siguientes propiedades:
Propiedad de lote | Cuándo usar |
---|---|
spark.driver.memory spark.driver.memoryOverhead |
To tune the memory provided to spark driver process |
spark.executor.memory spark.executor.memoryOverhead spark.memory.offHeap.size |
To tune the memory provided to onheap/offheap memory of executor process |
spark.dataproc.driver.disk.tier spark.dataproc.driver.disk.size |
To configure premium disk tier and size for driver |
spark.dataproc.executor.disk.tier spark.dataproc.executor.disk.size |
To configure premium disk tier and size for executor |
Propiedades de la carga de trabajo por lotes de la ejecución de consultas nativas
spark.dataproc.runtimeEngine=native
(Obligatorio): El motor de entorno de ejecución de la carga de trabajo se debe configurar comonative
para anular el motor de entorno de ejecuciónspark
predeterminado.version
(obligatorio): La carga de trabajo debe usar la versión del entorno de ejecución de Spark 1.2.26 o posterior, 2.2.26 o posterior, o una versión principal posterior del entorno de ejecución.Niveles de procesamiento premium (obligatorio): Las propiedades
spark.dataproc.spark.driver.compute.tier
yspark.dataproc.executor.compute.tier
deben establecerse enpremium
.Niveles de disco premium (opcional): Los niveles de disco premium usan el método de ordenamiento en columnas en lugar del método de ordenamiento basado en filas para proporcionar un mejor rendimiento. Para obtener una mejor capacidad de procesamiento de E/S de reproducción aleatoria, usa los niveles de disco premium del controlador y el ejecutor con un tamaño de disco lo suficientemente grande como para admitir archivos de reproducción aleatoria.
Memoria (opcional): Si configuraste el motor de ejecución de consultas nativas sin configurar la memoria fuera del montón (
spark.memory.offHeap.size
) y la memoria dentro del montón (spark.executor.memory
), el motor de ejecución de consultas nativas toma una cantidad predeterminada de memoria4g
y la divide en una proporción6:1
entre la memoria fuera del montón y la memoria dentro del montón.Si decides configurar la memoria cuando usas la Ejecución de consultas nativas, puedes hacerlo de las siguientes maneras:
Configura solo la memoria fuera del montón (
spark.memory.offHeap.size
) con un valor especificado. La ejecución de consultas nativas usará el valor especificado como memoria fuera del montón y usará un1/7th
adicional del valor de memoria fuera del montón como memoria dentro del montón.Configura la memoria dentro del montón (
spark.executor.memory
) y fuera del montón (spark.memory.offHeap.size
). La cantidad que asignes a la memoria fuera del montón debe ser mayor que la que asignes a la memoria dentro del montón. Recomendación: Asigna memoria fuera del montón a la memoria dentro del montón en una proporción de 6:1.
Valores de muestra:
Configuración de la memoria sin ejecución de consultas nativas Configuración de memoria recomendada con la ejecución de consultas nativas spark.executor.memory
spark.memory.offHeap.size
spark.executor.memory
7 g 6G 1 g 14 g 12 g 2G 28 g 24 g 4G 56 g 48 g 8 g
Herramienta de calificación de ejecución de consultas nativas
Puedes ejecutar la herramienta de calificación de ejecución de consultas nativas de Dataproc,
run_qualification_tool.sh
, para identificar cargas de trabajo que puedan lograr tiempos de ejecución más rápidos
con la ejecución de consultas nativas. La herramienta analiza los archivos de eventos de Spark que generan las aplicaciones de cargas de trabajo por lotes y, luego, estima los posibles ahorros de tiempo de ejecución que cada aplicación de carga de trabajo puede obtener con la ejecución de consultas nativas.
Ejecuta la herramienta de calificación
Sigue estos pasos para ejecutar la herramienta en los archivos de eventos de carga de trabajo por lotes de Dataproc Serverless.
1.Copia run_qualification_tool.sh
en un directorio local que contenga los archivos de eventos de Spark para analizar.
Ejecuta la herramienta de calificación para analizar un archivo de evento o un conjunto de archivos de eventos que se encuentran en el directorio de la secuencia de comandos.
./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \ -o CUSTOM_OUTPUT_DIRECTORY_PATH \ -k SERVICE_ACCOUNT_KEY \ -x MEMORY_ALLOCATEDg \ -t PARALLEL_THREADS_TO_RUN
Marcas y valores:
-f
(obligatorio): Consulta Ubicaciones de archivos de eventos de Spark para encontrar los archivos de eventos de la carga de trabajo de Spark.EVENT_FILE_PATH (obligatorio, a menos que se especifique EVENT_FILE_NAME): ruta de acceso del archivo de evento que se analizará Si no se proporciona, se supone que la ruta de acceso del archivo de evento es el directorio actual.
EVENT_FILE_NAME (obligatorio, a menos que se especifique EVENT_FILE_PATH): Es el nombre del archivo de evento que se analizará. Si no se proporciona, se analizan los archivos de eventos que se encuentran de forma recursiva en
EVENT_FILE_PATH
.
-o
(opcional): Si no se proporciona, la herramienta crea o usa un directoriooutput
existente en el directorio actual para colocar los archivos de salida.- CUSTOM_OUTPUT_DIRECTORY_PATH: Es la ruta de acceso del directorio de salida a los archivos de salida.
-k
(opcional):- SERVICE_ACCOUNT_KEY: Es la clave de la cuenta de servicio en formato JSON si se necesita para acceder a EVENT_FILE_PATH.
-x
(opcional):- MEMORY_ALLOCATED: Es la memoria en gigabytes que se asignará a la herramienta. De forma predeterminada, la herramienta usa el 80% de la memoria libre disponible en el sistema y todos los núcleos de la máquina disponibles.
-t
(opcional):- PARALLEL_THREADS_TO_RUN: Es N=cantidad de subprocesos paralelos que la herramienta debe ejecutar. De forma predeterminada, la herramienta ejecuta todos los núcleos.
Ejemplo de uso del comando:
./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \ -o perfboost-output -k /keys/event-file-key -x 34g -t 5
En este ejemplo, la herramienta de calificación recorre el directorio
gs://dataproc-temp-us-east1-9779/spark-job-history
y analiza los archivos de eventos de Spark contenidos en este directorio y sus subdirectorios. El acceso al directorio se proporciona a/keys/event-file-key
. La herramienta usa34 GB memory
para la ejecución y ejecuta5
subprocesos en paralelo.
Archivos de salida de la herramienta de calificación
Una vez que se completa el análisis, la herramienta de calificación coloca los siguientes archivos de salida en un directorio perfboost-output
en el directorio actual:
AppsRecommendedForBoost.tsv
: Es una lista separada por tabulaciones de las aplicaciones recomendadas para usar con la Ejecución de consultas nativas.UnsupportedOperators.tsv
: Es una lista separada por tabulaciones de aplicaciones que no se recomiendan para usar con la Ejecución de consultas nativas.
Archivo de salida AppsRecommendedForBoost.tsv
En la siguiente tabla, se muestra el contenido de un archivo de salida AppsRecommendedForBoost.tsv
de muestra. Contiene una fila para cada aplicación analizada.
Archivo de salida AppsRecommendedForBoost.tsv
de muestra:
applicationId | applicationName | rddPercentage | unsupportedSqlPercentage | totalTaskTime | supportedTaskTime | supportedSqlPercentage | recommendedForBoost | expectedRuntimeReduction |
---|---|---|---|---|---|---|---|---|
app-2024081/batches/083f6196248043938-000 | projects/example.com:dev/locations/us-central1 6b4d6cae140f883c0 11c8e |
0.00% | 0.00% | 548924253 | 548924253 | 100.00% | TRUE | 30.00% |
app-2024081/batches/60381cab738021457-000 | projects/example.com:dev/locations/us-central1 474113a1462b426bf b3aeb |
0.00% | 0.00% | 514401703 | 514401703 | 100.00% | TRUE | 30.00% |
Descripciones de las columnas:
applicationId
: ElApplicationID
de la aplicación de Spark. Úsalo para identificar la carga de trabajo por lotes correspondiente.applicationName
: Es el nombre de la aplicación de Spark.rddPercentage
: Es el porcentaje de operaciones de RDD en la aplicación. La ejecución de consultas nativas no admite operaciones de RDD.unsupportedSqlPercentage:
Porcentaje de operaciones de SQL que no admite la ejecución de consultas nativas.totalTaskTime
: Es el tiempo de tarea acumulado de todas las tareas que se ejecutaron durante la ejecución de la aplicación.supportedTaskTime
: Es el tiempo total de la tarea que admite la ejecución de consultas nativas.
Las siguientes columnas proporcionan la información importante para ayudarte a determinar si la ejecución de consultas nativas puede beneficiar a tu carga de trabajo por lotes:
supportedSqlPercentage
: Es el porcentaje de operaciones de SQL que admite la ejecución de consultas nativas. Cuanto mayor sea el porcentaje, mayor será la reducción del entorno de ejecución que se puede lograr ejecutando la aplicación con la ejecución de consultas nativas.recommendedForBoost
: Si esTRUE
, se recomienda ejecutar la aplicación con la ejecución de consultas nativa. SirecommendedForBoost
esFALSE
, no uses la Ejecución de consultas nativas en la carga de trabajo por lotes.expectedRuntimeReduction
: Es la reducción porcentual esperada en el tiempo de ejecución de la aplicación cuando la ejecutas con la ejecución de consultas nativas.
Archivo de salida UnsupportedOperators.tsv
.
El archivo de salida UnsupportedOperators.tsv
contiene una lista de operadores que se usan en las aplicaciones de cargas de trabajo que no son compatibles con la ejecución de consultas nativas.
Cada fila del archivo de salida muestra un operador no compatible.
Descripciones de las columnas:
unsupportedOperator
: Es el nombre del operador que no es compatible con la ejecución de consultas nativas.cumulativeCpuMs
: Es la cantidad de milisegundos de CPU que se consumen durante la ejecución del operador. Este valor refleja la importancia relativa del operador en la aplicación.count
: Es la cantidad de veces que se usa el operador en la aplicación.
Ejecuta la herramienta de calificación en todos los proyectos
En esta sección, se proporcionan instrucciones para ejecutar una secuencia de comandos que ejecute la herramienta de calificación para analizar cargas de trabajo por lotes de archivos de eventos de Spark de varios proyectos.
Requisitos y limitaciones de la secuencia de comandos:
- Ejecuta la secuencia de comandos en máquinas de Linux:
- La versión
>=11
de Java debe estar instalada como la versión predeterminada.
- La versión
- Dado que los registros de Cloud Logging tienen un TTL de 30 días, no se pueden analizar los archivos de eventos de Spark de cargas de trabajo por lotes que se ejecutaron hace más de 30 días.
Para ejecutar la herramienta de calificación en todos los proyectos, sigue estos pasos.
Descarga la secuencia de comandos
list-batches-and-run-qt.sh
y cópiala en tu máquina local.Cambia los permisos de la secuencia de comandos.
chmod +x list-batches-and-run-qt.sh
Prepara una lista de archivos de entrada del proyecto para pasarla a la secuencia de comandos para su análisis. Para crear el archivo de texto, agrega una fila en el siguiente formato para cada proyecto y región con archivos de eventos de Spark de carga de trabajo por lotes que deseas analizar.
-r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
Notas:
-r
(obligatorio):- REGION: Es la región a la que se envían los lotes del proyecto.
-s
(obligatorio): Formato:yyyy-mm-dd
. Puedes agregar un segmento de tiempo00:00:00
opcional.- START_DATE: Solo se analizan las cargas de trabajo por lotes creadas después de la fecha de inicio. Los lotes se analizan en orden descendente según el momento de su creación, y los más recientes se analizan primero.
-e
(opcional): Formato:yyyy-mm-dd
. Puedes agregar un segmento de tiempo00:00:00
opcional.- END_DATE: Si especificas esta opción, solo se analizarán las cargas de trabajo por lotes creadas antes de la fecha de finalización o en la fecha de finalización. Si no se especifica, se analizan todos los lotes creados después de START_DATE. Los lotes se analizan en orden descendente según la hora de creación del lote; los lotes más recientes se analizan primero.
-l
(opcional):LIMIT_MAX_BATCHES: Es la cantidad máxima de lotes que se deben analizar. Puedes usar esta opción en combinación con START-DATE y END-DATE para analizar una cantidad limitada de lotes creados entre las fechas especificadas.
Si no se especifica -l, se analiza la cantidad predeterminada de hasta
100
lotes.
-k
(opcional):- KEY_PATH: Es una ruta de acceso local que contiene las claves de acceso de Cloud Storage para los archivos de eventos de Spark de la carga de trabajo.
Ejemplo de archivo de entrada:
-r us-central1 -s 2024-08-21 -p project1 -k key1 -r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
Notas:
Filas 1: Se analizarán hasta los 100 archivos de eventos de Spark (predeterminado) más recientes en
project1
en la regiónus-central1
con una hora de creación posterior a2024-08-21 00:00:00 AM
.key1
permite el acceso a los archivos de Cloud Storage.Filas 2: Se analizarán hasta los 50 archivos de eventos de Spark más recientes en
project2
en la regiónus-eastl1
con una hora de creación posterior a2024-08-21 00:00:00 AM
y anterior o el 23/08/2024 11:59:59 p.m.key2
permite el acceso a los archivos de eventos en Cloud Storage.
Ejecuta la secuencia de comandos
list-batches-and-run-qt.sh
: Los análisis se muestran en los archivosAppsRecommendedForBoost.tsv
yUnsupportedOperators.tsv
../list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \ -x MEMORY_ALLOCATED \ -o CUSTOM_OUTPUT_DIRECTORY_PATH \ -t PARALLEL_THREADS_TO_RUN
Notas:
PROJECT_INPUT_FILE_LIST: Consulta las instrucciones del paso 3 en la sección.
-x
,-o
y-t
: Consulta Ejecuta la herramienta de calificación.
Ubicaciones de archivos de eventos de Spark
Realiza cualquiera de los siguientes pasos para encontrar los archivos de eventos de Spark para las cargas de trabajo por lotes de Dataproc Serverless:
En Cloud Storage, busca el
spark.eventLog.dir
de la carga de trabajo y, luego, descárgalo.- Si no encuentras el
spark.eventLog.dir
, configúralo en una ubicación de Cloud Storage y, luego, vuelve a ejecutar la carga de trabajo y descarga elspark.eventLog.dir
.spark.eventLog.dir
- Si no encuentras el
Si configuraste el servidor de historial de Spark para el trabajo por lotes, haz lo siguiente:
- Ve al servidor de historial de Spark y, luego, selecciona la carga de trabajo.
- Haz clic en Descargar en la columna Registro de eventos.