Usar el ejecutor interactivo de Apache Beam con notebooks de JupyterLab te permite desarrollar canalizaciones de forma iterativa, inspeccionar el grafo de canalización y analizar PCollections individuales en un flujo de trabajo de bucle de lectura-evaluación-impresión (REPL). Para ver un instructivo en el que se muestre cómo usar el ejecutor interactivo de Apache Beam con notebooks de JupyterLab, consulta Desarrolla con notebooks de Apache Beam.
En esta página, se proporcionan detalles sobre las funciones avanzadas que puedes usar con el notebook de Apache Beam.
FlinkRunner interactivo en clústeres administrados por notebooks
Para trabajar con datos en tamaño de producción de forma interactiva desde el notebook, puedes usar FlinkRunner
con algunas opciones de canalización genéricas que le indican a la sesión del notebook que administre un clúster de Dataproc de larga duración y ejecute tus canalizaciones de Apache Beam forma distribuida.
Requisitos previos
Para usar esta función, sigue estos pasos:
- Habilita la API de Dataproc
- Otorga el rol de administrador o editor a la cuenta de servicio que ejecuta la instancia de notebook para Dataproc.
- Usa un kernel de notebook con la versión 2.40.0 o posterior del SDK de Apache Beam.
Configuración
Como mínimo, necesitas la siguiente configuración:
# Set a Cloud Storage bucket to cache source recording and PCollections.
# By default, the cache is on the notebook instance itself, but that does not
# apply to the distributed execution scenario.
ib.options.cache_root = 'gs://<BUCKET_NAME>/flink'
# Define an InteractiveRunner that uses the FlinkRunner under the hood.
interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())
options = PipelineOptions()
# Instruct the notebook that Google Cloud is used to run the FlinkRunner.
cloud_options = options.view_as(GoogleCloudOptions)
cloud_options.project = 'PROJECT_ID'
Aprovisionamiento explícito (opcional)
Puedes agregar las siguientes opciones.
# Change this if the pipeline needs to run in a different region
# than the default, 'us-central1'. For example, to set it to 'us-west1':
cloud_options.region = 'us-west1'
# Explicitly provision the notebook-managed cluster.
worker_options = options.view_as(WorkerOptions)
# Provision 40 workers to run the pipeline.
worker_options.num_workers=40
# Use the default subnetwork.
worker_options.subnetwork='default'
# Choose the machine type for the workers.
worker_options.machine_type='n1-highmem-8'
# When working with non-official Apache Beam releases, such as Apache Beam built from source
# code, configure the environment to use a compatible released SDK container.
# If needed, build a custom container and use it. For more information, see:
# https://beam.apache.org/documentation/runtime/environments/
options.view_as(PortableOptions).environment_config = 'apache/beam_python3.7_sdk:2.41.0 or LOCATION.pkg.dev/PROJECT_ID/REPOSITORY/your_custom_container'
Uso
# The parallelism is applied to each step, so if your pipeline has 10 steps, you
# end up having 10 * 10 = 100 tasks scheduled, which can be run in parallel.
options.view_as(FlinkRunnerOptions).parallelism = 10
p_word_count = beam.Pipeline(interactive_flink_runner, options=options)
word_counts = (
p_word_count
| 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
| 'count' >> beam.combiners.Count.PerElement())
# The notebook session automatically starts and manages a cluster to run
# your pipelines with the FlinkRunner.
ib.show(word_counts)
# Interactively adjust the parallelism.
options.view_as(FlinkRunnerOptions).parallelism = 150
# The BigQuery read needs a Cloud Storage bucket as a temporary location.
options.view_as(GoogleCloudOptions).temp_location = ib.options.cache_root
p_bq = beam.Pipeline(runner=interactive_flink_runner, options=options)
delays_by_airline = (
p_bq
| 'Read Dataset from BigQuery' >> beam.io.ReadFromBigQuery(
project=project, use_standard_sql=True,
query=('SELECT airline, arrival_delay '
'FROM `bigquery-samples.airline_ontime_data.flights` '
'WHERE date >= "2010-01-01"'))
| 'Rebalance Data to TM Slots' >> beam.Reshuffle(num_buckets=1000)
| 'Extract Delay Info' >> beam.Map(
lambda e: (e['airline'], e['arrival_delay'] > 0))
| 'Filter Delayed' >> beam.Filter(lambda e: e[1])
| 'Count Delayed Flights Per Airline' >> beam.combiners.Count.PerKey())
# This step reuses the existing cluster.
ib.collect(delays_by_airline)
# Describe the cluster running the pipelines.
# You can access the Flink dashboard from the printed link.
ib.clusters.describe()
# Cleans up all long-lasting clusters managed by the notebook session.
ib.clusters.cleanup(force=True)
Clústeres administrados por notebooks
- De forma predeterminada, si no proporcionas ninguna opción de canalización, Apache Beam interactivo siempre vuelve a usar el clúster más reciente para ejecutar una canalización con
FlinkRunner
.- Para evitar este comportamiento, por ejemplo, para ejecutar otra canalización en la misma sesión de notebook con un FlinkRunner que no está alojado en el notebook, ejecuta
ib.clusters.set_default_cluster(None)
.
- Para evitar este comportamiento, por ejemplo, para ejecutar otra canalización en la misma sesión de notebook con un FlinkRunner que no está alojado en el notebook, ejecuta
- Cuando se crea una instancia de una canalización nueva que usa un proyecto, una región y una configuración de aprovisionamiento que se asignan a un clúster de Dataproc existente, Dataflow también vuelve a usar el clúster, aunque no sea el clúster usado más recientemente.
- Sin embargo, cada vez que se proporciona un cambio de aprovisionamiento, como cuando se cambia el tamaño de un clúster, se crea un clúster nuevo para activar el cambio deseado. Si quieres cambiar el tamaño de un clúster y evitar agotar los recursos de la nube, limpia los clústeres innecesarios mediante
ib.clusters.cleanup(pipeline)
. - Cuando se especifica un Flink
master_url
, si pertenece a un clúster que administra la sesión de notebook, Dataflow vuelve a usar el clúster administrado.- Si no se conoce la
master_url
en la sesión de notebook, significa que se desea unFlinkRunner
alojado por el usuario; el notebook no hará nada de forma implícita.
- Si no se conoce la
Soluciona problemas
En esta sección, se proporciona información para ayudarte a solucionar problemas y depurar el FlinkRunner interactivo en clústeres administrados por notebooks.
Flink IOException: Cantidad insuficiente de búferes de red
Para que sea más simple, la configuración del búfer de red de Flink no se expone a la configuración.
Si el grafo de tu trabajo es demasiado complicado o si tu paralelismo está demasiado alto, la cardinalidad de pasos multiplicada por el paralelismo puede ser demasiado grande, causar que se programen demasiadas tareas en paralelo y hacer que falle la ejecución.
Usa las siguientes sugerencias para mejorar la velocidad de las ejecuciones interactivas:
- Solo asigna la
PCollection
que deseas inspeccionar a una variable. - Inspecciona
PCollections
uno por uno. - Usa reshuffle después de una transformación alta de fan-out.
- Ajusta el paralelismo en función del tamaño de los datos (a veces más pequeño es más rápido).
Se tarda demasiado en inspeccionar los datos
Verifica el panel de Flink para el trabajo en ejecución. Es posible que veas un paso en el que cientos de tareas finalizaron y solo queda una porque los datos en tránsito residen en una sola máquina y no se combinan.
Siempre usa reshuffle después de una transformación de fanout alta, como las siguientes:
- Leer las filas de un archivo
- Leer las filas de una tabla de BigQuery
Sin la redistribución, los datos de fanout siempre se ejecutan en el mismo trabajador y no puedes aprovechar el paralelismo.
¿Cuántos trabajadores necesito?
Como regla general, el clúster de Flink tiene la cantidad de CPU virtuales multiplicada por la cantidad de ranuras de trabajador. Por ejemplo, si tienes 40 trabajadores n1-highmem-8, el clúster de Flink tiene como máximo 320 ranuras, o bien 8 multiplicados por 40.
Lo ideal es que el trabajador administre un trabajo que lea, asigne y combine con paralelismo configurado en los cientos, que programa miles de tareas en paralelo.
¿Funciona con transmisión?
Por el momento, las canalizaciones de transmisión no son compatibles con el Flink interactivo en la función de clúster administrado por notebook.
Beam SQL y beam_sql
mágico
Beam SQL te permite consultar PCollections
delimitadas y no delimitadas con instrucciones de SQL. Si trabajas en un notebook de Apache Beam, puedes usar las funciones mágicas personalizadas de IPython beam_sql
para acelerar el desarrollo de tu canalización.
Puedes verificar el uso mágico de beam_sql
con las opciones -h
o --help
:
Puedes crear un PCollection
a partir de valores constantes:
Puedes unirte a múltiples PCollections
:
Puedes iniciar un trabajo de Dataflow con las opciones -r DataflowRunner
o --runner DataflowRunner
:
Para obtener más información, consulta el notebook de ejemplo Apache Beam SQL en notebooks.
Acelera el uso del compilador JIT y la GPU
Puedes usar bibliotecas como numba y GPU para acelerar tu código de Python y las canalizaciones de Apache Beam. Para ejecutar en GPUs, la instancia de notebook de Apache Beam creada con una GPU nvidia-tesla-t4
debe compilar tu código de Python con numba.cuda.jit
. De manera opcional, para acelerar la ejecución en las CPUs, compila tu código de Python en código máquina con numba.jit
o numba.njit
.
En el siguiente ejemplo, se crea una DoFn
que procesa en las GPUs:
class Sampler(beam.DoFn):
def __init__(self, blocks=80, threads_per_block=64):
# Uses only 1 cuda grid with below config.
self.blocks = blocks
self.threads_per_block = threads_per_block
def setup(self):
import numpy as np
# An array on host as the prototype of arrays on GPU to
# hold accumulated sub count of points in the circle.
self.h_acc = np.zeros(
self.threads_per_block * self.blocks, dtype=np.float32)
def process(self, element: Tuple[int, int]):
from numba import cuda
from numba.cuda.random import create_xoroshiro128p_states
from numba.cuda.random import xoroshiro128p_uniform_float32
@cuda.jit
def gpu_monte_carlo_pi_sampler(rng_states, sub_sample_size, acc):
"""Uses GPU to sample random values and accumulates the sub count
of values within a circle of radius 1.
"""
pos = cuda.grid(1)
if pos < acc.shape[0]:
sub_acc = 0
for i in range(sub_sample_size):
x = xoroshiro128p_uniform_float32(rng_states, pos)
y = xoroshiro128p_uniform_float32(rng_states, pos)
if (x * x + y * y) <= 1.0:
sub_acc += 1
acc[pos] = sub_acc
rng_seed, sample_size = element
d_acc = cuda.to_device(self.h_acc)
sample_size_per_thread = sample_size // self.h_acc.shape[0]
rng_states = create_xoroshiro128p_states(self.h_acc.shape[0], seed=rng_seed)
gpu_monte_carlo_pi_sampler[self.blocks, self.threads_per_block](
rng_states, sample_size_per_thread, d_acc)
yield d_acc.copy_to_host()
En la siguiente imagen, se muestra el notebook que se ejecuta en una GPU:
Puedes encontrar más detalles en el notebook de ejemplo Usa GPU con Apache Beam.
Compila un contenedor personalizado
En la mayoría de los casos, si tu canalización no requiere dependencias o ejecutables de Python adicionales, Apache Beam puede usar automáticamente sus imágenes de contenedor oficiales para ejecutar el código definido por el usuario. Estas imágenes vienen con muchos módulos comunes de Python, y no tienes que compilarlos o especificarlos de forma explícita.
En algunos casos, es posible que tengas dependencias de Python adicionales o incluso dependencias de que no son de Python. En estos casos, puedes compilar un contenedor personalizado y hacer que esté disponible en el clúster de Flink para que se ejecute. En la siguiente lista, se proporcionan las ventajas de usar un contenedor personalizado:
- Tiempo de configuración más rápido para ejecuciones interactivas y consecutivas
- Dependencias y configuraciones estables
- Más flexibilidad: puedes configurar mucho más que dependencias de Python
El proceso de compilación del contenedor puede ser tedioso, pero puedes hacer todo en el notebook con el siguiente patrón de uso.
Crea un lugar de trabajo local
Primero, crea un directorio de trabajo local en el directorio principal de Jupyter.
!mkdir -p /home/jupyter/.flink
Prepara dependencias de Python
A continuación, instala todas las dependencias adicionales de Python que puedas usar y expórtalas en un archivo de requisitos.
%pip install dep_a
%pip install dep_b
...
Puedes crear un archivo de requisitos de forma explícita con el comando mágico de notebook %%writefile
.
%%writefile /home/jupyter/.flink/requirements.txt
dep_a
dep_b
...
Como alternativa, puedes inmovilizar todas las dependencias locales en un archivo de requisitos. Esta opción puede ingresar dependencias no deseadas.
%pip freeze > /home/jupyter/.flink/requirements.txt
Prepara tus dependencias que no son de Python
Copia todas las dependencias que no son de Python en el lugar de trabajo. Si no tienes dependencias que no sean de Python, omite este paso.
!cp /path/to/your-dep /home/jupyter/.flink/your-dep
...
Cree un Dockerfile
Crea un Dockerfile con el comando mágico de notebook de %%writefile
. Por ejemplo:
%%writefile /home/jupyter/.flink/Dockerfile
FROM apache/beam_python3.7_sdk:2.40.0
COPY requirements.txt /tmp/requirements.txt
COPY your_dep /tmp/your_dep
...
RUN python -m pip install -r /tmp/requirements.txt
El contenedor de ejemplo usa la imagen de la versión 2.40.0 del SDK de Apache Beam con Python 3.7 como base, agrega un archivo your_dep
y, luego, instala las dependencias adicionales de Python.
Usa este Dockerfile como plantilla y edítalo para tu caso de uso.
En las canalizaciones de Apache Beam, cuando hagas referencia a dependencias que no son de Python, usa los destinos COPY
. Por ejemplo, /tmp/your_dep
es la ruta de acceso del archivo your_dep
.
Compila una imagen de contenedor en Artifact Registry mediante Cloud Build
Habilita los servicios de Cloud Build y Artifact Registry, si aún no están habilitados.
!gcloud services enable cloudbuild.googleapis.com !gcloud services enable artifactregistry.googleapis.com
Crea un repositorio de Artifact Registry para poder subir artefactos. Cada repositorio puede contener artefactos para un formato compatible único.
Todo el contenido del repositorio se encripta con claves de Google y administradas por Google o con claves de encriptación administradas por el cliente. Artifact Registry usa claves de Google y administradas por Google de forma predeterminada y no se requiere ninguna configuración para esta opción.
Debes tener al menos el acceso de escritor de Artifact Registry al repositorio.
Ejecuta el siguiente comando para crear un repositorio nuevo. El comando usa la marca
--async
y se muestra de inmediato, sin necesidad de esperar a que se complete la operación en curso.gcloud artifacts repositories create REPOSITORY \ --repository-format=docker \ --location=LOCATION \ --async
Reemplaza los siguientes valores:
- REPOSITORY: un nombre para tu repositorio. Para la ubicación de cada repositorio en un proyecto, los nombres de los repositorios deben ser únicos.
- LOCATION: Es la ubicación de tu repositorio.
Antes de poder enviar o extraer imágenes, configura Docker para autenticar solicitudes de Artifact Registry. Para configurar la autenticación en los repositorios de Docker, ejecuta el siguiente comando:
gcloud auth configure-docker LOCATION-docker.pkg.dev
El comando actualiza tu configuración de Docker. Ahora puedes conectarte con Artifact Registry en tu proyecto de Google Cloud para enviar imágenes.
Usa Cloud Build para compilar la imagen de contenedor y guárdala en Artifact Registry.
!cd /home/jupyter/.flink \ && gcloud builds submit \ --tag LOCATION.pkg.dev/PROJECT_ID/REPOSITORY/flink:latest \ --timeout=20m
Reemplaza
PROJECT_ID
por el ID de tu proyecto:
Usar contenedores personalizados
Según el ejecutor, puedes usar contenedores personalizados para diferentes propósitos.
Para el uso general del contenedor de Apache Beam, consulta los siguientes vínculos:
Para conocer el uso del contenedor de Dataflow, consulta los siguientes vínculos:
Inhabilita direcciones IP externas
Cuando crees una instancia de notebook de Apache Beam, para aumentar la seguridad, inhabilita las direcciones IP externas. Debido a que las instancias de notebook deben descargar algunos recursos de Internet pública, como Artifact Registry, primero debes crear una red de VPC nueva sin una dirección IP externa. Luego, crea una puerta de enlace de Cloud NAT para esta red de VPC. Para obtener más información sobre Cloud NAT, consulta la documentación de Cloud NAT. Usa la red de VPC y la puerta de enlace de Cloud NAT para acceder a los recursos de Internet pública necesarios sin habilitar las direcciones IP externas.