Desarrolla con notebooks de Apache Beam

Usar el ejecutor interactivo de Apache Beam con notebooks de JupyterLab te permite desarrollar canalizaciones de forma iterativa, inspeccionar el grafo de canalización y analizar PCollections individuales en un flujo de trabajo de bucle de lectura-evaluación-impresión (REPL). Estos notebooks de Apache Beam están disponibles a través de AI Platform Notebooks, un servicio administrado que aloja máquinas virtuales de notebooks preinstaladas con los frameworks más recientes de ciencia de datos y aprendizaje automático.

En esta guía, nos enfocamos en las funciones que presentan los notebooks de Apache Beam, pero no se muestra cómo compilar uno. Para obtener más información sobre Apache Beam, consulta Apache Beam programming guide (Guía de programación de Apache Beam).

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyecto

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Descubre cómo confirmar que tienes habilitada la facturación en un proyecto.

  4. Habilita las API de Compute Engine, AI Platform Notebooks.

    Habilita las API

Cuando termines esta guía, puedes borrar los recursos que creaste para evitar que se sigan facturando. Para obtener más información, consulta Realiza una limpieza.

Inicia una instancia de notebook de Apache Beam

  1. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.
  2. Navega hasta Dataflow en el panel lateral y haz clic en Notebooks.
  3. En la barra de herramientas, haz clic en  Instancia nueva.
  4. Selecciona Apache Beam.
  5. En la página Instancia de notebook nueva, selecciona una red para la VM del notebook y haz clic en Crear.
  6. Si deseas configurar una instancia de notebook personalizada, haz clic en Personalizar (opcional). Para obtener más información sobre cómo personalizar las propiedades de la instancia, consulta Crea una instancia de AI Platform Notebooks con propiedades específicas.
  7. Haz clic en Abrir JupyterLab cuando se active el vínculo. AI Platform Notebooks crea una nueva instancia de notebook de Apache Beam.

Instala dependencias (opcional)

Los notebooks de Apache Beam ya tienen instaladas las dependencias de conector de Apache Beam y Google Cloud. Si la canalización contiene conectores personalizados o PTransforms personalizadas que dependen de bibliotecas de terceros, los puedes instalar después de crear una instancia de notebook. Para obtener más información, consulta Instala dependencias en la documentación de AI Platform Notebooks.

Comienza a usar los notebooks de Apache Beam

Una vez que abres una instancia de AI Platform Notebooks, hay notebooks de ejemplo disponibles en la carpeta Ejemplos. Las siguientes opciones están disponibles en este momento:

  • Conteo de palabras
  • Transmisión del conteo de palabras
  • Transmisión de datos de viajes en taxi en la ciudad de Nueva York
  • Conteo de palabras de Dataflow

En estos notebooks, se incluye texto explicativo y bloques de código comentados para que puedas comprender los conceptos de Apache Beam y el uso de la API.

Crea una instancia de notebook

Navega hacia Archivo > Nuevo > Notebook y selecciona un kernel que sea Apache Beam 2.22 o una versión posterior.

Apache Beam está instalado en tu instancia de notebook, así que incluye los módulos interactive_runner y interactive_beam en el notebook.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Si el notebook usa otras API de Google, agrega las siguientes instrucciones de importación:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Configura opciones de interactividad

A continuación, se establece la cantidad de tiempo que InteractiveRunner registra los datos de una fuente ilimitada. En este ejemplo, la duración se establece en 10 minutos.

ib.options.recording_duration = '10m'

También puedes cambiar el límite de registro (en bytes) para una fuente ilimitada a través de la propiedad recording_size_limit.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

Para descubrir más opciones interactivas, consulta la clase interactive_beam.options.

Crea la canalización

Inicializa la canalización mediante un objeto InteractiveRunner.

options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

p = beam.Pipeline(InteractiveRunner(), options=options)

Lee y visualiza los datos

En el siguiente ejemplo, se muestra una canalización de Apache Beam que crea una suscripción al tema de Pub/Sub determinado y realiza operaciones de lectura desde la suscripción.

words = p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

La canalización cuenta las palabras por ventanas desde la fuente. Crea un sistema de ventanas fijo de 10 segundos de duración por ventana.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Una vez que los datos se organizaron en ventanas, las palabras se cuentan por ventana.

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Con el método show(), se visualiza la PCollection resultante en el notebook.

