Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Modo de flexibilidad mejorada de Dataproc

El modo de flexibilidad mejorada (EFM) de Dataproc administra los datos aleatorios para minimizar las demoras en el progreso del trabajo causadas por la eliminación de nodos de un clúster en ejecución. EFM descarga datos aleatorios en uno de dos modos que el usuario puede seleccionar:

  1. Reproducción aleatoria de trabajadores principales Los asignadores escriben datos en trabajadores principales. Los trabajadores extraen esos nodos remotos durante la fase de reducción. Este modo solo está disponible para los trabajos de Spark y se recomienda.

  2. HCFS (Sistema de archivos compatible con Hadoop) Los asignadores escriben datos en una implementación de HCFS (HDFS de forma predeterminada). Al igual que con el modo de trabajador principal, solo los trabajadores principales participan en implementaciones de HDFS y HCFS (si HCFS Shuffle usa el conector de Cloud Storage, los datos se almacenan fuera del clúster). Este modo puede beneficiar a los trabajos con pequeñas cantidades de datos, pero, debido a limitaciones de escalamiento, no se recomienda para trabajos más grandes.

Debido a que ambos modos EFM no almacenan datos aleatorios intermedios en trabajadores secundarios, EFM es adecuado en clústeres que usan VM interrumpibles o solo realizan ajustes la escala automática al grupo de trabajadores secundarios.

Limitaciones:

  • El modo de flexibilidad mejorada es compatible con clústeres que ejecutan versiones con imágenes 1.4 - 1.5.
  • Los trabajos de Apache Hadoop YARN que no admiten la reubicación de AppMaster pueden fallar en el modo de flexibilidad mejorada (consulta Cuándo esperar a que finalicen AppMasters).
  • El modo de flexibilidad mejorada no se recomienda en los siguientes casos:
    • en un clúster que solo tiene trabajadores principales.
  • El modo de flexibilidad mejorada no es compatible:
    • cuando el ajuste de escala automático principal de trabajadores está habilitado. En la mayoría de los casos, los trabajadores principales continuarán almacenando datos aleatorios que no se migren de forma automática. Al reducir el grupo de trabajadores primarios se descartan los beneficios de EFM.
    • Cuando los trabajos de Spark se ejecutan en un clúster con el retiro de servicio ordenado habilitado (consulta SPARK-20628).

Usa el modo de flexibilidad mejorada

El modo de flexibilidad mejorada se configura por motor de ejecución y se debe configurar durante la creación del clúster.

  • La implementación de Spark EFM se configura con la propiedad de clúster dataproc:efm.spark.shuffle. Valores de propiedad válidos:

    • primary-worker para la redistribución aleatoria de trabajadores (recomendado)
    • hcfs para la reproducción aleatoria basada en HCFS
  • La implementación de Hadoop MapReduce se configura con la propiedad del clúster dataproc:efm.mapreduce.shuffle. Valores de propiedad válidos:

    • hcfs

Ejemplo: Crea un clúster con la redistribución aleatoria de trabajadores principales para la redistribución aleatoria de Spark y HCFS para MapReduce:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --image-version=1.4 \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Ejemplo de Apache Spark

  1. Ejecuta un trabajo de WordCount con texto público de Shakespeare mediante el jar de ejemplos de Spark en el clúster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Ejemplo de MapReduce de Apache Hadoop

  1. Ejecuta un trabajo de teragen pequeño para generar datos de entrada en Cloud Storage para un trabajo de terasort más tarde mediante el jar de ejemplos de MapReduce en el clúster de EFM.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. Ejecuta un trabajo de terasort con los datos

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Configura SSD locales para Shuffle de trabajadores principales

Las implementaciones aleatorias de trabajador principal y HDFS escriben datos aleatorios de intermediarios en discos adjuntos de VM y se benefician de la capacidad de procesamiento y las IOPS adicionales que ofrecen los SSD locales. Para facilitar la asignación de recursos, orienta un objetivo de aproximadamente 1 partición de SSD local por 4 CPU virtuales cuando configures las máquinas de trabajador principal.

Para adjuntar SSD locales, pasa la marca --num-worker-local-ssds al comando gcloud dataproc clusters create.

Proporción de trabajadores secundarios

