Únete a la comunidad de Apache Beam del 18 al 20 de julio para la Cumbre de Beam 2022 a fin de obtener más información sobre Beam y compartir tu experiencia.

Desarrolla con notebooks de Apache Beam

Usar el ejecutor interactivo de Apache Beam con notebooks de JupyterLab te permite desarrollar canalizaciones de forma iterativa, inspeccionar el grafo de canalización y analizar PCollections individuales en un flujo de trabajo de bucle de lectura-evaluación-impresión (REPL). Estos notebooks de Apache Beam están disponibles a través de Notebooks administrados por el usuario de Vertex AI Workbench, un servicio que aloja máquinas virtuales de notebooks preinstaladas con la ciencia de datos más reciente y marcos de trabajo de aprendizaje automático.

En esta guía, nos enfocamos en las funciones que presentan los notebooks de Apache Beam, pero no se muestra cómo compilar uno. Para obtener más información sobre Apache Beam, consulta Apache Beam programming guide (Guía de programación de Apache Beam).

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  4. Habilita las API de Compute Engine, Notebooks.

    Habilita las API

  5. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  6. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  7. Habilita las API de Compute Engine, Notebooks.

    Habilita las API

Cuando termines esta guía, puedes borrar los recursos que creaste para evitar que se sigan facturando. Para obtener más información, consulta Realiza una limpieza.

Inicia una instancia de notebook de Apache Beam

  1. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.
  2. Navega hasta Dataflow en el panel lateral y haz clic en Workbench.
  3. En la barra de herramientas, haz clic en  Instancia nueva.
  4. Selecciona Apache Beam > Sin GPU.
  5. Si deseas ejecutar notebooks en una GPU, puedes seleccionar Apache Beam > Con 1 NVIDIA Tesla T4 (opcional).
  6. En la página Instancia de notebook nueva, selecciona una red para la VM del notebook y haz clic en Crear.
  7. Si eliges crear la instancia de notebook con una GPU, en la página Nueva instancia de notebook, debes verificar Instalar automáticamente el controlador de GPU de NVIDIA antes de hacer clic en Crear (opcional).
  8. Si deseas configurar una instancia de notebook personalizada, haz clic en Personalizar (opcional). Para obtener más información sobre cómo personalizar las propiedades de la instancia, consulta Crea una instancia de notebook administrada por el usuario con propiedades específicas.
  9. Haz clic en Abrir JupyterLab cuando se active el vínculo. Vertex AI Workbench crea una instancia nueva de notebook de Apache Beam.

Instala dependencias (opcional)

Los notebooks de Apache Beam ya tienen instaladas las dependencias de conector de Apache Beam y Google Cloud. Si la canalización contiene conectores personalizados o PTransforms personalizadas que dependen de bibliotecas de terceros, los puedes instalar después de crear una instancia de notebook. Para obtener más información, consulta Instala dependencias en la documentación de notebooks administrados por el usuario.

Comienza a usar los notebooks de Apache Beam

Después de abrir una instancia de notebook administrada por el usuario, hay notebooks de ejemplo disponibles en la carpeta Ejemplos. Las siguientes opciones están disponibles en este momento:

  • Conteo de palabras
  • Transmisión del conteo de palabras
  • Transmisión de datos de viajes en taxi en la ciudad de Nueva York
  • Conteo de palabras de Dataflow
  • SQL de Apache Beam en notebooks
  • Usa GPU con Apache Beam
  • Visualizar datos

En la carpeta Instructivos, encontrarás instructivos adicionales que explican los aspectos básicos de Apache Beam. Las siguientes opciones están disponibles en este momento:

  • Operaciones básicas
  • Operaciones en términos de elementos
  • Datos recopilados
  • Windows
  • Operaciones de E/S
  • Transmisión

En estos notebooks, se incluye texto explicativo y bloques de código comentados para que puedas comprender los conceptos de Apache Beam y el uso de la API. Los instructivos también proporcionan ejercicios prácticos en los que practicarás los conceptos aprendidos.

Crea una instancia de notebook

Navega hacia Archivo > Nuevo > Notebook y selecciona un kernel que sea Apache Beam 2.22 o una versión posterior.

Apache Beam está instalado en tu instancia de notebook, así que incluye los módulos interactive_runner y interactive_beam en el notebook.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Si el notebook usa otras API de Google, agrega las siguientes instrucciones de importación:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Configura opciones de interactividad

A continuación, se establece la cantidad de tiempo que InteractiveRunner registra los datos de una fuente ilimitada. En este ejemplo, la duración se establece en 10 minutos.

