Componente Flink de Dataproc

Puedes instalar componentes adicionales cuando creas un clúster de Dataproc con la función de componentes opcionales. En esta página, se describe el componente Flink.

El componente Flink de Dataproc instala Apache Flink en un clúster de Dataproc.

Instala el componente

Instala el componente cuando crees un clúster de Dataproc. El componente Flink de Dataproc se puede instalar en clústeres creados con la imagen versión 1.5 de Dataproc o una versión posterior.

La versión de la imagen de clúster determina la versión del componente Flink instalado en el clúster (por ejemplo, consulta las versiones del componente Apache Flink enumeradas para los cuatro últimas versiones de actualización de imágenes 2.0.x y las cuatro anteriores). Consulta las versiones compatibles de Dataproc para la versión del componente incluida en cada versión de la imagen de Dataproc.

Comando de gcloud

Para crear un clúster de Dataproc que incluya el componente Flink, usa el comando gcloud dataproc clusters create cluster-name con la marca --optional-components.

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    ... other flags

Nota: Dado que una sesión de Flink en YARN consume recursos YARN importantes, de forma predeterminada, Dataproc no inicia una sesión de Flink cuando se inicia el clúster de Dataproc. Puedes iniciar una sesión cuando inicies tu clúster de Flink si agregas la marca --metadata flink-start-yarn-session=true al comando gcloud dataproc clusters create.

API de REST

El componente Flink se puede especificar a través de la API de Dataproc mediante SoftwareConfig.Component como parte de una solicitud clusters.create. El campo SoftwareConfig.imageVersion se usa para configurar la versión con imágenes del clúster.

Console

  1. Habilita la puerta de enlace del componente y el componente.
    • En Cloud Console, abre la página Create a cluster (Crear un clúster) de Dataproc. Se selecciona el panel Configurar clúster.
    • En la sección Control de versiones, confirma o cambia el tipo y la versión de la imagen.
    • En la sección Componentes, sigue estos pasos:

Después de que se inicie un clúster de Dataproc con Flink, SSH en el nodo principal del clúster de Dataproc y, luego, ejecuta trabajos de Flink.

Ejemplo:

Ejecuta un solo trabajo Flink. Después de aceptar el trabajo, Flink inicia un administrador de trabajos y ranuras para el trabajo en YARN. El trabajo de Flink se ejecutará en el clúster de YARN hasta que finalice. El administrador de trabajos se cierra después de que se completa el trabajo. Los registros de trabajos están disponibles en los registros YARN.

flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar

Ejemplo:

Inicia una sesión de Flink en YARN de larga duración y, luego, ejecuta un trabajo.

Inicia la sesión en el nodo principal del clúster de Dataproc. Como alternativa, puedes iniciar una sesión de Flink en YARN cuando crees el clúster de Flink mediante la marca gcloud dataproc clusters create --metadata flink-start-yarn-session=true.

. /usr/bin/flink-yarn-daemon

Toma nota del host y del puerto en FLINK_MASTER_URL después de que la sesión se inicie con éxito. Reemplaza JOB_MANAGER_HOSTNAME y REST_API_PORT en el siguiente comando por esos elementos. Ejecute el trabajo:

HADOOP_CLASSPATH=`hadoop classpath`

flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar

Para detener la sesión, reemplaza APPLICATION_ID por el ID de la aplicación asociada con la sesión de Flink en YARN que se encuentra en la IU del administrador de trabajo de Flink o desde el resultado de yarn application -list. Luego, ejecuta lo siguiente:

yarn application -kill APPLICATION_ID

Ejecuta trabajos de Apache Beam

Puedes ejecutar trabajos de Apache Beam en Dataproc mediante FlinkRunner.

Puedes ejecutar trabajos de Beam en Flink de las siguientes maneras:

  1. Trabajos de Java Beam
  2. Trabajos de Portable Beam

Trabajos de Java Beam

Empaqueta tus trabajos de Beam en un archivo JAR. Proporciona el archivo JAR empaquetado con las dependencias necesarias para ejecutar el trabajo.

