Usa las funciones avanzadas del notebook de Apache Beam

Usar el ejecutor interactivo de Apache Beam con notebooks de JupyterLab te permite desarrollar canalizaciones de forma iterativa, inspeccionar el grafo de canalización y analizar PCollections individuales en un flujo de trabajo de bucle de lectura-evaluación-impresión (REPL). Para ver un instructivo en el que se muestre cómo usar el ejecutor interactivo de Apache Beam con notebooks de JupyterLab, consulta Desarrolla con notebooks de Apache Beam.

En esta página, se proporcionan detalles sobre las funciones avanzadas que puedes usar con el notebook de Apache Beam.

FlinkRunner interactivo en clústeres administrados por notebooks

Para trabajar con datos en tamaño de producción de forma interactiva desde el notebook, puedes usar FlinkRunner con algunas opciones de canalización genéricas que le indican a la sesión del notebook que administre un clúster de Dataproc de larga duración y ejecute tus canalizaciones de Apache Beam forma distribuida.

Requisitos previos

Para usar esta función, sigue estos pasos:

  • Habilita la API de Dataproc
  • Otorga el rol de administrador o editor a la cuenta de servicio que ejecuta la instancia de notebook para Dataproc.
  • Usa un kernel de notebook con la versión 2.40.0 o posterior del SDK de Apache Beam.

Configuración

Como mínimo, necesitas la siguiente configuración:

# Set a Cloud Storage bucket to cache source recording and PCollections.
# By default, the cache is on the notebook instance itself, but that does not
# apply to the distributed execution scenario.
ib.options.cache_root = 'gs://<BUCKET_NAME>/flink'

# Define an InteractiveRunner that uses the FlinkRunner under the hood.
interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())

options = PipelineOptions()
# Instruct the notebook that Google Cloud is used to run the FlinkRunner.
cloud_options = options.view_as(GoogleCloudOptions)
cloud_options.project = 'PROJECT_ID'

Aprovisionamiento explícito (opcional)

Puedes agregar las siguientes opciones.

# Change this if the pipeline needs to run in a different region
# than the default, 'us-central1'. For example, to set it to 'us-west1':
cloud_options.region = 'us-west1'

# Explicitly provision the notebook-managed cluster.
worker_options = options.view_as(WorkerOptions)
# Provision 40 workers to run the pipeline.
worker_options.num_workers=40
# Use the default subnetwork.
worker_options.subnetwork='default'
# Choose the machine type for the workers.
worker_options.machine_type='n1-highmem-8'

# When working with non-official Apache Beam releases, such as Apache Beam built from source
# code, configure the environment to use a compatible released SDK container.
# If needed, build a custom container and use it. For more information, see:
# https://beam.apache.org/documentation/runtime/environments/
options.view_as(PortableOptions).environment_config = 'apache/beam_python3.7_sdk:2.41.0 or LOCATION.pkg.dev/PROJECT_ID/REPOSITORY/your_custom_container'

Uso

# The parallelism is applied to each step, so if your pipeline has 10 steps, you
# end up having 10 * 10 = 100 tasks scheduled, which can be run in parallel.
options.view_as(FlinkRunnerOptions).parallelism = 10

p_word_count = beam.Pipeline(interactive_flink_runner, options=options)
word_counts = (
    p_word_count
    | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
    | 'count' >> beam.combiners.Count.PerElement())
# The notebook session automatically starts and manages a cluster to run
# your pipelines with the FlinkRunner.
ib.show(word_counts)

# Interactively adjust the parallelism.
options.view_as(FlinkRunnerOptions).parallelism = 150
# The BigQuery read needs a Cloud Storage bucket as a temporary location.
options.view_as(GoogleCloudOptions).temp_location = ib.options.cache_root
p_bq = beam.Pipeline(runner=interactive_flink_runner, options=options)
delays_by_airline = (
    p_bq
    | 'Read Dataset from BigQuery' >> beam.io.ReadFromBigQuery(
        project=project, use_standard_sql=True,
        query=('SELECT airline, arrival_delay '
               'FROM `bigquery-samples.airline_ontime_data.flights` '
               'WHERE date >= "2010-01-01"'))
    | 'Rebalance Data to TM Slots' >> beam.Reshuffle(num_buckets=1000)
    | 'Extract Delay Info' >> beam.Map(
        lambda e: (e['airline'], e['arrival_delay'] > 0))
    | 'Filter Delayed' >> beam.Filter(lambda e: e[1])
    | 'Count Delayed Flights Per Airline' >> beam.combiners.Count.PerKey())