ib.options.recording_duration = '10m'

También puedes cambiar el límite de registro (en bytes) para una fuente ilimitada a través de la propiedad recording_size_limit.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

Para descubrir más opciones interactivas, consulta la clase interactive_beam.options.

Crea la canalización

Inicializa la canalización mediante un objeto InteractiveRunner.

options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

p = beam.Pipeline(InteractiveRunner(), options=options)

Lee y visualiza los datos

En el siguiente ejemplo, se muestra una canalización de Apache Beam que crea una suscripción al tema de Pub/Sub determinado y realiza operaciones de lectura desde la suscripción.

words = p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

La canalización cuenta las palabras por ventanas desde la fuente. Crea un sistema de ventanas fijo de 10 segundos de duración por ventana.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Una vez que los datos se organizaron en ventanas, las palabras se cuentan por ventana.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Con el método show(), se visualiza la PCollection resultante en el notebook.

ib.show(windowed_word_counts, include_window_info=True)

El método show con el que se visualiza una PCollection en formato de tabla

Puedes solicitar el permiso del conjunto de resultados desde show(), mediante la configuración de dos parámetros opcionales: n y duration. Si estableces n, se limita el conjunto de resultados para que se muestren como máximo n cantidad de elementos, alrededor de 20. Si no se establece n, el comportamiento predeterminado consiste en enumerar los elementos más recientes que se capturaron hasta que la grabación de origen finalizó. Establecer duration, limita el conjunto de resultados en un número específico de segundos válidos de datos desde el comienzo de la grabación de la fuente. Si no se establece duration, el comportamiento predeterminado consiste en enumerar todos los elementos hasta que finalice la grabación.

Si se configuran ambos parámetros opcionales, show() se detiene cuando se cumple cualquiera de los límites. En el siguiente ejemplo, show() muestra como máximo 20 elementos que se calculan en función de los primeros 30 segundos de datos de las fuentes registradas.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

Para mostrar visualizaciones de los datos, pasa visualize_data=True al método show(). Puedes aplicar varios filtros a las visualizaciones. La siguiente visualización te permite filtrar por etiqueta y eje:

Método show con el que se muestra una PCollection como un conjunto enriquecido de elementos de IU que se pueden filtrar.

Otra visualización útil en los notebooks de Apache Beam es un DataFrame de Pandas. En el siguiente ejemplo, primero se convierten las palabras en minúsculas y, luego, se calcula la frecuencia de cada palabra.

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

El método collect() proporciona el resultado en un DataFrame de Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

El método collect que representa una PCollection en un DataFrame de Pandas

Visualiza los datos a través del inspector interactivo de Beam

Es posible que te distraiga la introspección de los datos de una PCollection mediante una llamada constante a show() y collect(), en especial cuando el resultado ocupa una gran parte del espacio en tu pantalla y dificulta navegar por el notebook. Es posible que también desees comparar varias PCollections en paralelo para validar si una transformación funciona según lo previsto, por ejemplo, cuando una PCollection pasa por una transformación y produce la otra. Para estos casos de uso, el inspector interactivo de Beam es una solución más conveniente.

El inspección interactivo de Beam se proporciona como una extensión de JupyterLab apache-beam-jupyterlab-sidepanel que se preinstaló en el notebook de Apache Beam. Con la extensión, puedes inspeccionar de forma interactiva el estado de las canalizaciones y los datos asociados con cada PCollection sin invocar de forma explícita show() o collect().

Existen 3 formas de abrir el inspector:

  • Haz clic en Interactive Beam en la barra de menú superior de JupyterLab. En el menú desplegable, busca Open Inspector y haz clic en él para abrir el inspector.

    Abrir el inspector mediante el menú

  • Usa la página del selector. Si no hay una página de lanzamiento abierta, haz clic en File -> New Launcher para abrirla. En la página del selector, busca Interactive Beam y haz clic en Open Inspector para abrir el inspector.

    Abre el inspector con el Selector

  • Usa la paleta de comandos. Haz clic en View -> Activate Command Palette en la barra de menú superior de JupyterLab. En la ventana emergente, busca Interactive Beam para enumerar todas las opciones de la extensión. Haz clic en Open Inspector para abrir el inspector.

    Abre el inspector mediante la paleta de comandos

Cuando el inspector está a punto de abrirse, haz lo siguiente:

  1. Si se abre exactamente 1 notebook, el inspector se conectará automáticamente a él.

  2. De lo contrario, aparecerá un cuadro de diálogo para que puedas seleccionar un kernel (cuando no se abra ningún notebook) o la sesión de notebook (cuando se abran varios notebooks) a fin de conectarte.

    Selecciona el notebook al que deseas conectarte

