Componente opcional de Dataproc de Flink

Organiza tus páginas con colecciones Guarda y categoriza el contenido según tus preferencias.

Puedes instalar componentes adicionales, como Flink, cuando creas un clúster de Dataproc con la función de componentes opcionales. En esta página, se describe el componente Flink.

El componente Flink de Dataproc instala Apache Flink en un clúster de Dataproc.

Instala el componente

Instala el componente cuando crees un clúster de Dataproc. El componente de Dataproc Flink se puede instalar en clústeres creados con la imagen versión 1.5 o posterior de Dataproc.

La versión de imagen del clúster determina la versión del componente de Flink instalado en el clúster. Consulta Versiones de Dataproc compatibles para ver una lista de las versiones de componentes incluidas en cada actualización de imagen de Dataproc.

Recomendación: Usa un clúster de VM principal de 1 instancia principal con el componente de Flink. Los clústeres del modo de alta disponibilidad de Dataproc (con 3 VM principales) no admiten el modo de alta disponibilidad de Flink.

Comando de gcloud

Para crear un clúster de Dataproc que incluya el componente Flink, usa el comando gcloud dataproc clusters create cluster-name con la marca --optional-components.

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    --scopes=https://www.googleapis.com/auth/cloud-platform  \
    ... other flags
Notas:
  • La marca --enable-component-gateway habilita el acceso a la IU de Flink Job Manager.
  • La marca --scopes=https://www.googleapis.com/auth/cloud-platform habilita el acceso a la API a los servicios de Cloud Platform en el proyecto.
  • Puedes agregar el --metadata flink-start-yarn-session=true opcional para ejecutar el daemon de Flink YARN (/usr/bin/flink-yarn-daemon) en segundo plano en el clúster a fin de iniciar una sesión de Flink YARN (consulta Modo de sesión de Flink).
    gcloud dataproc clusters create cluster-name \
        --optional-components=FLINK \
        --region=region \
        --enable-component-gateway \
        --image-version=DATAPROC_IMAGE_VERSION \
        --scopes=https://www.googleapis.com/auth/cloud-platform  \
        --metadata flink-start-yarn-session=true \
        ... other flags
    

API de REST

El componente Flink se puede especificar a través de la API de Dataproc mediante SoftwareConfig.Component como parte de una solicitud clusters.create.

Configura EndpointConfig.enableHttpPortAccess en true para habilitar la conexión a la IU de Flink Job Manager.

Consola

  1. Habilita el componente y la puerta de enlace de componentes.
    • En Google Cloud Console, abre la página de Dataproc Crear un clúster de Dataproc en Compute Engine. El panel Configurar clúster está configurado.
    • En la sección Control de versiones, confirma o cambia el Tipo y la versión de la imagen.
    • En la sección Componentes (Components), sigue estos pasos:
      • En Puerta de enlace de componentes, selecciona Habilitar puerta de enlace de componentes.
      • En Componentes opcionales, selecciona Flink y otros componentes opcionales para instalar en el clúster.

Para configurar las propiedades de Flink:

  1. Usa una acción de inicialización para actualizar las propiedades predeterminadas de Flink en /etc/flink/conf/flink-conf.yaml.

  2. Especifica las propiedades de Flink con marcas de línea de comandos cuando envíes un trabajo de Flink o inicies una sesión de Flink.

Establecer ruta de clase

  1. Inicializa la ruta de clase de Hadoop desde una ventana de terminal de SSH en la VM principal del clúster de Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    

Puedes ejecutar Flink en diferentes modos de implementación en YARN: aplicación, por trabajo o modo de sesión.

Modo de aplicación

El modo de aplicación de Flink es compatible con la imagen de Dataproc 2.0 y versiones posteriores. Este modo ejecuta el método main() del trabajo en el administrador de trabajos de YARN. El clúster se cierra después de que finaliza el trabajo.

Ejemplo de envío de trabajo:

flink run-application \
    -t yarn-application \
   -Djobmanager.memory.process.size=1024m \
   -Dtaskmanager.memory.process.size=2048m \
   -Djobmanager.heap.mb=820 \
   -Dtaskmanager.heap.mb=1640 \
   -Dtaskmanager.numberOfTaskSlots=2 \
   -Dparallelism.default=4 \
   /usr/lib/flink/examples/batch/WordCount.jar

Enumera el trabajo en ejecución:

./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

Cancela el trabajo en ejecución:

./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

Modo por trabajo

Este modo de Flink ejecuta el método main() del trabajo en el cliente.

Ejemplo de envío de trabajo:

flink run \
    -m yarn-cluster \
    -p 4 \
    -ys 2 \
    -yjm 1024m \
    -ytm 2048m \
    /usr/lib/flink/examples/batch/WordCount.jar