# This step reuses the existing cluster.
ib.collect(delays_by_airline)

# Describe the cluster running the pipelines.
# You can access the Flink dashboard from the printed link.
ib.clusters.describe()

# Cleans up all long-lasting clusters managed by the notebook session.
ib.clusters.cleanup(force=True)

Clústeres administrados por notebooks

  • De forma predeterminada, si no proporcionas ninguna opción de canalización, Apache Beam interactivo siempre vuelve a usar el clúster más reciente para ejecutar una canalización con FlinkRunner.
    • Para evitar este comportamiento, por ejemplo, a fin de ejecutar otra canalización en la misma sesión de notebook con un FlinkRunner que no aloja el notebook, ejecuta ib.clusters.set_default_cluster(None).
  • Cuando se crea una instancia de una canalización nueva que usa un proyecto, una región y una configuración de aprovisionamiento que se asignan a un clúster de Dataproc existente, Dataflow también vuelve a usar el clúster, aunque no sea el clúster usado más recientemente.
  • Sin embargo, cada vez que se proporciona un cambio de aprovisionamiento, como cuando se cambia el tamaño de un clúster, se crea un clúster nuevo para activar el cambio deseado. Si planeas cambiar el tamaño de un clúster y evitar el agotamiento de los recursos de la nube, limpia los clústeres innecesarios con ib.clusters.cleanup(pipeline).
  • Cuando se especifica un Flink master_url, si pertenece a un clúster que administra la sesión de notebook, Dataflow vuelve a usar el clúster administrado.
    • Si no se conoce la master_url en la sesión de notebook, significa que se desea un FlinkRunner alojado por el usuario; el notebook no hará nada de forma implícita.

Soluciona problemas

En esta sección, se proporciona información para ayudarte a solucionar problemas y depurar el FlinkRunner interactivo en clústeres administrados por notebooks.

Para que sea más simple, la configuración del búfer de red de Flink no se expone a la configuración.

Si el grafo de tu trabajo es demasiado complicado o si tu paralelismo está demasiado alto, la cardinalidad de pasos multiplicada por el paralelismo puede ser demasiado grande, causar que se programen demasiadas tareas en paralelo y hacer que falle la ejecución.

Usa las siguientes sugerencias para mejorar la velocidad de las ejecuciones interactivas:

  • Solo asigna la PCollection que deseas inspeccionar a una variable.
  • Inspecciona PCollections uno por uno.
  • Usa reshuffle después de una transformación alta de fan-out.
  • Ajusta el paralelismo en función del tamaño de los datos (a veces más pequeño es más rápido).

Se tarda demasiado en inspeccionar los datos

Verifica el panel de Flink para el trabajo en ejecución. Es posible que veas un paso en el que cientos de tareas finalizaron y solo queda una porque los datos en tránsito residen en una sola máquina y no se combinan.

Siempre usa reshuffle después de una transformación de fanout alta, como las siguientes:

  • Leer las filas de un archivo
  • Leer las filas de una tabla de BigQuery

Sin la redistribución, los datos de distribución siempre se ejecutan en el mismo trabajador y no puedes aprovechar el paralelismo.

¿Cuántos trabajadores necesito?

Como regla general, el clúster de Flink tiene aproximadamente la cantidad de CPU virtuales multiplicada por la cantidad de ranuras de trabajador. Por ejemplo, si tienes 40 trabajadores n1-highmem-8, el clúster de Flink tiene como máximo 320 ranuras, o bien 8 multiplicados por 40.

Lo ideal es que el trabajador administre un trabajo que lea, asigne y combine con paralelismo configurado en los cientos, que programa miles de tareas en paralelo.

¿Funciona con transmisión?

Por el momento, las canalizaciones de transmisión no son compatibles con la función Flink interactiva en un clúster administrado por notebooks.

Beam SQL y beam_sql mágico

Beam SQL te permite consultar PCollections delimitadas y no delimitadas con instrucciones de SQL. Si trabajas en un notebook de Apache Beam, puedes usar las funciones mágicas personalizadas de IPython beam_sql para acelerar el desarrollo de tu canalización.

Puedes verificar el uso mágico de beam_sql con las opciones -h o --help:

Comprueba la ayuda de beam_sql