ib.show(windowed_word_counts, include_window_info=True)

El método show con el que se visualiza una PCollection en formato de tabla

Puedes solicitar el permiso del conjunto de resultados desde show(), mediante la configuración de dos parámetros opcionales: n y duration. Si estableces n, se limita el conjunto de resultados para que se muestren como máximo n cantidad de elementos, alrededor de 20. Si no se establece n, el comportamiento predeterminado consiste en enumerar los elementos más recientes que se capturaron hasta que la grabación de origen finalizó. Establecer duration, limita el conjunto de resultados en un número específico de segundos válidos de datos desde el comienzo de la grabación de la fuente. Si no se establece duration, el comportamiento predeterminado consiste en enumerar todos los elementos hasta que finalice la grabación.

Si se configuran ambos parámetros opcionales, show() se detiene cuando se cumple cualquiera de los límites. En el siguiente ejemplo, show() muestra como máximo 20 elementos que se calculan en función de los primeros 30 segundos de datos de las fuentes registradas.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

Para mostrar visualizaciones de los datos, pasa visualize_data=True al método show(). Puedes aplicar varios filtros a las visualizaciones. La siguiente visualización te permite filtrar por etiqueta y eje:

Método show con el que se muestra una PCollection como un conjunto enriquecido de elementos de IU que se pueden filtrar.

Otra visualización útil en los notebooks de Apache Beam es un DataFrame de Pandas. En el siguiente ejemplo, primero se convierten las palabras en minúsculas y, luego, se calcula la frecuencia de cada palabra.

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

El método collect() proporciona el resultado en un DataFrame de Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

El método collect que representa una PCollection en un DataFrame de Pandas

Comprende el estado de registro de una canalización

Además de las visualizaciones, también puedes inspeccionar el estado de registro de una o todas las canalizaciones en tu instancia de notebook mediante una llamada a describe.

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

El método describe() proporciona los siguientes detalles:

  • Tamaño total (en bytes) de todos los registros de la canalización en el disco
  • Hora de inicio del momento en que comenzó el trabajo de registro en segundo plano (en segundos desde el ciclo de entrenamiento de Unix)
  • Estado actual de la canalización del trabajo de registro en segundo plano
  • Variable de Python para la canalización

Inicia trabajos de Dataflow a partir de una canalización creada en el notebook

  1. Antes de usar el notebook para ejecutar trabajos de Dataflow, reinicia el kernel, vuelve a ejecutar todas las celdas y verifica el resultado (opcional). Si omites este paso, los estados ocultos en el notebook pueden afectar el grafo de trabajo en el objeto de canalización.
  2. Habilita la API de Dataflow.
  3. Agrega la siguiente instrucción de importación:

    from apache_beam.runners import DataflowRunner
    
  4. Pasa las opciones de canalización.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # Set the SDK location. This is used by Dataflow to locate the
    # SDK needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    Puedes ajustar los valores de los parámetros. Por ejemplo, puedes cambiar el valor region de us-central1.

  5. Ejecuta la canalización con DataflowRunner. Esto ejecuta el trabajo en el servicio de Dataflow.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p es un objeto de canalización que se describe en Crea la canalización.

Para obtener un ejemplo sobre cómo realizar esta conversión en un notebook interactivo, consulta el notebook “Conteo de palabras de Dataflow” en la instancia de notebook.

Como alternativa, puedes exportar el notebook como una secuencia de comandos ejecutable, modificar el archivo .py generado mediante los pasos anteriores y, luego, implementar la canalización en el servicio de Dataflow.

Guarda el notebook

Los notebooks que creas se guardan de forma local en la instancia de notebook en ejecución. Si restableces o cierras la instancia de notebook durante el desarrollo, esos notebooks nuevos se borran. Si quieres conservar los notebooks para usarlos en el futuro, descárgalos de forma local en tu estación de trabajo, guárdalos en GitHub o expórtalos a un formato de archivo diferente.

Realice una limpieza

Una vez que termines de usar la instancia de notebook de Apache Beam, limpia los recursos que creaste en Google Cloud. Para ello, cierra la instancia de notebook.