Componente opcional de Flink de Dataproc

Puedes activar componentes adicionales como Flink cuando crees un Dataproc clúster mediante Componentes opcionales . En esta página, se muestra cómo crear un clúster de Dataproc con el componente opcional Apache Flink activado (un clúster de Flink) y, luego, ejecutar trabajos de Flink en el clúster.

Puedes usar tu clúster de Flink para hacer lo siguiente:

  1. Ejecuta trabajos de Flink con el recurso Jobs de Dataproc desde la consola de Google Cloud, Google Cloud CLI o la API de Dataproc.

  2. Ejecuta trabajos de Flink con la CLI de flink que se ejecuta en el nodo principal del clúster de Flink.

  3. Ejecuta trabajos de Apache Beam en Flink

  4. Ejecuta Flink en un clúster kerberizado

Puedes usar la consola de Google Cloud, Google Cloud CLI o la API de Dataproc para crear un clúster de Dataproc que tenga el componente Flink activado en el clúster.

Recomendación: Usa un clúster estándar de 1 VM maestra con el componente de Flink. Clústeres en modo de alta disponibilidad de Dataproc (con 3 VMs principales) no son compatibles Modo de alta disponibilidad de Flip.

Puedes ejecutar trabajos de Flink con el recurso Jobs de Dataproc desde el La consola de Google Cloud, Google Cloud CLI o la API de Dataproc.

Console