Puedes crear una PCollection a partir de valores constantes:

Crea PCollection a partir de valores constantes

Puedes unir varias PCollections:

Une varias PCollections

Puedes iniciar un trabajo de Dataflow con las opciones -r DataflowRunner o --runner DataflowRunner:

Inicia un trabajo de Dataflow con Apache Beam SQL

Para obtener más información, consulta el notebook de ejemplo SQL de Apache Beam en notebooks.

Acelera el uso del compilador JIT y la GPU

Puedes usar bibliotecas como numba y GPU para acelerar tu código de Python y las canalizaciones de Apache Beam. Para ejecutar en GPUs, la instancia de notebook de Apache Beam creada con una GPU nvidia-tesla-t4 debe compilar tu código de Python con numba.cuda.jit. De manera opcional, para acelerar la ejecución en las CPUs, compila tu código de Python en código máquina con numba.jit o numba.njit.

En el siguiente ejemplo, se crea una DoFn que procesa en las GPUs:

class Sampler(beam.DoFn):
    def __init__(self, blocks=80, threads_per_block=64):
        # Uses only 1 cuda grid with below config.
        self.blocks = blocks
        self.threads_per_block = threads_per_block

    def setup(self):
        import numpy as np
        # An array on host as the prototype of arrays on GPU to
        # hold accumulated sub count of points in the circle.
        self.h_acc = np.zeros(
            self.threads_per_block * self.blocks, dtype=np.float32)

    def process(self, element: Tuple[int, int]):
        from numba import cuda
        from numba.cuda.random import create_xoroshiro128p_states
        from numba.cuda.random import xoroshiro128p_uniform_float32

        @cuda.jit
        def gpu_monte_carlo_pi_sampler(rng_states, sub_sample_size, acc):
            """Uses GPU to sample random values and accumulates the sub count
            of values within a circle of radius 1.
            """
            pos = cuda.grid(1)
            if pos < acc.shape[0]:
                sub_acc = 0
                for i in range(sub_sample_size):
                    x = xoroshiro128p_uniform_float32(rng_states, pos)
                    y = xoroshiro128p_uniform_float32(rng_states, pos)
                    if (x * x + y * y) <= 1.0:
                        sub_acc += 1
                acc[pos] = sub_acc

        rng_seed, sample_size = element
        d_acc = cuda.to_device(self.h_acc)
        sample_size_per_thread = sample_size // self.h_acc.shape[0]
        rng_states = create_xoroshiro128p_states(self.h_acc.shape[0], seed=rng_seed)
        gpu_monte_carlo_pi_sampler[self.blocks, self.threads_per_block](
            rng_states, sample_size_per_thread, d_acc)
        yield d_acc.copy_to_host()

En la siguiente imagen, se muestra el notebook que se ejecuta en una GPU:

Ejecuta DoFn en GPU

Puedes encontrar más detalles en el notebook de ejemplo Usa GPU con Apache Beam.

Compila un contenedor personalizado

En la mayoría de los casos, si tu canalización no requiere dependencias o ejecutables de Python adicionales, Apache Beam puede usar automáticamente sus imágenes de contenedor oficiales para ejecutar el código definido por el usuario. Estas imágenes vienen con muchos módulos comunes de Python, y no tienes que compilarlos o especificarlos de forma explícita.

En algunos casos, es posible que tengas dependencias adicionales de Python o incluso dependencias que no sean de Python. En estas situaciones, puedes compilar un contenedor personalizado y hacer que esté disponible para el clúster de Flink que se ejecutará. En la siguiente lista, se proporcionan las ventajas de usar un contenedor personalizado:

  • Tiempo de configuración más rápido para ejecuciones interactivas y consecutivas
  • Dependencias y configuraciones estables
  • Más flexibilidad: puedes configurar mucho más que dependencias de Python

El proceso de compilación del contenedor puede ser tedioso, pero puedes hacer todo en el notebook con el siguiente patrón de uso.

Crea un lugar de trabajo local

Primero, crea un directorio de trabajo local en el directorio principal de Jupyter.

!mkdir -p /home/jupyter/.flink

Prepara dependencias de Python

A continuación, instala todas las dependencias adicionales de Python que puedes usar y expórtalas a un archivo de requisitos.

%pip install dep_a
%pip install dep_b
...

Puedes crear un archivo de requisitos de forma explícita mediante el comando mágico de notebook %%writefile.