Modo de sesión

Inicia una sesión de YARN de Flink de larga duración y envía uno o más trabajos a la sesión.

Puede iniciar una sesión de Flink de una de las siguientes maneras:

  1. Después de crear el clúster de Flink, ejecuta la secuencia de comandos yarn-session.sh de Flink, que está preinstalada en la VM principal del clúster, con la configuración personalizada:

    Ejemplo con configuración personalizada:

    /usr/lib/flink/bin/yarn-session.sh \
      -s 1 \
      -jm 1024m \
      -tm 2048m \
      -nm flink-dataproc \
      --detached
     ```
    
  2. Después de crear el clúster de Flink, ejecuta la secuencia de comandos wrapper /usr/bin/flink-yarn-daemon con la configuración predeterminada:

    . /usr/bin/flink-yarn-daemon
    
  3. Crea un clúster de Flink y agrega la marca --metadata flink-start-yarn-session=true al comando gcloud dataproc clusters create. Con esta marca habilitada, después de crear el clúster, Dataproc ejecuta /usr/bin/flink-yarn-daemon para iniciar una sesión de Flink en el clúster.

El ID de aplicación YARN de la sesión se guarda en /tmp/.yarn-properties-${USER}. Puedes enumerar el ID con el comando yarn application -list.

Enviar un trabajo a la sesión

Cuando inicias una sesión de Flink, el resultado del comando muestra una URL (incluidos el host y el puerto) de la VM de la instancia principal de Flink en la que se ejecutan los trabajos.

Otra forma de ver la URL principal de Flink: El siguiente resultado del comando muestra una URL de la instancia principal de Flink en el campo Tracking-URL:

yarn application -list -appId=<yarn-app-id> | sed 's#http://##'`

Ejecute el siguiente comando para enviar un trabajo de Flink a la sesión. Reemplaza FLINK_MASTER_URL por la URL principal de Flink después de quitar el http:// prefix.

flink run -m FLINK_MASTER_URL /usr/lib/flink/examples/batch/WordCount.jar

Enumerar trabajos en una sesión

Para enumerar los trabajos de Flink en una sesión, realiza una de las siguientes acciones:

  • Ejecuta flink list sin argumentos. El comando busca el ID de la aplicación YARN de la sesión en /tmp/.yarn-properties-${USER}.

  • Ejecuta flink list -yid YARN_APPLICATION_ID. Ejecuta /tmp/.yarn-properties-${USER} o yarn application -list para obtener el ID de aplicación YARN.

  • Ejecuta flink list -m FLINK_MASTER_URL.

Detener una sesión

Para detener la sesión, obtén el ID de la aplicación YARN de la sesión de /tmp/.yarn-properties-${USER} o el resultado de yarn application -list y, luego, ejecuta cualquiera de los siguientes comandos:

echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID

Ejecuta trabajos de Apache Beam

Puedes ejecutar trabajos de Apache Beam en Dataproc mediante FlinkRunner.

Puedes ejecutar trabajos de Beam en Flink de las siguientes maneras:

  1. Trabajos de Java Beam
  2. Trabajos de Portable Beam

Trabajos de Java Beam

Empaqueta tus trabajos de Beam en un archivo JAR. Proporciona el archivo JAR empaquetado con las dependencias necesarias para ejecutar el trabajo.

En el siguiente ejemplo, se ejecuta un trabajo de Java Beam desde el nodo principal del clúster de Dataproc.

  1. Crea un clúster de Dataproc con el componente Flink habilitado.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: Es la versión de la imagen del clúster, que determina la versión de Flink instalada en el clúster (por ejemplo, consulta las versiones de componente de Apache Flink enumeradas para las cuatro versiones 2.0.x más recientes y la cuatro anteriores).
    • --region: Es una región de Dataproc compatible.
    • --enable-component-gateway: Habilita el acceso a la IU del administrador de trabajo de Flink.
    • --scopes: Habilita el acceso a la API para los servicios de Cloud Platform en el proyecto.
  2. Usa la utilidad SSH para abrir una ventana de la terminal en el nodo principal del clúster de FLink.

  3. Inicia una sesión de YARN de Flink en el nodo principal del clúster de Dataproc.

    . /usr/bin/flink-yarn-daemon
    

    Toma nota de la versión de Flink en tu clúster de Dataproc.

    flink --version
    
  4. En tu máquina local, genera el ejemplo de recuento de palabras canónicos de Beam en Java.

    Elige una versión de Beam compatible con la versión de Flink en tu clúster de Dataproc. Consulta la tabla Compatibilidad de versiones de Flink que enumera la compatibilidad de versiones de Beam-Flink.

    Abre el archivo POM generado. Verifica la versión del ejecutor de Flink de Beam que especifica la etiqueta <flink.artifact.name>. Si la versión del ejecutor de Flink de Beam en el nombre del artefacto de Flink no coincide con la versión de Flink en tu clúster, actualiza el número de versión para que coincida.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Empaqueta el ejemplo de conteo de palabras.

    mvn package -Pflink-runner
    
  6. Sube el archivo uber JAR empaquetado, word-count-beam-bundled-0.1.jar (~135 MB) al nodo principal del clúster de Dataproc. Puedes usar gsutil cp para transferencias de archivos más rápidas a tu clúster de Dataproc desde Cloud Storage.

    1. En tu terminal local, crea un bucket de Cloud Storage y sube el archivo uber JAR.

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. En el nodo principal de Dataproc, descarga el archivo uber JAR.

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Ejecutar el trabajo de Java Beam en el nodo principal del clúster de Dataproc

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Comprueba que los resultados se hayan escrito en tu bucket de Cloud Storage.

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Detén la sesión de Flink en YARN.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Trabajos de Portable Beam