Puedes abrir varios inspecciones para un notebook abierto y organizar los inspecciones si arrastras y sueltas sus pestañas libremente en el lugar de trabajo.

Abre 2 inspectores y agrúpalos uno al lado del otro.

La página del inspector se actualiza de forma automática a medida que ejecutas celdas en el notebook. En el lado izquierdo, se enumeran las canalizaciones y las PCollections definidas en el notebook conectado. Las PCollections se organizan según las canalizaciones a las que pertenecen y se pueden contraer si haces clic en la canalización del encabezado.

Para los elementos de las listas de PCollections y canalizaciones, cuando se hace clic, el inspector renderiza las visualizaciones correspondientes en el lado derecho:

  • Si se trata de una PCollection, el inspector renderiza sus datos (de forma dinámica si los datos aún ingresan para PCollections no delimitadas) con widgets adicionales para ajustar la visualización después de hacer clic en el botón APPLY.

    Página del inspector

  • Si es una canalización, el inspector muestra el grafo de la canalización.

    Página del inspector

Tal vez notes que hay canalizaciones anónimas. Estas son canalizaciones con PCollections a las que puedes acceder, pero la sesión principal ya no hace referencia a ellas. Por ejemplo:

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

En el ejemplo anterior, se crea una canalización vacía p y una canalización anónima que contiene 1 PCollection pcoll. Aún puedes acceder a la canalización anónima a través de pcoll.pipeline.

La canalización y la lista de PCollection, a la izquierda, se pueden activar o desactivar a fin de ahorrar espacio para las visualizaciones grandes. Lista izquierda de Toggel

Comprende el estado de registro de una canalización

Además de las visualizaciones, también puedes inspeccionar el estado de registro de una o todas las canalizaciones en tu instancia de notebook mediante una llamada a describe.

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

El método describe() proporciona los siguientes detalles:

  • Tamaño total (en bytes) de todos los registros de la canalización en el disco
  • Hora de inicio del momento en que comenzó el trabajo de registro en segundo plano (en segundos desde el ciclo de entrenamiento de Unix)
  • Estado actual de la canalización del trabajo de registro en segundo plano
  • Variable de Python para la canalización

Inicia trabajos de Dataflow a partir de una canalización creada en el notebook

  1. Antes de usar el notebook para ejecutar trabajos de Dataflow, reinicia el kernel, vuelve a ejecutar todas las celdas y verifica el resultado (opcional). Si omites este paso, los estados ocultos en el notebook pueden afectar el grafo de trabajo en el objeto de canalización.
  2. Habilita la API de Dataflow.
  3. Agrega la siguiente instrucción de importación:

    from apache_beam.runners import DataflowRunner
    
  4. Pasa las opciones de canalización.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # If and only if you are using Apache Beam SDK built from source code, set
    # the SDK location. This is used by Dataflow to locate the SDK
    # needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    Puedes ajustar los valores de los parámetros. Por ejemplo, puedes cambiar el valor region de us-central1.

  5. Ejecuta la canalización con DataflowRunner. Esto ejecuta el trabajo en el servicio de Dataflow.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p es un objeto de canalización que se describe en Crea la canalización.

Para obtener un ejemplo sobre cómo realizar esta conversión en un notebook interactivo, consulta el notebook “Conteo de palabras de Dataflow” en la instancia de notebook.

Como alternativa, puedes exportar el notebook como una secuencia de comandos ejecutable, modificar el archivo .py generado mediante los pasos anteriores y, luego, implementar la canalización en el servicio de Dataflow.

Guarda el notebook

Los notebooks que creas se guardan de forma local en la instancia de notebook en ejecución. Si restableces o cierras la instancia de notebook durante el desarrollo, esos notebooks nuevos se mantienen siempre que se creen en el directorio /home/jupyter. Sin embargo, si se borra una instancia de notebook, esos notebooks también se borran.

Si quieres conservar los notebooks para usarlos en el futuro, descárgalos de forma local en tu estación de trabajo, guárdalos en GitHub o expórtalos a un formato de archivo diferente.

Guarda el notebook en discos persistentes adicionales