Dado que los trabajadores secundarios escriben sus datos aleatorios en HDFS, tu clúster debe contener una cantidad suficiente de trabajadores principales para que puedan adaptarse a la carga aleatoria de tu trabajo. Para realizar un ajuste de escala automático en clústeres, para evitar que el grupo principal escale y genere un comportamiento no deseado, establece minInstances en el valor maxInstances en la política de ajuste de escala automático{101. } para el grupo de trabajadores principal.

Cambia el tamaño del grupo de trabajadores principales

El grupo de trabajadores principal se puede escalar de forma segura, pero reducir la escala del grupo de trabajadores principales puede tener un impacto negativo en el progreso del trabajo. Las operaciones que reducen la escala del grupo de trabajadores principales deben usar el retiro de servicio ordenado, que se habilita mediante la configuración de la marca --graceful-decommission-timeout.

Clústeres con ajuste de escala automático: el escalamiento del grupo de trabajadores principales está inhabilitado en los clústeres de EFM con políticas de ajuste de escala automático. Para cambiar el tamaño del grupo de trabajadores principal en un clúster con ajuste de escala automático, sigue estos pasos:

  1. Inhabilita el ajuste de escala automático.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Escala el grupo principal.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Vuelve a habilitar el ajuste de escala automático:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Supervisa el uso de disco de trabajadores principales

Los trabajadores principales deben tener suficiente espacio en disco para los datos aleatorios del clúster. Puedes supervisar esta capacidad mediante la métrica remaining HDFS capacity. A medida que el disco local se llena, el espacio deja de estar disponible para HDFS y la capacidad restante disminuye.

De forma predeterminada, cuando el uso del disco local de un trabajador principal supere el 90% de capacidad, el nodo se marcará como UNHEALTHY en la IU del nodo YARN. Si tienes problemas de capacidad de disco, puedes borrar los datos no utilizados de HDFS o escalar el grupo de trabajadores principal.

Configuración avanzada

Partición y paralelismo

Cuando envíes un trabajo de MapReduce o Spark, configura un nivel adecuado de partición. Decidir la cantidad de particiones de entrada y salida para una etapa de redistribución implica una compensación entre las diferentes características de rendimiento. Es mejor experimentar con valores que funcionan en las formas de tu trabajo.

Particiones de entrada

La partición de entrada de MapReduce y Spark se determina mediante el conjunto de datos de entrada. Cuando se leen archivos desde Cloud Storage, cada tarea procesa aproximadamente un “tamaño de bloque” de datos.

  • Para los trabajos de Spark SQL, el tamaño máximo de la partición está controlado por spark.sql.files.maxPartitionBytes. Considera aumentarlo a 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Para los trabajos de MapReduce y RDD de Spark, por lo general, el tamaño de la partición se controla con fs.gs.block.size, cuyo valor predeterminado es 128 MB. Considera aumentarlo a 1 GB. También puedes establecer propiedades específicas de InputFormat, como mapreduce.input.fileinputformat.split.minsize y mapreduce.input.fileinputformat.split.maxsize.

    • Para trabajos MapReduce: --properties fs.gs.block.size=1073741824
    • Para RRDD de Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Particiones de salida

El número de tareas en etapas posteriores se controla mediante varias propiedades. En trabajos más grandes que procesan más de 1 TB, considera contar con al menos 1 GB por partición.

  • En los trabajos de MapReduce, el número de particiones de salida se controla mediante mapreduce.job.reduces.

  • Para Spark SQL, la cantidad de particiones de salida se controla mediante spark.sql.shuffle.partitions.

  • Para los trabajos de Spark con la API de RDD, puedes especificar la cantidad de particiones de salida o configurar spark.default.parallelism.

Ajuste de Shuffle para trabajadores principales

La propiedad más importante es --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Ten en cuenta que esta es una propiedad YARN a nivel de clúster porque el servidor de Spark Shuffle se ejecuta como parte de Node Manager. Se establece de manera predeterminada dos veces (2x) de núcleos en la máquina (por ejemplo, 16 subprocesos en n1-highmem-8). Si el valor de "Tiempo de bloqueo de lectura aleatoria" es mayor a 1 segundo y los trabajadores principales no alcanzaron los límites de red o disco, considera aumentar la cantidad de subprocesos de servidor de redistribución.