Para enviar un trabajo de muestra de conteo de palabras de Flink desde la consola, sigue estos pasos:

  1. Abre el panel de Dataproc. En la página Enviar un trabajo, la consola de Google Cloud en tu navegador.

  2. Completa los campos de la página Enviar un trabajo:

    1. En Clúster (Cluster), selecciona el nombre del clúster que quieres elegir de la lista.
    2. Establece Tipo de trabajo (Job type) en Flink.
    3. Establece Clase principal o jar (Main class or jar) en org.apache.flink.examples.java.wordcount.WordCount.
    4. Configura Archivos JAR como file:///usr/lib/flink/examples/batch/WordCount.jar.
      • file:/// denota un archivo ubicado en el clúster. Dataproc instaló WordCount.jar cuando creó el clúster de Flink.
      • Este campo también acepta una ruta de Cloud Storage (gs://BUCKET/JARFILE) o una ruta del sistema de archivos distribuido de Hadoop (HDFS) (hdfs://PATH_TO_JAR).
  3. Haz clic en Enviar.

    • El resultado del controlador del trabajo se muestra en la página Detalles del trabajo.
    • Los trabajos de Flink se enumeran en la Página Trabajos de Dataproc en la consola de Google Cloud.
    • Haz clic en Detener o Borrar en la página Trabajos o Detalles del trabajo para detener o borrar un trabajo.

gcloud

Para enviar un trabajo de Flink a un clúster de Flink de Dataproc, ejecuta el comando gcloud dataproc jobs submit de la CLI de gcloud de forma local en una ventana de terminal o en Cloud Shell.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

Notas:

  • CLUSTER_NAME: Especifica el nombre del clúster de Flink de Dataproc al que se enviará el trabajo.
  • REGION: Especifica una región de Compute Engine dónde se encuentra el clúster.
  • MAIN_CLASS: Especifica la clase main de tu Flink, por ejemplo:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: Especifica el archivo JAR de la aplicación de Flink. Puedes especificar lo siguiente:
    • Un archivo JAR instalado en el clúster con file:/// prefijo:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Un archivo JAR en Cloud Storage tiene las siguientes características: gs://BUCKET/JARFILE
    • Un archivo jar en HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: De manera opcional, agrega argumentos de trabajo después del guion doble (--).

  • Después de enviar el trabajo, el resultado del controlador del trabajo se muestra en el local o en la terminal de Cloud Shell.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

En esta sección, se muestra cómo enviar un trabajo de Flink a Dataproc Flink clúster con la API de Dataproc jobs.submit.

Antes de usar cualquiera de los datos de solicitud a continuación, realiza los siguientes reemplazos:

  • PROJECT_ID: ID del proyecto de Google Cloud
  • REGION: región del clúster
  • CLUSTER_NAME: Especifica el nombre del clúster de Flink de Dataproc al que se enviará el trabajo.

Método HTTP y URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

Cuerpo JSON de la solicitud:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

Para enviar tu solicitud, expande una de estas opciones:

Deberías recibir una respuesta JSON similar a la que se muestra a continuación:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Los trabajos de Flink aparecen en la página Trabajos de Dataproc en la consola de Google Cloud.
  • Puedes hacer clic en Detener o Borrar en la página Trabajos o Detalles del trabajo en la consola de Google Cloud para detener o borrar un trabajo.

En lugar de ejecutar trabajos de Flink con el recurso Jobs de Dataproc Puedes ejecutar trabajos de Flink en el nodo principal de tu clúster de Flink mediante la CLI de flink.

En las siguientes secciones, se describen diferentes formas en las que puedes ejecutar un trabajo de CLI flink en tu clúster de Dataproc Flink.

  1. Establece una conexión SSH al nodo principal: usa SSH. para abrir una ventana de la terminal en la VM de la instancia principal del clúster.

  2. Establece la ruta de acceso a clases: Inicializa la ruta de acceso a clases de Hadoop desde la ventana de la terminal de SSH en la VM principal del clúster de Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Ejecuta trabajos de Flink: Puedes ejecutar trabajos de Flink en diferentes modos de implementación en YARN: aplicación, por trabajo y modo de sesión.

    1. Modo de aplicación: El modo de aplicación de Flink es compatible con la versión 2.0 o posterior de la imagen de Dataproc. Este modo ejecuta el método main() del trabajo en el administrador de trabajos de YARN. El clúster se cierra una vez finalizado el trabajo.

      Ejemplo de envío de trabajos:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      Enumera los trabajos en ejecución:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      Cancela un trabajo en ejecución:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. Modo por trabajo: Este modo de Flink ejecuta el método main() del trabajo en el lado del cliente.

      Ejemplo de envío de trabajos:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. Modo de sesión: Inicia una sesión de Flink YARN de larga duración y, luego, envíala. uno o más trabajos a la sesión.

      1. Inicia una sesión: Puedes iniciar una sesión de Flink de una de las siguientes maneras:

        1. Crea un clúster de Flink y agrega la marca --metadata flink-start-yarn-session=true al comando gcloud dataproc clusters create (consulta Cómo crear un clúster de Flink de Dataproc). Con esta bandera después de que se crea el clúster, Dataproc ejecuta /usr/bin/flink-yarn-daemon para iniciar una sesión de Flink en el clúster.

          El ID de aplicación de YARN de la sesión se guarda en /tmp/.yarn-properties-${USER}. Puedes solicitar el ID con el comando yarn application -list.

        2. Ejecuta la secuencia de comandos yarn-session.sh de Flink, que está preinstalada en la VM principal del clúster, con la configuración personalizada:

          Ejemplo con parámetros de configuración personalizados:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Ejecuta la secuencia de comandos del wrapper /usr/bin/flink-yarn-daemon de Flink con configuración predeterminada:

          . /usr/bin/flink-yarn-daemon
          
      2. Envía un trabajo a una sesión: Ejecuta el siguiente comando para enviar un Vincular el trabajo a la sesión

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: Es la URL, incluido el host y el puerto, de la VM principal de Flink en la que se ejecutan las tareas. Quita http:// prefix de la URL. Esta URL se muestra en el resultado del comando cuando inicias una sesión de Flink. Puedes ejecutar el siguiente comando para mostrar esta URL En el campo Tracking-URL, haz lo siguiente:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. Lista de trabajos en una sesión: Para enumerar los trabajos de Flink en una sesión, haz una de las siguientes acciones:

        • Ejecuta flink list sin argumentos. El comando busca el ID de aplicación de YARN de la sesión en /tmp/.yarn-properties-${USER}

        • Obtén el ID de aplicación de YARN de la sesión desde /tmp/.yarn-properties-${USER} o el resultado de yarn application -list, y, luego, ejecuta <code>flink list -yid YARN_APPLICATION_ID.

        • Ejecuta flink list -m FLINK_MASTER_URL.

      4. Detener una sesión: Para detener la sesión, obtén el ID de aplicación de YARN de la sesión desde /tmp/.yarn-properties-${USER} o el resultado de yarn application -list y, luego, ejecuta cualquiera de los siguientes comandos:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

Puedes ejecutar trabajos de Apache Beam en Dataproc mediante FlinkRunner.

Puedes ejecutar trabajos de Beam en Flink de las siguientes maneras:

  1. Trabajos de Java Beam
  2. Trabajos de Portable Beam

Trabajos de Java Beam

Empaqueta tus trabajos de Beam en un archivo JAR. Proporciona el archivo JAR empaquetado con las dependencias necesarias para ejecutar el trabajo.

En el siguiente ejemplo, se ejecuta un trabajo de Java Beam desde el nodo principal del clúster de Dataproc.

  1. Crea un clúster de Dataproc con el componente Flink habilitado.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: Es la versión de la imagen del clúster, que determina la versión de Flink instalada en el clúster (por ejemplo, consulta las versiones de componente de Apache Flink enumeradas para las cuatro versiones 2.0.x más recientes y la cuatro anteriores).
    • --region: Es una región de Dataproc compatible.
    • --enable-component-gateway: Habilita el acceso a la IU del administrador de trabajo de Flink.
    • --scopes: Habilita el acceso a las APIs de Google Cloud para tu clúster. (consulta las Prácticas recomendadas sobre permisos). El permiso cloud-platform está habilitado de forma predeterminada (no es necesario que incluyas este parámetro de configuración de marcas) cuando creas un clúster que use la versión 2.1 o una posterior de la imagen de Dataproc.
  2. Usa la utilidad SSH para abrir una ventana de la terminal en el nodo instancia principal del clúster de Flink.

  3. Inicia una sesión de Flink YARN en la instancia principal del clúster de Dataproc .

    . /usr/bin/flink-yarn-daemon
    

    Toma nota de la versión de Flink en tu clúster de Dataproc.

    flink --version
    
  4. En tu máquina local, genera el ejemplo de recuento de palabras canónicos de Beam en Java.

    Elige una versión de Beam compatible con la versión de Flink en tu clúster de Dataproc. Consulta la Compatibilidad con la versión de Flink que enumera la compatibilidad de las versiones de Beam-Flink.

    Abre el archivo POM generado. Verifica la versión del ejecutor de Flink de Beam que especifica la etiqueta <flink.artifact.name>. Si la versión del ejecutor de Flink de Beam en el nombre del artefacto de Flink no coincide con la versión de Flink en tu clúster, actualiza el número de versión para que coincida.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Empaqueta el ejemplo de conteo de palabras.

    mvn package -Pflink-runner
    
  6. Sube el archivo uber JAR empaquetado, word-count-beam-bundled-0.1.jar (~135 MB) al nodo principal del clúster de Dataproc. Puedes usar gcloud storage cp para transferencias de archivos más rápidas a tu clúster de Dataproc desde Cloud Storage.

    1. En tu terminal local, crea un bucket de Cloud Storage y sube el archivo uber JAR.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. En el nodo principal de Dataproc, descarga el archivo uber JAR.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Ejecutar el trabajo de Java Beam en el nodo principal del clúster de Dataproc

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Comprueba que los resultados se hayan escrito en tu bucket de Cloud Storage.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Detén la sesión de Flink en YARN.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Trabajos de Portable Beam

Para ejecutar trabajos de Beam escritos en Python, Go y otros lenguajes compatibles, puedes usar FlinkRunner y PortableRunner como se describe en la página Ejecutor de Flink de Beam (también consulta la Hoja de ruta del marco de trabajo de portabilidad).

En el siguiente ejemplo, se ejecuta un trabajo portátil de Beam en Python desde el nodo principal del clúster de Dataproc.

  1. Crea un clúster de Dataproc con los componentes Flink y Docker habilitados.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Notas:

    • --optional-components: Flink y Docker.
    • --image-version: Es la versión de la imagen del clúster, que determina la versión de Flink instalada en el clúster (por ejemplo, consulta las versiones de componente de Apache Flink enumeradas para las cuatro versiones 2.0.x más recientes y la cuatro anteriores).
    • --region: Es una región de Dataproc disponible.
    • --enable-component-gateway: Habilita el acceso a la IU de Flink Job Manager.
    • --scopes: Habilita el acceso a las APIs de Google Cloud para tu clúster. (consulta las Prácticas recomendadas sobre permisos). El permiso cloud-platform está habilitado de forma predeterminada (no es necesario que incluyas este parámetro de configuración de marcas) cuando creas un clúster que use la versión 2.1 o una posterior de la imagen de Dataproc.
  2. Usa la CLI de gcloud de forma local o en Cloud Shell para crear un bucket de Cloud Storage. Especificarás el BUCKET_NAME cuando ejecutes un programa de muestra de conteo de palabras.

    gcloud storage buckets create BUCKET_NAME
    
  3. En una ventana de la terminal de la VM del clúster, inicia una sesión de Flink YARN. Anota la URL principal de Flink, la dirección de la instancia principal de Flink donde se ejecutan los trabajos. Especificarás el FLINK_MASTER_URL cuando ejecutar un programa de muestra de conteo de palabras.

    . /usr/bin/flink-yarn-daemon
    

    Muestra y observa la versión de Flink que ejecuta Dataproc. clúster. Especificarás el FLINK_VERSION cuando ejecutar un programa de muestra de conteo de palabras.

    flink --version
    
  4. Instala las bibliotecas de Python necesarias para el trabajo en el nodo principal del clúster.

  5. Instalar un Versión de Beam que sea compatible con la versión de Flink en el clúster.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Ejecuta el ejemplo de recuento de palabras en la instancia principal del clúster .

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Notas:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, que se mencionó antes.
    • --flink_master: FLINK_MASTER_URL, como se indicó antes.
    • --flink_submit_uber_jar: Usa el archivo uber JAR para ejecutar el trabajo de Beam.
    • --output: BUCKET_NAME, creado antes.
  7. Verifica que los resultados se hayan escrito en tu bucket.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Detén la sesión de Flink en YARN.

    1. Obtén el ID de la aplicación.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

El componente Flink de Dataproc admite clústeres con Kerberos. Se necesita un ticket Kerberos válido para enviar y continuar un trabajo de Flink o iniciar un clúster de Flink. De forma predeterminada, los tickets de Kerberos son válidos durante siete días.

La interfaz web del administrador de trabajo de Flink está disponible mientras se ejecuta un clúster de Flink o un clúster de sesión de Flink. Para usar la interfaz web, haz lo siguiente:

  1. Crea un clúster de Dataproc Flink.
  2. Después de crear el clúster, haz clic en la Puerta de enlace del componente. Vínculo de ResourceManager de YARN en la pestaña Interfaz web de la página Detalles del clúster en la consola de Google Cloud.
  3. En la IU de Resource Manager de YARN, identifica la entrada de la aplicación del clúster de Flink. Según el estado de finalización de un trabajo, un ApplicationMaster o el vínculo Historial.
  4. En un trabajo de transmisión de larga duración, haz clic en el vínculo ApplicationManager para abrir el panel de Flink; para un trabajo completado, haz clic en el vínculo Historial para ver los detalles del trabajo.