Si quieres mantener tu trabajo como notebooks y secuencias de comandos en varias instancias de notebook, puedes almacenarlos en el disco persistente.

  1. Crea o adjunta un disco persistente. Sigue las instrucciones para usar ssh a fin de conectarte a la VM de la instancia de notebook y emitir comandos en el Cloud Shell abierto.

  2. Ten en cuenta el directorio en el que se activa el disco persistente, por ejemplo, /mnt/myDisk.

  3. Edita los detalles de la VM de la instancia de notebook para agregar una entrada a Custom metadata: clave - container-custom-params; valor: -v /mnt/myDisk:/mnt/myDisk. Se necesitan metadatos adicionales para vincular el PD activado

  4. Haz clic en Guardar.

  5. Para actualizar estos cambios, restablece la instancia de notebook. Restablece una instancia de notebook

  6. Cuando el vínculo se active después del restablecimiento, haz clic en Abrir JupyterLab. La IU de JupyterLab puede tardar un tiempo en estar disponible. Una vez que aparezca la IU, abre una terminal y ejecuta el siguiente comando: ls -al /mnt. Se debería enumerar el directorio /mnt/myDisk. Enumera límites de volumen

Ahora puedes guardar tu trabajo en el directorio /mnt/myDisk. Incluso si se borra la instancia de notebook, el disco persistente aún existe en tu proyecto. Luego, puedes conectar este disco persistente a otras instancias de notebook.

Realice una limpieza

Una vez que termines de usar la instancia de notebook de Apache Beam, limpia los recursos que creaste en Google Cloud. Para ello, cierra la instancia de notebook.

Funciones avanzadas

Beam SQL y beam_sql mágico

Beam SQL te permite consultar PCollections delimitadas y no delimitadas con instrucciones de SQL. Si trabajas en un notebook de Apache Beam, puedes usar las funciones mágicas personalizadas de IPython beam_sql para acelerar el desarrollo de tu canalización.

Puedes verificar el uso mágico de beam_sql con las opciones -h o --help: Comprueba la ayuda de beam_sql

Puedes crear una PCollection a partir de valores constantes: Crea PCollection a partir de valores constantes

Puedes unir varias PCollections: Une varias PCollections

Puedes iniciar un trabajo de Dataflow con las opciones -r DataflowRunner o --runner DataflowRunner: Inicia un trabajo de Dataflow con Beam SQL

Para obtener más información, consulta el notebook de ejemplo Apache Beam SQL in notebooks.

Acelera el uso del compilador JIT y la GPU

Puedes usar bibliotecas como numba y GPU para acelerar tu código de Python y las canalizaciones de Apache Beam. En la instancia de notebook de Apache Beam creada con una GPU nvidia-tesla-t4, puedes compilar tu código de Python con numba.cuda.jit para ejecutarlo en la GPU. De manera opcional, puedes compilar tu código de Python en código máquina con numba.jit o numba.njit para acelerar la ejecución en las CPU.

Un DoFn que procesa en GPU:

class Sampler(beam.DoFn):
    def __init__(self, blocks=80, threads_per_block=64):
        # Uses only 1 cuda grid with below config.
        self.blocks = blocks
        self.threads_per_block = threads_per_block

    def setup(self):
        import numpy as np
        # An array on host as the prototype of arrays on GPU to
        # hold accumulated sub count of points in the circle.
        self.h_acc = np.zeros(
            self.threads_per_block * self.blocks, dtype=np.float32)

    def process(self, element: Tuple[int, int]):
        from numba import cuda
        from numba.cuda.random import create_xoroshiro128p_states
        from numba.cuda.random import xoroshiro128p_uniform_float32

        @cuda.jit
        def gpu_monte_carlo_pi_sampler(rng_states, sub_sample_size, acc):
            """Uses GPU to sample random values and accumulates the sub count
            of values within a circle of radius 1.
            """
            pos = cuda.grid(1)
            if pos < acc.shape[0]:
                sub_acc = 0
                for i in range(sub_sample_size):
                    x = xoroshiro128p_uniform_float32(rng_states, pos)
                    y = xoroshiro128p_uniform_float32(rng_states, pos)
                    if (x * x + y * y) <= 1.0:
                        sub_acc += 1
                acc[pos] = sub_acc

        rng_seed, sample_size = element
        d_acc = cuda.to_device(self.h_acc)
        sample_size_per_thread = sample_size // self.h_acc.shape[0]
        rng_states = create_xoroshiro128p_states(self.h_acc.shape[0], seed=rng_seed)
        gpu_monte_carlo_pi_sampler[self.blocks, self.threads_per_block](
            rng_states, sample_size_per_thread, d_acc)
        yield d_acc.copy_to_host()

Ejecuta en una GPU: Ejecuta DoFn en GPU

Puedes encontrar más detalles en el notebook de ejemplo Use GPUs with Apache Beam.