En el siguiente ejemplo, se ejecuta un trabajo de Java Beam desde el nodo principal del clúster de Dataproc.

  1. Crea un clúster de Dataproc con el componente Flink habilitado.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: Es la versión de la imagen del clúster, que determina la versión de Flink instalada en el clúster (por ejemplo, consulta las versiones de componente de Apache Flink enumeradas para las cuatro versiones 2.0.x más recientes y la cuatro anteriores).
    • --region: Es una región de Dataproc compatible.
    • --enable-component-gateway: Habilita el acceso a la IU del administrador de trabajo de Flink.
    • --scopes: Habilita el acceso a la API a los servicios de GCP en el mismo proyecto.
  2. Establece una conexión SSH al nodo principal del clúster de Dataproc de la siguiente manera:

  3. Inicia una sesión de Flink en YARN en el nodo principal del clúster de Dataproc.

    . /usr/bin/flink-yarn-daemon
    

    Toma nota de la versión de Flink en tu clúster de Dataproc.

    flink --version
    
  4. En tu máquina local, genera el ejemplo de recuento de palabras canónicos de Beam en Java.

    Elige una versión de Beam compatible con la versión de Flink en tu clúster de Dataproc. Consulta la tabla Compatibilidad de versiones de Flink que enumera la compatibilidad de versiones de Beam-Flink.

    Abre el archivo POM generado. Verifica la versión del ejecutor de Flink de Beam que especifica la etiqueta <flink.artifact.name>. Si la versión del ejecutor de Flink de Beam en el nombre del artefacto de Flink no coincide con la versión de Flink en tu clúster, actualiza el número de versión para que coincida.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Empaqueta el ejemplo de conteo de palabras.

    mvn package -Pflink-runner
    
  6. Sube el archivo uber JAR empaquetado, word-count-beam-bundled-0.1.jar (~135 MB) al nodo principal del clúster de Dataproc. Puedes usar gsutil cp para transferencias de archivos más rápidas a tu clúster de Dataproc desde Cloud Storage.

    1. En tu terminal local, crea un bucket de Cloud Storage y sube el archivo uber JAR.

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. En el nodo principal de Dataproc, descarga el archivo uber JAR.

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Ejecutar el trabajo de Java Beam en el nodo principal del clúster de Dataproc

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Comprueba que los resultados se hayan escrito en tu bucket de Cloud Storage.

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Detén la sesión de Flink en YARN.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Trabajos de Portable Beam

Para ejecutar trabajos de Beam escritos en Python, Go y otros lenguajes compatibles, puedes usar FlinkRunner y PortableRunner como se describe en la documentación del Ejecutor de Flink página (también consulta Hoja de ruta del marco de trabajo de portabilidad).

En el siguiente ejemplo, se ejecuta un trabajo portátil de Beam en Python desde el nodo principal del clúster de Dataproc.

  1. Crea un clúster de Dataproc con los componentes Flink y Docker habilitados.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink y Docker.
    • --image-version: Es la versión de la imagen del clúster, que determina la versión de Flink instalada en el clúster (por ejemplo, consulta las versiones de componente de Apache Flink enumeradas para las cuatro versiones 2.0.x más recientes y la cuatro anteriores).
    • --region: Es una región de Dataproc compatible.
    • --enable-component-gateway: Habilita el acceso a la IU del administrador de trabajo de Flink.
    • --scopes: Habilita el acceso a la API a los servicios de GCP en el mismo proyecto.
  2. Establece una conexión SSH al nodo principal del clúster de Dataproc de la siguiente manera:

  3. Crea un bucket de Cloud Storage

    gsutil mb BUCKET_NAME
    
  4. Inicia una sesión de Flink en YARN en el nodo principal del clúster de Dataproc y guarda la URL de la instancia principal de Flink una vez que comience la sesión.

    . /usr/bin/flink-yarn-daemon
    

    Toma nota de la versión de Flink en tu clúster de Dataproc.

    flink --version
    
  5. Instala las bibliotecas de Python necesarias para el trabajo en el nodo principal del clúster de Dataproc.

    Elige una versión de Beam compatible con la versión de Flink en tu clúster de Dataproc. Consulta la tabla Compatibilidad de versiones de Flink que enumera la compatibilidad de versiones de Beam-Flink.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Ejecuta el ejemplo de recuento de palabras en el nodo principal del clúster de Dataproc.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    
    • --runner (obligatorio): FlinkRunner.
    • --flink_version (obligatorio): Versión de Flink.
    • --flink_master (obligatorio): Dirección de la instancia principal de Flink donde se ejecutará el trabajo.
    • --flink_submit_uber_jar (obligatorio): Usa el uber JAR para ejecutar el trabajo de Beam.
    • --output (obligatorio): En el que se debe escribir el resultado.
  7. Comprueba que los resultados se hayan escrito en tu bucket.

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Detén la sesión de Flink en YARN.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

El componente Flink de Dataproc admite clústeres con Kerberos. Se necesita un ticket Kerberos válido para enviar y continuar un trabajo de Flink o iniciar un clúster de Flink. De forma predeterminada, un ticket es válido por 7 días.

La interfaz web del administrador de trabajo de Flink está disponible mientras se ejecuta un clúster de Flink o un clúster de sesión de Flink. Puedes abrir la IU del administrador de trabajos de Flink desde la aplicación de la instancia principal de la aplicación de Flink en YARN.

Para habilitar y usar el acceso a la IU, haz lo siguiente:

  1. Crea el clúster de Dataproc con la puerta de enlace de componentes habilitada.
  2. Después de la creación del clúster, haz clic en el vínculo ResourceManager de YARN de la puerta de enlace de componentes en la pestaña de interfaz web en la página Detalles del clúster en Google Cloud Console.
  3. En la IU de Resource Manager de YARN, identifica la entrada de la aplicación del clúster de Flink. Según el estado de finalización de tu trabajo, se mostrará el vínculo ApplicationMaster o History.
  4. En un trabajo de transmisión de larga duración, haz clic en el vínculo ApplicationManager para abrir el panel de Flink; para un trabajo completado, haz clic en el vínculo Historial para ver los detalles del trabajo.