Puedes activar componentes adicionales, como Flink, al crear un clúster de Dataproc mediante la función Componentes opcionales. En esta página se explica cómo crear un clúster de Dataproc con el componente opcional Apache Flink activado (un clúster de Flink) y, a continuación, ejecutar tareas de Flink en el clúster.
Puedes usar tu clúster de Flink para lo siguiente:
Ejecuta trabajos de Flink con el recurso
Jobs
de Dataproc desde la consola Google Cloud , la CLI de Google Cloud o la API de Dataproc.Ejecuta trabajos de Flink con la CLI
flink
en el nodo maestro del clúster de Flink.Ejecutar Flink en un clúster con Kerberos
Crear un clúster de Dataproc Flink
Puedes usar la Google Cloud consola, la CLI de Google Cloud o la API de Dataproc para crear un clúster de Dataproc que tenga el componente Flink activado.
Recomendación: Usa un clúster estándar de una VM principal con el componente Flink. Los clústeres en modo de alta disponibilidad de Dataproc (con 3 VMs maestras) no admiten el modo de alta disponibilidad de Flink.
Consola
Para crear un clúster de Dataproc Flink con la Google Cloud consola, sigue estos pasos:
Abre la página Crear un clúster de Dataproc en Compute Engine de Dataproc.
- El panel Configurar clúster está seleccionado.
- En la sección Control de versiones, confirme o cambie el tipo y la versión de la imagen. La versión de la imagen del clúster determina la versión del componente de Flink instalado en el clúster.
- La versión de la imagen debe ser 1.5 o superior para activar el componente Flink en el clúster. (Consulta las versiones de Dataproc compatibles para ver las listas de las versiones de los componentes incluidos en cada lanzamiento de imagen de Dataproc).
- La versión de la imagen debe ser [TBD] o una posterior para ejecutar trabajos de Flink a través de la API Jobs de Dataproc (consulta Ejecutar trabajos de Flink de Dataproc).
- En la sección Componentes:
- En Pasarela de componentes, selecciona Habilitar pasarela de componentes. Debes habilitar la pasarela de componentes para activar el enlace a la interfaz de usuario del servidor de historial de Flink. Al habilitar la pasarela de componentes, también se habilita el acceso a la interfaz web de Job Manager de Flink que se ejecuta en el clúster de Flink.
- En Componentes opcionales, selecciona Flink y otros componentes opcionales que quieras activar en tu clúster.
- En la sección Control de versiones, confirme o cambie el tipo y la versión de la imagen. La versión de la imagen del clúster determina la versión del componente de Flink instalado en el clúster.
Haz clic en el panel Personalizar clúster (opcional).
En la sección Propiedades del clúster, haz clic en Añadir propiedades para cada propiedad del clúster opcional que quieras añadir al clúster. Puede añadir propiedades con el prefijo
flink
para configurar propiedades de Flink en/etc/flink/conf/flink-conf.yaml
que actúen como valores predeterminados para las aplicaciones de Flink que ejecute en el clúster.Ejemplos:
- Define
flink:historyserver.archive.fs.dir
para especificar la ubicación de Cloud Storage en la que se escribirán los archivos del historial de trabajos de Flink (el servidor del historial de Flink que se ejecuta en el clúster de Flink usará esta ubicación). - Define los slots de tareas de Flink con
flink:taskmanager.numberOfTaskSlots=n
.
- Define
En la sección Metadatos personalizados del clúster, haz clic en Añadir metadatos para añadir metadatos opcionales. Por ejemplo, añade
flink-start-yarn-session
true
para ejecutar el daemon de Flink YARN (/usr/bin/flink-yarn-daemon
) en segundo plano en el nodo maestro del clúster para iniciar una sesión de Flink YARN (consulta Modo de sesión de Flink).
Si usas la versión 2.0 o una anterior de la imagen de Dataproc, haz clic en el panel Gestionar seguridad (opcional) y, a continuación, en Acceso al proyecto, selecciona
Enables the cloud-platform scope for this cluster
. El ámbitocloud-platform
está habilitado de forma predeterminada cuando creas un clúster que usa la versión de imagen 2.1 de Dataproc o una posterior.
- El panel Configurar clúster está seleccionado.
Haz clic en Crear para que se genere el clúster.
gcloud
Para crear un clúster de Dataproc Flink con gcloud CLI, ejecuta el siguiente comando gcloud dataproc clusters create de forma local en una ventana de terminal o en Cloud Shell:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
Notas:
- CLUSTER_NAME: especifica el nombre del clúster.
- REGION: especifica una región de Compute Engine en la que se ubicará el clúster.
DATAPROC_IMAGE_VERSION: especifica de forma opcional la versión de la imagen que se va a usar en el clúster. La versión de la imagen del clúster determina la versión del componente de Flink instalado en el clúster.
La versión de la imagen debe ser 1.5 o superior para activar el componente Flink en el clúster. (Consulta las versiones de Dataproc compatibles para ver las listas de las versiones de los componentes incluidos en cada lanzamiento de imagen de Dataproc).
La versión de la imagen debe ser [TBD] o una posterior para ejecutar tareas de Flink a través de la API Jobs de Dataproc (consulta Ejecutar tareas de Flink de Dataproc).
--optional-components
: debes especificar el componenteFLINK
para ejecutar trabajos de Flink y el servicio web Flink HistoryServer en el clúster.--enable-component-gateway
: Debes habilitar Component Gateway para activar el enlace de Component Gateway a la interfaz de usuario de Flink History Server. Si habilitas la pasarela de componentes, también podrás acceder a la interfaz web de Job Manager de Flink que se ejecuta en el clúster de Flink.PROPERTIES. Opcionalmente, especifique una o varias propiedades del clúster.
Al crear clústeres de Dataproc con versiones de imagen
2.0.67
+ y2.1.15
+, puedes usar la marca--properties
para configurar propiedades de Flink en/etc/flink/conf/flink-conf.yaml
que actuarán como valores predeterminados para las aplicaciones de Flink que ejecutes en el clúster.Puedes definir
flink:historyserver.archive.fs.dir
para especificar la ubicación de Cloud Storage en la que se escribirán los archivos del historial de tareas de Flink (el servidor del historial de Flink que se ejecuta en el clúster de Flink usará esta ubicación).Ejemplo con varias propiedades:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
Otras marcas:
- Puedes añadir la marca opcional
--metadata flink-start-yarn-session=true
para ejecutar el daemon de Flink YARN (/usr/bin/flink-yarn-daemon
) en segundo plano en el nodo maestro del clúster para iniciar una sesión de Flink YARN (consulta Modo de sesión de Flink).
- Puedes añadir la marca opcional
Cuando se usan versiones de imagen 2.0 o anteriores, se puede añadir la marca
--scopes=https://www.googleapis.com/auth/cloud-platform
para habilitar el acceso a las APIs de Google Cloud por parte del clúster (consulta las prácticas recomendadas sobre los permisos). El ámbitocloud-platform
está habilitado de forma predeterminada cuando creas un clúster que usa la versión de imagen 2.1 de Dataproc o una posterior.
API
Para crear un clúster de Dataproc Flink con la API de Dataproc, envía una solicitud clusters.create de la siguiente manera:
Notas:
Define SoftwareConfig.Component en
FLINK
.También puedes definir
SoftwareConfig.imageVersion
para especificar la versión de la imagen que se va a usar en el clúster. La versión de la imagen del clúster determina la versión del componente de Flink instalado en el clúster.La versión de la imagen debe ser 1.5 o superior para activar el componente Flink en el clúster. (Consulta las versiones de Dataproc compatibles para ver las listas de las versiones de los componentes incluidos en cada lanzamiento de imagen de Dataproc).
La versión de la imagen debe ser [TBD] o una posterior para ejecutar tareas de Flink a través de la API Jobs de Dataproc (consulta Ejecutar tareas de Flink de Dataproc).
Define EndpointConfig.enableHttpPortAccess en
true
para habilitar el enlace Component Gateway a la interfaz de usuario de Flink History Server. Si habilitas la pasarela de componentes, también podrás acceder a la interfaz web de Job Manager de Flink que se ejecuta en el clúster de Flink.También puede definir
SoftwareConfig.properties
para especificar una o varias propiedades de clúster.- Puedes especificar propiedades de Flink que actúen como valores predeterminados para las aplicaciones de Flink que ejecutes en el clúster. Por ejemplo, puedes definir
flink:historyserver.archive.fs.dir
para especificar la ubicación de Cloud Storage en la que se escribirán los archivos del historial de tareas de Flink (el servidor del historial de Flink que se ejecuta en el clúster de Flink usará esta ubicación).
- Puedes especificar propiedades de Flink que actúen como valores predeterminados para las aplicaciones de Flink que ejecutes en el clúster. Por ejemplo, puedes definir
También puedes definir lo siguiente:
GceClusterConfig.metadata
. Por ejemplo, para especificarflink-start-yarn-session
true
para ejecutar el daemon de Flink YARN (/usr/bin/flink-yarn-daemon
) en segundo plano en el nodo maestro del clúster para iniciar una sesión de Flink YARN (consulta Modo de sesión de Flink).- GceClusterConfig.serviceAccountScopes
a
https://www.googleapis.com/auth/cloud-platform
(permisocloud-platform
) cuando se usan versiones de imagen 2.0 o anteriores para habilitar el acceso a las APIs de tu clúster (consulta las prácticas recomendadas sobre los permisos). Google Cloud El ámbitocloud-platform
está habilitado de forma predeterminada cuando creas un clúster que usa la versión de imagen 2.1 de Dataproc o una posterior.
Después de crear un clúster de Flink
- Usa el enlace
Flink History Server
de la sección Component Gateway (Pasarela de componentes) para ver el servidor del historial de Flink que se ejecuta en el clúster de Flink. - Usa
YARN ResourceManager link
en Component Gateway para ver la interfaz web de Flink Job Manager que se ejecuta en el clúster de Flink . - Crea un servidor de historial persistente de Dataproc para ver los archivos del historial de tareas de Flink escritos por clústeres de Flink que ya no existen.
Ejecutar trabajos de Flink con el recurso Jobs
de Dataproc
Puedes ejecutar tareas de Flink mediante el recurso Jobs
de Dataproc desde laGoogle Cloud consola, la CLI de Google Cloud o la API de Dataproc.
Consola
Para enviar un trabajo de recuento de palabras de Flink de ejemplo desde la consola, haz lo siguiente:
Abre la página Dataproc > Enviar un trabajo en la consola deGoogle Cloud en tu navegador.
Rellena los campos de la página Enviar un trabajo:
- Selecciona el nombre de tu clúster en la lista de clústeres.
- En Tipo de tarea, selecciona
Flink
. - Asigna el valor
org.apache.flink.examples.java.wordcount.WordCount
a Clase principal o .jar. - Define Archivos JAR en
file:///usr/lib/flink/examples/batch/WordCount.jar
.file:///
indica que el archivo se encuentra en el clúster. Dataproc ha instaladoWordCount.jar
al crear el clúster de Flink.- Este campo también acepta una ruta de Cloud Storage (
gs://BUCKET/JARFILE
) o una ruta de Hadoop Distributed File System (HDFS) (hdfs://PATH_TO_JAR
).
Haz clic en Enviar.
- La salida del controlador de tareas se muestra en la página Detalles de la tarea.
- Los trabajos de Flink se muestran en la página Trabajos de Dataproc de la Google Cloud consola.
- Haz clic en Detener o Eliminar en la página Tareas o Detalles de la tarea para detener o eliminar una tarea.
gcloud
Para enviar una tarea de Flink a un clúster de Flink de Dataproc, ejecuta el comando de la CLI de gcloud gcloud dataproc jobs submit de forma local en una ventana de terminal o en Cloud Shell.
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
Notas:
- CLUSTER_NAME: especifica el nombre del clúster de Dataproc Flink al que se enviará el trabajo.
- REGION: especifica una región de Compute Engine en la que se encuentra el clúster.
- MAIN_CLASS: especifica la clase
main
de tu aplicación Flink, como:org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: especifica el archivo JAR de la aplicación Flink. Puedes especificar lo siguiente:
- Un archivo JAR instalado en el clúster, con el prefijo
file:///` :
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Un archivo JAR en Cloud Storage:
gs://BUCKET/JARFILE
- Un archivo JAR en HDFS:
hdfs://PATH_TO_JAR
- Un archivo JAR instalado en el clúster, con el prefijo
JOB_ARGS: opcionalmente, añade argumentos de trabajo después de los dos guiones (
--
).Después de enviar el trabajo, la salida del controlador de trabajo se muestra en el terminal local o de Cloud Shell.
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
En esta sección se muestra cómo enviar un trabajo de Flink a un clúster de Flink de Dataproc mediante la API jobs.submit de Dataproc.
Antes de usar los datos de la solicitud, haz las siguientes sustituciones:
- PROJECT_ID: Google Cloud ID de proyecto
- REGION: región del clúster
- CLUSTER_NAME: especifica el nombre del clúster de Dataproc Flink al que se enviará el trabajo.
Método HTTP y URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
Cuerpo JSON de la solicitud:
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
Para enviar tu solicitud, despliega una de estas opciones:
Deberías recibir una respuesta JSON similar a la siguiente:
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- Los trabajos de Flink se muestran en la página Trabajos de Dataproc de la Google Cloud consola.
- Puedes hacer clic en Detener o Eliminar en la página Tareas o Detalles de la tarea de la consola de Google Cloud para detener o eliminar una tarea.
Ejecutar tareas de Flink con la CLI de flink
En lugar de ejecutar trabajos de Flink con el recurso Jobs
de Dataproc, puedes ejecutar trabajos de Flink en el nodo maestro de tu clúster de Flink con la CLI flink
.
En las siguientes secciones se describen las diferentes formas de ejecutar un trabajo de la CLI de flink
en tu clúster de Dataproc Flink.
Accede al nodo maestro mediante SSH: usa la utilidad SSH para abrir una ventana de terminal en la VM maestra del clúster.
Define la ruta de clases: inicializa la ruta de clases de Hadoop desde la ventana de terminal SSH de la máquina virtual maestra del clúster de Flink:
export HADOOP_CLASSPATH=$(hadoop classpath)
Ejecutar trabajos de Flink: puedes ejecutar trabajos de Flink en diferentes modos de implementación en YARN: aplicación, por trabajo y sesión.
Modo de aplicación: la versión de imagen 2.0 de Dataproc y las posteriores admiten el modo de aplicación de Flink. Este modo ejecuta el método
main()
de la tarea en el gestor de tareas de YARN. El clúster se apaga cuando finaliza la tarea.Ejemplo de envío de un trabajo:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
Mostrar las tareas en ejecución:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
Para cancelar una tarea en ejecución, haz lo siguiente:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Modo por trabajo: en este modo de Flink, se ejecuta el método
main()
del trabajo en el lado del cliente.Ejemplo de envío de un trabajo:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
Modo de sesión: inicia una sesión de Flink YARN de larga duración y, a continuación, envía uno o varios trabajos a la sesión.
Iniciar una sesión: puedes iniciar una sesión de Flink de una de las siguientes formas:
Crea un clúster de Flink añadiendo la marca
--metadata flink-start-yarn-session=true
al comandogcloud dataproc clusters create
(consulta Crear un clúster de Dataproc Flink). Si esta marca está habilitada, después de crear el clúster, Dataproc ejecuta/usr/bin/flink-yarn-daemon
para iniciar una sesión de Flink en el clúster.El ID de aplicación YARN de la sesión se guarda en
/tmp/.yarn-properties-${USER}
. Puedes consultar el ID con el comandoyarn application -list
.Ejecuta la secuencia de comandos de Flink
yarn-session.sh
preinstalada en la VM maestra del clúster con ajustes personalizados:Ejemplo con ajustes personalizados:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
Ejecuta la secuencia de comandos del envoltorio
/usr/bin/flink-yarn-daemon
de Flink con la configuración predeterminada:. /usr/bin/flink-yarn-daemon
Enviar un trabajo a una sesión: ejecuta el siguiente comando para enviar un trabajo de Flink a la sesión.
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: la URL, incluido el host y el puerto, de la VM maestra de Flink en la que se ejecutan los trabajos.
Quita el
http:// prefix
de la URL. Esta URL se indica en la salida del comando cuando inicias una sesión de Flink. Puedes ejecutar el siguiente comando para mostrar esta URL en el campoTracking-URL
:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: la URL, incluido el host y el puerto, de la VM maestra de Flink en la que se ejecutan los trabajos.
Quita el
Listar los trabajos de una sesión: para listar los trabajos de Flink de una sesión, haz una de las siguientes acciones:
Ejecuta
flink list
sin argumentos. El comando busca el ID de aplicación YARN de la sesión en/tmp/.yarn-properties-${USER}
.Obtén el ID de aplicación de YARN de la sesión desde
/tmp/.yarn-properties-${USER}
o la salida deyarn application -list
, y, a continuación, ejecuta<code>
flink list -yid YARN_APPLICATION_ID.Ejecuta
flink list -m FLINK_MASTER_URL
.
Detener una sesión: para detener la sesión, obtén el ID de aplicación de YARN de la sesión desde
/tmp/.yarn-properties-${USER}
o la salida deyarn application -list
y, a continuación, ejecuta uno de los siguientes comandos:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Ejecutar tareas de Apache Beam en Flink
Puedes ejecutar tareas de Apache Beam en Dataproc con el FlinkRunner
.
Puedes ejecutar trabajos de Beam en Flink de las siguientes formas:
- Tareas de Java Beam
- Tareas de Beam portátiles
Tareas de Java Beam
Empaqueta tus trabajos de Beam en un archivo JAR. Proporciona el archivo JAR empaquetado con las dependencias necesarias para ejecutar el trabajo.
En el siguiente ejemplo, se ejecuta un trabajo de Java Beam desde el nodo maestro del clúster de Dataproc.
Crea un clúster de Dataproc con el componente Flink habilitado.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: la versión de la imagen del clúster, que determina la versión de Flink instalada en el clúster (por ejemplo, consulta las versiones de los componentes de Apache Flink que se indican en las cuatro versiones de lanzamiento de la imagen 2.0.x más recientes y anteriores).--region
: una región de Dataproc compatible.--enable-component-gateway
: habilita el acceso a la interfaz de usuario de Flink Job Manager.--scopes
: habilita el acceso a las APIs por parte de tu clúster (consulta las prácticas recomendadas de los ámbitos). Google Cloud El ámbitocloud-platform
está habilitado de forma predeterminada (no es necesario incluir esta configuración de marca) al crear un clúster que use la versión 2.1 o posterior de la imagen de Dataproc.
Usa la utilidad SSH para abrir una ventana de terminal en el nodo maestro del clúster de Flink.
Inicia una sesión de Flink YARN en el nodo maestro del clúster de Dataproc.
. /usr/bin/flink-yarn-daemon
Anota la versión de Flink de tu clúster de Dataproc.
flink --version
En tu máquina local, genera el ejemplo canónico de recuento de palabras de Beam en Java.
Elige una versión de Beam que sea compatible con la versión de Flink de tu clúster de Dataproc. Consulta la tabla de compatibilidad de versiones de Flink para ver la compatibilidad de versiones de Beam y Flink.
Abre el archivo POM generado. Comprueba la versión del runner de Beam Flink especificada por la etiqueta
<flink.artifact.name>
. Si la versión del runner de Beam Flink en el nombre del artefacto de Flink no coincide con la versión de Flink de tu clúster, actualiza el número de versión para que coincida.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Empaqueta el ejemplo de recuento de palabras.
mvn package -Pflink-runner
Sube el archivo JAR empaquetado
word-count-beam-bundled-0.1.jar
(~135 MB) al nodo maestro de tu clúster de Dataproc. Puedes usargcloud storage cp
para transferir archivos más rápido a tu clúster de Dataproc desde Cloud Storage.En tu terminal local, crea un segmento de Cloud Storage y sube el archivo JAR uber.
gcloud storage buckets create BUCKET_NAME
gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
En el nodo maestro de Dataproc, descarga el uber JAR.
gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Ejecuta la tarea de Java Beam en el nodo maestro del clúster de Dataproc.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Comprueba que los resultados se hayan escrito en tu segmento de Cloud Storage.
gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Detén la sesión de Flink YARN.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
Tareas de Beam portátiles
Para ejecutar tareas de Beam escritas en Python, Go y otros lenguajes admitidos, puedes usar FlinkRunner
y PortableRunner
, tal como se describe en la página Flink Runner de Beam (consulta también la hoja de ruta del framework de portabilidad).
En el siguiente ejemplo se ejecuta un trabajo portátil de Beam en Python desde el nodo maestro del clúster de Dataproc.
Crea un clúster de Dataproc con los componentes Flink y Docker habilitados.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
Notas:
--optional-components
: Flink y Docker.--image-version
: la versión de la imagen del clúster, que determina la versión de Flink instalada en el clúster (por ejemplo, consulta las versiones de los componentes de Apache Flink que se indican en las cuatro últimas versiones de lanzamiento de la imagen 2.0.x).--region
: Una región de Dataproc disponible.--enable-component-gateway
: habilita el acceso a la interfaz de usuario de Flink Job Manager.--scopes
: habilita el acceso a las APIs por parte de tu clúster (consulta las prácticas recomendadas sobre los ámbitos). Google Cloud El ámbitocloud-platform
está habilitado de forma predeterminada (no es necesario incluir esta configuración de marca) al crear un clúster que use la versión 2.1 o posterior de la imagen de Dataproc.
Usa la CLI gcloud de forma local o en Cloud Shell para crear un segmento de Cloud Storage. Especificarás el BUCKET_NAME cuando ejecutes un programa de recuento de palabras de ejemplo.
gcloud storage buckets create BUCKET_NAME
En una ventana de terminal de la VM del clúster, inicia una sesión de Flink YARN. Anota la URL maestra de Flink, la dirección del maestro de Flink donde se ejecutan los trabajos. Especificarás el FLINK_MASTER_URL cuando ejecutes un programa de recuento de palabras de ejemplo.
. /usr/bin/flink-yarn-daemon
Muestra y anota la versión de Flink que ejecuta el clúster de Dataproc. Especificarás el FLINK_VERSION cuando ejecutes un programa de recuento de palabras de ejemplo.
flink --version
Instala las bibliotecas de Python necesarias para el trabajo en el nodo maestro del clúster.
Instala una versión de Beam que sea compatible con la versión de Flink del clúster.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Ejecuta el ejemplo de recuento de palabras en el nodo maestro del clúster.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
Notas:
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, como se ha indicado anteriormente.--flink_master
: FLINK_MASTER_URL, como se ha indicado anteriormente.--flink_submit_uber_jar
: usa el archivo JAR uber para ejecutar el trabajo de Beam.--output
: BUCKET_NAME, creada anteriormente.
Verifica que los resultados se hayan escrito en tu segmento.
gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Detén la sesión de Flink YARN.
- Obtén el ID de la aplicación.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Ejecutar Flink en un clúster con Kerberos
El componente Flink de Dataproc admite clústeres Kerberized. Se necesita un ticket de Kerberos válido para enviar y conservar una tarea de Flink o para iniciar un clúster de Flink. De forma predeterminada, un ticket de Kerberos sigue siendo válido durante siete días.
Acceder a la interfaz de usuario de Flink Job Manager
La interfaz web de Flink Job Manager está disponible mientras se ejecuta un trabajo de Flink o un clúster de sesiones de Flink. Para usar la interfaz web, sigue estos pasos:
- Crea un clúster de Dataproc Flink.
- Después de crear el clúster, haz clic en el enlace YARN ResourceManager de Component Gateway en la pestaña Interfaz web de la página Detalles del clúster de la consola de Google Cloud .
- En la interfaz de usuario YARN Resource Manager (Gestor de recursos de YARN), busca la entrada de la aplicación del clúster de Flink. En función del estado de finalización de un trabajo, se mostrará un enlace ApplicationMaster o History.
- En el caso de un trabajo de streaming de larga duración, haz clic en el enlace ApplicationManager para abrir el panel de control de Flink. En el caso de un trabajo completado, haz clic en el enlace History para ver los detalles del trabajo.