%%writefile /home/jupyter/.flink/requirements.txt
dep_a
dep_b
...

De manera alternativa, puedes inmovilizar todas las dependencias locales en un archivo de requisitos. Esta opción puede generar dependencias no deseadas.

%pip freeze > /home/jupyter/.flink/requirements.txt

Prepara tus dependencias que no son de Python

Copia todas las dependencias que no sean de Python en el lugar de trabajo. Si no tienes dependencias que no sean de Python, omite este paso.

!cp /path/to/your-dep /home/jupyter/.flink/your-dep
...

Cree un Dockerfile

Crea un Dockerfile con el comando mágico de notebook de %%writefile. Por ejemplo:

%%writefile /home/jupyter/.flink/Dockerfile
FROM apache/beam_python3.7_sdk:2.40.0

COPY  requirements.txt /tmp/requirements.txt
COPY  your_dep /tmp/your_dep
...

RUN python -m pip install -r /tmp/requirements.txt

El contenedor de ejemplo usa la imagen de la versión 2.40.0 del SDK de Apache Beam con Python 3.7 como base, agrega un archivo your_dep y, luego, instala las dependencias adicionales de Python. Usa este Dockerfile como plantilla y edítalo para tu caso de uso.

En las canalizaciones de Apache Beam, cuando hagas referencia a dependencias que no son de Python, usa los destinos COPY. Por ejemplo, /tmp/your_dep es la ruta del archivo your_dep.

Compila una imagen de contenedor en Artifact Registry mediante Cloud Build

  1. Habilita los servicios de Cloud Build y Artifact Registry, si aún no están habilitados.

    !gcloud services enable cloudbuild.googleapis.com
    !gcloud services enable artifactregistry.googleapis.com
    
  2. Crea un repositorio de Artifact Registry para poder subir artefactos. Cada repositorio puede contener artefactos para un formato compatible único.

    Todo el contenido del repositorio se encripta con claves de encriptación administradas por Google o administradas por el cliente. Artifact Registry usa claves de encriptación administradas por Google de forma predeterminada y no se requiere ninguna configuración para esta opción.

    Debes tener al menos el acceso de escritor de Artifact Registry al repositorio.

    Ejecuta el siguiente comando para crear un repositorio nuevo. El comando usa la marca --async y se muestra de inmediato, sin necesidad de esperar a que se complete la operación en curso.

    gcloud artifacts repositories create REPOSITORY \
    --repository-format=docker \
    --location=LOCATION \
    --async
    

    Reemplaza los siguientes valores:

    • REPOSITORY: un nombre para tu repositorio. Para la ubicación de cada repositorio en un proyecto, los nombres de los repositorios deben ser únicos.
    • LOCATION: Es la ubicación de tu repositorio.
  3. Antes de poder enviar o extraer imágenes, configura Docker para autenticar solicitudes de Artifact Registry. Para configurar la autenticación en los repositorios de Docker, ejecuta el siguiente comando:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    El comando actualiza tu configuración de Docker. Ahora puedes conectarte con Artifact Registry en tu proyecto de Google Cloud para enviar imágenes.

  4. Usa Cloud Build para compilar la imagen de contenedor y guárdala en Artifact Registry.

    !cd /home/jupyter/.flink \
    && gcloud builds submit \
     --tag LOCATION.pkg.dev/PROJECT_ID/REPOSITORY/flink:latest \
     --timeout=20m
    

    Reemplaza PROJECT_ID por el ID de tu proyecto:

Usar contenedores personalizados

Según el ejecutor, puedes usar contenedores personalizados para diferentes propósitos.

Para el uso general del contenedor de Apache Beam, consulta los siguientes vínculos:

Para conocer el uso del contenedor de Dataflow, consulta los siguientes vínculos:

Inhabilita direcciones IP externas

Cuando crees una instancia de notebook de Apache Beam, para aumentar la seguridad, inhabilita las direcciones IP externas. Debido a que las instancias de notebook deben descargar algunos recursos de Internet pública, como Artifact Registry, primero debes crear una red de VPC nueva sin una dirección IP externa. Luego, crea una puerta de enlace de Cloud NAT para esta red de VPC. Para obtener más información sobre Cloud NAT, consulta la documentación de Cloud NAT. Usa la red de VPC y la puerta de enlace de Cloud NAT para acceder a los recursos de Internet pública necesarios sin habilitar las direcciones IP externas.