Puedes instalar componentes adicionales cuando creas un clúster de Dataproc con la función de componentes opcionales. En esta página, se describe el componente Flink.
El componente Flink de Dataproc instala Apache Flink en un clúster de Dataproc.
Instala el componente
Instala el componente cuando crees un clúster de Dataproc. El componente Flink de Dataproc se puede instalar en clústeres creados con la imagen versión 1.5 de Dataproc o una versión posterior.
La versión de la imagen de clúster determina la versión del componente Flink instalado en el clúster (por ejemplo, consulta las versiones del componente Apache Flink enumeradas para los cuatro últimas versiones de actualización de imágenes 2.0.x y las cuatro anteriores). Consulta las versiones compatibles de Dataproc para la versión del componente incluida en cada versión de la imagen de Dataproc.
Comando de gcloud
Para crear un clúster de Dataproc que incluya el componente Flink, usa el comando gcloud dataproc clusters create cluster-name con la marca --optional-components
.
gcloud dataproc clusters create cluster-name \ --optional-components=FLINK \ --region=region \ --enable-component-gateway \ --image-version=DATAPROC_IMAGE_VERSION \ ... other flags
Nota: Dado que una sesión de Flink en YARN consume recursos YARN importantes, de forma predeterminada, Dataproc no inicia una sesión de Flink cuando se inicia el clúster de Dataproc.
Puedes iniciar una sesión cuando inicies tu clúster de Flink si agregas la marca --metadata flink-start-yarn-session=true
al comando gcloud dataproc clusters create
.
API de REST
El componente Flink se puede especificar a través de la API de Dataproc mediante SoftwareConfig.Component como parte de una solicitud clusters.create. El campo SoftwareConfig.imageVersion se usa para configurar la versión con imágenes del clúster.
Console
- Habilita el componente y la puerta de enlace de componentes.
- En Cloud Console, abre la página Crear un clúster de Dataproc. Se selecciona el panel Configurar clúster.
- En la sección Control de versiones, confirma o cambia el tipo y la versión de la imagen.
- En la sección Componentes, haz lo siguiente:
- En Puerta de enlace de componentes, selecciona Habilitar puerta de enlace de componentes (consulta Visualiza y accede a las URL de la puerta de enlace de componentes).
- En Componentes opcionales, selecciona Flink y otros componentes opcionales para instalar en tu clúster.
Ejecuta trabajos de Flink
Después de que se inicie un clúster de Dataproc con Flink, SSH en el nodo principal del clúster de Dataproc y, luego, ejecuta trabajos de Flink.
Ejemplo:
Ejecuta un solo trabajo Flink. Después de aceptar el trabajo, Flink inicia un administrador de trabajos y ranuras para el trabajo en YARN. El trabajo de Flink se ejecutará en el clúster de YARN hasta que finalice. El administrador de trabajos se cierra después de que se completa el trabajo. Los registros de trabajos están disponibles en los registros YARN.
flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar
Ejemplo:
Inicia una sesión de Flink en YARN de larga duración y, luego, ejecuta un trabajo.
Inicia la sesión en el nodo principal del clúster de Dataproc.
Como alternativa, puedes iniciar una sesión de Flink en YARN cuando crees el clúster de Flink mediante la marca gcloud dataproc clusters create --metadata flink-start-yarn-session=true
.
. /usr/bin/flink-yarn-daemon
Toma nota del host y del puerto en FLINK_MASTER_URL
después de que la sesión se inicie con éxito. Reemplaza JOB_MANAGER_HOSTNAME
y REST_API_PORT
en el siguiente comando por esos elementos. Ejecute el trabajo:
HADOOP_CLASSPATH=`hadoop classpath`
flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar
Para detener la sesión, reemplaza APPLICATION_ID
por el ID de la aplicación asociada con la sesión de Flink en YARN que se encuentra en la IU del administrador de trabajo de Flink o desde el resultado de yarn application -list
. Luego, ejecuta lo siguiente:
yarn application -kill APPLICATION_ID
Ejecuta trabajos de Apache Beam
Puedes ejecutar trabajos de Apache Beam en Dataproc mediante FlinkRunner
.
Puedes ejecutar trabajos de Beam en Flink de las siguientes maneras:
- Trabajos de Java Beam
- Trabajos de Portable Beam
Trabajos de Java Beam
Empaqueta tus trabajos de Beam en un archivo JAR. Proporciona el archivo JAR empaquetado con las dependencias necesarias para ejecutar el trabajo.
En el siguiente ejemplo, se ejecuta un trabajo de Java Beam desde el nodo principal del clúster de Dataproc.
Crea un clúster de Dataproc con el componente Flink habilitado.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: Es la versión de la imagen del clúster, que determina la versión de Flink instalada en el clúster (por ejemplo, consulta las versiones de componente de Apache Flink enumeradas para las cuatro versiones 2.0.x más recientes y la cuatro anteriores).--region
: Es una región de Dataproc compatible.--enable-component-gateway
: Habilita el acceso a la IU del administrador de trabajo de Flink.--scopes
: Habilita el acceso a la API a los servicios de GCP en el mismo proyecto.
Establece una conexión SSH al nodo principal del clúster de Dataproc de la siguiente manera:
Inicia una sesión de Flink en YARN en el nodo principal del clúster de Dataproc.
. /usr/bin/flink-yarn-daemon
Toma nota de la versión de Flink en tu clúster de Dataproc.
flink --version
En tu máquina local, genera el ejemplo de recuento de palabras canónicos de Beam en Java.
Elige una versión de Beam compatible con la versión de Flink en tu clúster de Dataproc. Consulta la tabla Compatibilidad de versiones de Flink que enumera la compatibilidad de versiones de Beam-Flink.
Abre el archivo POM generado. Verifica la versión del ejecutor de Flink de Beam que especifica la etiqueta
<flink.artifact.name>
. Si la versión del ejecutor de Flink de Beam en el nombre del artefacto de Flink no coincide con la versión de Flink en tu clúster, actualiza el número de versión para que coincida.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Empaqueta el ejemplo de conteo de palabras.
mvn package -Pflink-runner
Sube el archivo uber JAR empaquetado,
word-count-beam-bundled-0.1.jar
(~135 MB) al nodo principal del clúster de Dataproc. Puedes usargsutil cp
para transferencias de archivos más rápidas a tu clúster de Dataproc desde Cloud Storage.En tu terminal local, crea un bucket de Cloud Storage y sube el archivo uber JAR.
gsutil mb BUCKET_NAME
gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
En el nodo principal de Dataproc, descarga el archivo uber JAR.
gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Ejecutar el trabajo de Java Beam en el nodo principal del clúster de Dataproc
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Comprueba que los resultados se hayan escrito en tu bucket de Cloud Storage.
gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Detén la sesión de Flink en YARN.
yarn application -list
yarn application -kill APPLICATION_ID
Trabajos de Portable Beam
Para ejecutar trabajos de Beam escritos en Python, Go y otros lenguajes compatibles, puedes usar FlinkRunner
y PortableRunner
como se describe en la documentación del Ejecutor de Flink página (también consulta Hoja de ruta del marco de trabajo de portabilidad).
En el siguiente ejemplo, se ejecuta un trabajo portátil de Beam en Python desde el nodo principal del clúster de Dataproc.
Crea un clúster de Dataproc con los componentes Flink y Docker habilitados.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink y Docker.--image-version
: Es la versión de la imagen del clúster, que determina la versión de Flink instalada en el clúster (por ejemplo, consulta las versiones de componente de Apache Flink enumeradas para las cuatro versiones 2.0.x más recientes y la cuatro anteriores).--region
: Es una región de Dataproc compatible.--enable-component-gateway
: Habilita el acceso a la IU del administrador de trabajo de Flink.--scopes
: Habilita el acceso a la API a los servicios de GCP en el mismo proyecto.
Establece una conexión SSH al nodo principal del clúster de Dataproc de la siguiente manera:
Crea un bucket de Cloud Storage
gsutil mb BUCKET_NAME
Inicia una sesión de Flink en YARN en el nodo principal del clúster de Dataproc y guarda la URL de la instancia principal de Flink una vez que comience la sesión.
. /usr/bin/flink-yarn-daemon
Toma nota de la versión de Flink en tu clúster de Dataproc.
flink --version
Instala las bibliotecas de Python necesarias para el trabajo en el nodo principal del clúster de Dataproc.
Elige una versión de Beam compatible con la versión de Flink en tu clúster de Dataproc. Consulta la tabla Compatibilidad de versiones de Flink que enumera la compatibilidad de versiones de Beam-Flink.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Ejecuta el ejemplo de recuento de palabras en el nodo principal del clúster de Dataproc.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
--runner
(obligatorio):FlinkRunner
.--flink_version
(obligatorio): Versión de Flink.--flink_master
(obligatorio): Dirección de la instancia principal de Flink donde se ejecutará el trabajo.--flink_submit_uber_jar
(obligatorio): Usa el uber JAR para ejecutar el trabajo de Beam.--output
(obligatorio): En el que se debe escribir el resultado.
Comprueba que los resultados se hayan escrito en tu bucket.
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Detén la sesión de Flink en YARN.
yarn application -list
yarn application -kill APPLICATION_ID
Ejecuta Flink en un clúster de Kerberized
El componente Flink de Dataproc admite clústeres con Kerberos. Se necesita un ticket Kerberos válido para enviar y continuar un trabajo de Flink o iniciar un clúster de Flink. De forma predeterminada, un ticket es válido por 7 días.
Accede a la IU del administrador de trabajo de Flink
La interfaz web del administrador de trabajo de Flink está disponible mientras se ejecuta un clúster de Flink o un clúster de sesión de Flink. Puedes abrir la IU del administrador de trabajos de Flink desde la aplicación de la instancia principal de la aplicación de Flink en YARN.
Para habilitar y usar el acceso a la IU, haz lo siguiente:
- Crea el clúster de Dataproc con la puerta de enlace de componentes habilitada.
- Después de la creación del clúster, haz clic en el vínculo de componentes de Gateway YARN ResourceManager en la pestaña Interfaz web en la página Detalles del clúster en Google Cloud Console.
- En la IU de Resource Manager de YARN, identifica la entrada de la aplicación del clúster de Flink. Según el estado de finalización de tu trabajo, se mostrará el vínculo ApplicationMaster o History.
- En un trabajo de transmisión de larga duración, haz clic en el vínculo ApplicationManager para abrir el panel de Flink; para un trabajo completado, haz clic en el vínculo Historial para ver los detalles del trabajo.