En tipos de máquinas más grandes, considera aumentar spark.shuffle.io.numConnectionsPerPeer, que es 1. (Por ejemplo, configúralo en 5 conexiones por par de hosts).

Aumenta los reintentos

La cantidad máxima de intentos permitidos para las instancias principales, las tareas y las etapas se puede establecer mediante la configuración de las siguientes propiedades:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Debido a que las instancias principales y las tareas de la app se terminan con más frecuencia en clústeres que usan muchas VM interrumpibles o en el ajuste de escala automático sin retiro de servicio ordenado, aumentar los valores de las propiedades anteriores en esos clústeres puede ayudar (ten en cuenta que usar EFM con Spark y el retiro de servicio ordenado no es compatible).

Configura HDFS para la suffle HCFS

Para mejorar el rendimiento de las mezclas grandes, puedes reducir la contención de bloqueo en NameNode si configuras dfs.namenode.fslock.fair=false. Ten en cuenta que esto corre el riesgo de descartar solicitudes individuales, pero puede mejorar la capacidad de procesamiento en todo el clúster. Para mejorar aún más el rendimiento de NameNode, puedes conectar SSD locales al nodo principal mediante la configuración --num-master-local-ssds. También puedes agregar SSD locales a los trabajadores principales para mejorar el rendimiento de DataNode si configuras --num-worker-local-ssds.

Otros sistemas de archivos compatibles con Hadoop para el Shuffle HCFS

De forma predeterminada, los datos aleatorios de EFM HCFS se escriben en HDFS, pero puedes usar cualquier sistema de archivos compatible con Hadoop (HCFS). Por ejemplo, puedes decidir escribir Shuffle en Cloud Storage o en HDFS de un clúster diferente. Para especificar un sistema de archivos, puedes apuntar fs.defaultFS al sistema de archivos de destino cuando envías un trabajo a tu clúster.

Retiro de servicio ordenado de YARN en clústeres EFM

retiro de servicio ordenado de YARN se puede usar para quitar nodos rápidamente con un impacto mínimo en las aplicaciones en ejecución. Para los clústeres con ajuste de escala automático, se puede configurar el tiempo de espera del retiro de servicio ordenado en una AutoscalingPolicy que esté conectada al clúster de EFM.

Mejoras de MapReduce EFM para el retiro de servicio ordenado

  1. Dado que los datos intermedios se almacenan en un sistema de archivos distribuido, los nodos se pueden quitar de un clúster de EFM cuando todos los contenedores que se ejecutan en esos nodos hayan finalizado. Por otra parte, los nodos no se quitan en los clústeres de Dataproc estándar hasta que la aplicación haya finalizado.

  2. La eliminación del nodo no espera a que finalicen las instancias principales que se ejecutan en un nodo. Cuando el contenedor principal de la app finaliza, se reprograma en otro nodo que no se está retire de retiro. El progreso del trabajo no se pierde: la nueva instancia principal de la app recupera rápidamente el estado de la instancia principal anterior mediante la lectura del historial de trabajos.

Usa el retiro de servicio ordenado en un clúster de EFM con MapReduce

  1. Crea un clúster EFM con la misma cantidad de trabajadores principales y secundarios.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --image-version=1.4 \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Ejecuta un trabajo de MapReduce que calcule el valor de pi con ejemplos jar de mapreduce en el clúster.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. Mientras se ejecuta el trabajo, reduzca la escala del clúster mediante un retiro de servicio ordenado.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    Los nodos se quitarán del clúster con rapidez antes de que el trabajo finalice, y se minimiza la pérdida del progreso del trabajo. Las pausas temporales en el progreso del trabajo pueden ocurrir debido a lo siguiente:

    • Conmutación por error de la instancia principal de la app. Si el progreso del trabajo se reduce a un 0% y, luego, se sube de inmediato al valor previo, la instancia principal de la app podría haber finalizado y una nueva instancia principal de app recuperó su estado. Esto no debería afectar de manera significativa el progreso del trabajo, ya que la conmutación por error ocurre con rapidez.
    • interrupción de VM Debido a que HDFS solo conserva los resultados completos, no parciales, de la tarea de asignación, las caídas temporales en el progreso del trabajo pueden ocurrir cuando una VM se interrumpe mientras trabaja en una tarea de asignación.