Para ejecutar trabajos de Beam escritos en Python, Go y otros lenguajes compatibles, puedes usar FlinkRunner y PortableRunner como se describe en la documentación del Ejecutor de Flink página (también consulta Hoja de ruta del marco de trabajo de portabilidad).

En el siguiente ejemplo, se ejecuta un trabajo portátil de Beam en Python desde el nodo principal del clúster de Dataproc.

  1. Crea un clúster de Dataproc con los componentes Flink y Docker habilitados.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Notas:

    • --optional-components: Flink y Docker.
    • --image-version: La versión de la imagen del clúster, que determina la versión de Flink instalada en el clúster (por ejemplo, consulta las versiones de componentes de Apache Flink que se enumeran para las últimas versiones de actualización de la imagen 2.0.x anteriores y anteriores).
    • --region: Es una región de Dataproc disponible.
    • --enable-component-gateway: Habilita el acceso a la IU del administrador de trabajos de Flink.
    • --scopes: Habilita el acceso a la API para los servicios de Cloud Platform en el proyecto.
  2. Usa gsutil de forma local o en Cloud Shell para crear un depósito de Cloud Storage. Especificarás el BUCKET_NAME cuando ejecutes un programa de ejemplo de recuento de palabras.

    gsutil mb BUCKET_NAME
    
  3. En una ventana de la terminal en la VM del clúster, inicia una sesión de Flink YARN. Ten en cuenta la URL principal de Flink, la dirección de la instancia principal de Flink donde se ejecutan los trabajos. Especificarás el FLINK_MASTER_URL cuando ejecutes un programa de ejemplo de recuento de palabras.

    . /usr/bin/flink-yarn-daemon
    

    Muestra y observa la versión de Flink que ejecuta el clúster de Dataproc. Especificarás el FLINK_VERSION cuando ejecutes un programa de ejemplo de recuento de palabras.

    flink --version
    
  4. Instala las bibliotecas de Python necesarias para el trabajo en el nodo principal del clúster.

  5. Instala una versión de Beam compatible con la versión de Flink del clúster.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Ejecuta el ejemplo de recuento de palabras en el nodo principal del clúster.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Notas:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, anotada anteriormente.
    • --flink_master: FLINK_MASTER_URL, anotada anteriormente.
    • --flink_submit_uber_jar: Usa el JAR uber para ejecutar el trabajo de Beam.
    • --output: BUCKET_NAME, creada anteriormente.
  7. Verifique que se hayan escrito los resultados en su bucket.

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Detén la sesión de Flink en YARN.

    1. Obtén el ID de aplicación.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

El componente Flink de Dataproc admite clústeres con Kerberos. Se necesita un ticket de Kerberos válido para enviar y conservar un trabajo de Flink o iniciar un clúster de Flink. De forma predeterminada, un ticket de Kerberos permanece válido por siete días.

La interfaz web del administrador de trabajo de Flink está disponible mientras se ejecuta un clúster de Flink o un clúster de sesión de Flink. Para utilizar la interfaz web

  1. Crea un clúster de Dataproc con la puerta de enlace de componentes habilitada.
  2. Después de crear el clúster, haz clic en Component Gateway YARN ResourceManager link en la pestaña Interfaz web de la página Cluster details (Detalles del clúster) en Google Cloud Console.
  3. En la IU de Resource Manager de YARN, identifica la entrada de la aplicación del clúster de Flink. Según el estado de finalización del trabajo, se mostrará el vínculo ApplicationMaster o History.
  4. En un trabajo de transmisión de larga duración, haz clic en el vínculo ApplicationManager para abrir el panel de Flink; para un trabajo completado, haz clic en el vínculo Historial para ver los detalles del trabajo.