Guía de inicio rápido para Python

En esta página, se muestra cómo configurar tu entorno de desarrollo de Python, obtener el SDK de Apache Beam para Python y ejecutar y modificar una canalización de ejemplo.

Otra opción para aprender a crear y ejecutar una canalización de Apache Beam es desarrollar una de forma interactiva mediante un notebook de Apache Beam. Si ya tienes un proyecto de Google Cloud configurado, la canalización de ejemplo WordCount en esta guía de inicio rápido está disponible como un notebook de ejemplo.

Antes de comenzar

  1. Accede a tu Cuenta de Google.

    Si todavía no tienes una cuenta, regístrate para obtener una nueva.

  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir a la página del selector de proyectos

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Descubre cómo confirmar que tienes habilitada la facturación en un proyecto.

  4. Habilita las API de Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, JSON de Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Datastore, y Cloud Resource Manager.

    Habilita las API

  5. Configura la autenticación
    1. En Cloud Console, ve a la página Crea una clave de cuenta de servicio.

      Ir a la página Crea una clave de la cuenta de servicio
    2. En la lista Cuenta de servicio, selecciona Cuenta de servicio nueva.
    3. Ingresa un nombre en el campo Nombre de cuenta de servicio.
    4. En la lista Función, selecciona Proyecto > Propietario.

    5. Haz clic en Crear. Se descargará un archivo JSON que contiene tus claves a tu computadora.
  6. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta del archivo JSON que contiene la clave de tu cuenta de servicio. Esta variable solo se aplica a la sesión actual de shell. Por lo tanto, si abres una sesión nueva, deberás volver a configurar la variable.

  7. Crea un bucket de Cloud Storage:
    1. En Cloud Console, ve a la página Navegador de Cloud Storage.

      Ir a la página del navegador de Cloud Storage

    2. Haz clic en Crear depósito.
    3. En el diálogo Crear bucket, especifica los siguientes atributos:
      • Nombre: Un nombre de bucket único No incluyas información sensible en el nombre del bucket, ya que el espacio de nombres del bucket es global y tiene visibilidad pública.
      • Clase de almacenamiento predeterminada: Estándar
      • Una ubicación donde se almacenan los datos del depósito.
    4. Haz clic en Crear.

Configure su entorno

  1. Usa el SDK de Apache Beam para Python con pip y la versión 3.6, 3.7, o 3.8 de Python. Ejecuta este comando para comprobar que tengas Python y una instalación pip en funcionamiento:
    python --version
    python -m pip --version
    Si no tienes Python, busca los pasos de instalación para tu sistema operativo en la página Instalación de Python.
  2. Dataflow ya no admite canalizaciones que usan Python 2. Obtén más información en la página Compatibilidad con Python 2 en Google Cloud.

  3. Configura y activa un entorno virtual de Python para esta guía de inicio rápido.

    Una vez completada la guía de inicio rápido, puedes ejecutar deactivate para desactivar el entorno virtual.

Obtén más información sobre el uso de Python en Google Cloud en la página Configura un entorno de desarrollo de Python.

Nota: Para obtener los mejores resultados, inicia las canalizaciones de Python 3 con Apache Beam 2.16.0 o posterior. La versión 2.24.0 del SDK de Apache Beam era la última versión compatible con Python 2 y Python 3.5. Para obtener un resumen de las mejoras recientes de Python 3 en Apache Beam, consulta la herramienta de seguimiento de errores de Apache Beam.

Obtén el SDK de Apache Beam

El SDK de Apache Beam es un modelo de programación de código abierto para canalizaciones de datos. Debes definir estas canalizaciones con un programa Apache Beam y puedes elegir un ejecutor, como Dataflow, para ejecutar tu canalización.

Instala la versión más reciente del SDK de Apache Beam para Python mediante la ejecución del siguiente comando desde un entorno virtual:

pip install 'apache-beam[gcp]'

Ejecuta WordCount de forma local

El ejemplo de WordCount muestra una canalización que realiza los pasos siguientes:
  1. Toma un archivo de texto como entrada.
  2. Analiza cada línea en palabras.
  3. Realiza un recuento de frecuencia en las palabras con asignación de token.

Ejecuta el módulo wordcount desde el paquete apache_beam en tu máquina local con el comando siguiente:

python -m apache_beam.examples.wordcount \
  --output outputs
Este archivo de texto se encuentra en un bucket de Cloud Storage con el nombre del recurso gs://dataflow-samples/shakespeare/kinglear.txt. Para ver los resultados, ejecuta el comando siguiente:
more outputs*

Para salir, presiona la tecla q.

Ejecutar la canalización de manera local te permite probar y depurar el programa de Apache Beam. Puedes ver el código fuente wordcount.py en GitHub de Apache Beam.

Ejecuta WordCount en el servicio de Dataflow

Puedes ejecutar el módulo wordcount desde el paquete apache_beam en el servicio de Dataflow si especificas DataflowRunner en el campo runner y seleccionas la región en la que se ejecutará la canalización.

Primero, define las variables PROJECT, BUCKET y REGION:

PROJECT=PROJECT_ID
BUCKET=GCS_BUCKET
REGION=DATAFLOW_REGION
Para ejecutar esta canalización, ejecuta el comando siguiente:
python -m apache_beam.examples.wordcount \
  --region $REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/results/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/

Visualiza los resultados con GCP

Cuando ejecutas una canalización con Dataflow, los resultados se encuentran en un bucket de Cloud Storage.

Puedes usar la herramienta gsutil para ver los resultados desde tu terminal.

Ejecuta este comando para crear una lista de los archivos de salida:

gsutil ls -lh "gs://$BUCKET/results/outputs*"  
Ejecuta este comando para ver los resultados en estos archivos:
gsutil cat "gs://$BUCKET/results/outputs*"

Ejecuta este comando para ver los resultados en la IU de supervisión:
  1. Abre la IU de supervisión de Dataflow.
    Ir a la IU web de Dataflow

    Verás el trabajo wordcount con estado En ejecución primero y, luego, Correcto (Succeeded):

    El trabajo WordCount de Cloud Dataflow con estado correcto
  2. Abre el navegador de Cloud Storage en Google Cloud Console.
    Ir al navegador de Cloud Storage

    En el directorio wordcount, deberías ver los archivos de salida que creó tu trabajo:

    Directorio de resultados, con archivos de salida del trabajo de WordCount

Modifica el código de canalización

La canalización de WordCount en los ejemplos anteriores distingue entre palabras en mayúsculas y minúsculas. En la explicación siguiente, se muestra cómo modificar la canalización de WordCount para que no distinga entre mayúsculas y minúsculas.
  1. Descarga la copia más reciente del código WordCount del repositorio de GitHub de Apache Beam.
  2. Ejecuta la canalización en tu máquina local:
    python wordcount.py --output outputs
  3. Observa los resultados mediante la ejecución del comando siguiente:
    more outputs*
    Para salir, presiona la tecla q.
  4. Abre el archivo wordcount.py en el editor que prefieras.
  5. Examina los pasos de la canalización dentro de la función run. Después de split, las líneas se dividen en palabras como strings.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))
  6. Modifica la línea después de split para que las strings queden en minúscula.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'lowercase' >> beam.Map(unicode.lower)     # Add this line to modify the pipeline
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones)) 
    Esta modificación asigna la función str.lower a cada palabra. Esta línea es equivalente a beam.Map(lambda word: str.lower(word)).
  7. Guarda el archivo y ejecuta el trabajo modificado de WordCount en tu máquina local:
    python wordcount.py --output outputs
  8. Para ver los resultados de la canalización modificada, ejecuta el comando siguiente:
    more outputs*
    Para salir, presiona la tecla q.

Realiza una limpieza

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta guía de inicio rápido.

  1. En Cloud Console, ve a la página Navegador de Cloud Storage.

    Ir a la página Navegador de Cloud Storage

  2. Haz clic en la casilla de verificación del bucket que quieras borrar.
  3. Para borrar el depósito, haz clic en Borrar .

¿Qué sigue?

Apache Beam™ es una marca de The Apache Software Foundation o sus afiliados de Estados Unidos y otros países.