Guía de inicio rápido del uso de Python

En esta guía de inicio rápido, aprenderás a usar el SDK de Apache Beam para Python a fin de compilar un programa que defina una canalización. Luego, ejecuta la canalización mediante un ejecutor local directo o uno basado en la nube, como Dataflow.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  4. Habilita las API de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager.

    Habilita las API

  5. Crea una cuenta de servicio:

    1. En Cloud Console, ve a la página Crear cuenta de servicio.

      Ir a Crear cuenta de servicio
    2. Selecciona un proyecto
    3. Ingresa un nombre en el campo Nombre de cuenta de servicio. Cloud Console completa el campo ID de cuenta de servicio con base en este nombre.

      Opcional: en el campo Descripción de la cuenta de servicio, ingresa una descripción. Por ejemplo, Service account for quickstart.

    4. Haz clic en Crear y continuar.
    5. Haz clic en el campo Seleccionar una función.

      En Acceso rápido, haz clic en Básica y, luego, en Propietario.

    6. Haga clic en Continuar.
    7. Haz clic en Listo para terminar de crear la cuenta de servicio.

      No cierres la ventana del navegador. La usarás en la próxima tarea.

  6. Para crear una clave de cuenta de servicio, haz lo siguiente:

    1. En Cloud Console, haz clic en la dirección de correo electrónico de la cuenta de servicio que creaste.
    2. Haga clic en Claves.
    3. Haz clic en Agregar clave, luego haz clic en Crear clave nueva.
    4. Haga clic en Crear. Se descargará un archivo de claves JSON en tu computadora.
    5. Haga clic en Cerrar.
  7. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta del archivo JSON que contiene la clave de tu cuenta de servicio. Esta variable solo se aplica a la sesión actual de shell. Por lo tanto, si abres una sesión nueva, deberás volver a configurar la variable.

  8. Crea un bucket de Cloud Storage:
    1. En Cloud Console, ve a la página Navegador de Cloud Storage.

      Ir al navegador

    2. Haz clic en Crear bucket.
    3. En la página Crear un bucket, ingresa la información de tu bucket. Para ir al siguiente paso, haz clic en Continuar.
      • Para Asignar un nombre a tu depósito, ingresa un nombre de bucket único. No incluyas información sensible en el nombre del bucket porque su espacio de nombres es global y públicamente visible.
      • Para Elige dónde almacenar tus datos, sigue estos pasos:
        • Seleccione una opción de Tipo de ubicación.
        • Selecciona una opción de Ubicación.
      • Para Elegir una clase de almacenamiento predeterminado para tus datos, selecciona lo siguiente:Estándar.
      • En Elegir cómo controlar el acceso a los objetos, selecciona una opción Control de acceso.
      • En Configuración avanzada (opcional), especifica un método de encriptación, una política de retención o etiquetas de depósito.
    4. Haga clic en Crear.
  9. Copia el ID del proyecto de Google Cloud y el nombre del bucket de Cloud Storage. Necesitarás estos valores más adelante en este documento.

Configure su entorno

En esta sección, usa el símbolo del sistema a fin de configurar un entorno virtual aislado de Python para ejecutar el proyecto de canalización mediante venv. Este proceso te permite aislar las dependencias de un proyecto de las dependencias de otros proyectos.

Si no tienes un símbolo del sistema disponible, puedes usar Cloud Shell. Cloud Shell ya tiene el administrador de paquetes para Python 3 instalado, por lo que puedes omitir la creación de un entorno virtual.

Para instalar Python y, luego, crear un entorno virtual, sigue estos pasos:

  1. Verifica que Python 3 y pip se estén ejecutando en el sistema:
    python --version
    python -m pip --version
    
  2. Si es necesario, instala Python 3 y, luego, configura un entorno virtual de Python: sigue las instrucciones proporcionadas en las secciones Instala Python y Configura venv de la página Configura un entorno de desarrollo de Python.

Una vez completada la guía de inicio rápido, puedes ejecutar deactivate para desactivar el entorno virtual.

Obtén el SDK de Apache Beam

El SDK de Apache Beam es un modelo de programación de código abierto para canalizaciones de datos. Debes definir una canalización con un programa Apache Beam y, luego, elegir un ejecutor, como Dataflow, para ejecutar tu canalización.

Para descargar y, luego, instalar el SDK de Apache Beam, sigue estos pasos:

  1. Verifica que estés en el entorno virtual de Python que creaste en la sección anterior. Asegúrate de que el mensaje comience con <env_name>, en el que env_name sea el nombre del entorno virtual.
  2. Instala el estándar de empaquetado de la rueda de Python:
    pip install wheel
    
  3. Instala la versión más reciente del SDK de Apache Beam para Python:
  4. pip install 'apache-beam[gcp]'

    Según la conexión, la instalación puede tardar un poco.

Ejecute la canalización de forma local:

Si deseas ver cómo se ejecuta una canalización de forma local, usa un módulo de Python ya preparado para el ejemplo de wordcount incluido en el paquete apache_beam.

En el ejemplo de canalización wordcount, se realiza lo siguiente:

  1. Toma un archivo de texto como entrada.

    Este archivo de texto se encuentra en un bucket de Cloud Storage con el nombre del recurso gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Analiza cada línea en palabras.
  3. Realiza un recuento de frecuencia en las palabras con asignación de token.

Para habilitar a etapa la canalización wordcount de forma local, sigue estos pasos:

  1. Desde tu terminal local, ejecuta el ejemplo wordcount:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. Visualiza el resultado de la canalización:
    more outputs*
  3. Para salir, presiona q.
Ejecutar la canalización de manera local te permite probar y depurar el programa de Apache Beam. Puedes ver el código fuente wordcount.py en GitHub de Apache Beam.

Ejecuta la canalización en el servicio de Dataflow

En esta sección, ejecuta la canalización de ejemplo wordcount desde el paquete apache_beam en el servicio de Dataflow. En este ejemplo, se especifica DataflowRunner como parámetro para --runner.
  • Ejecuta la canalización:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://STORAGE_BUCKET/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://STORAGE_BUCKET/tmp/

    Reemplaza lo siguiente:

    • DATAFLOW_REGION: el extremo regional en el que deseas implementar el trabajo de Dataflow, por ejemplo, europe-west1.

      La opción --region anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.

    • STORAGE_BUCKET: Es el nombre de Cloud Storage que copiaste antes.
    • PROJECT_ID por el ID del proyecto de Google Cloud que copiaste antes

Ve los resultados

Cuando ejecutas una canalización con Dataflow, los resultados se almacenan en un bucket de Cloud Storage. En esta sección, debes verificar que la canalización se esté ejecutando mediante Cloud Console o la terminal local.

Cloud Console

Para ver los resultados en Cloud Console, sigue estos pasos:

  1. En Cloud Console, ve a la página de Jobs.

    Ir a Trabajos

    En la página Trabajos, se muestran detalles del trabajo wordcount, incluido un estado En ejecución primero y, luego, Correcto.

  2. Ir a la página Navegador de Cloud Storage

    Ir al navegador

  3. En la lista de buckets de tu proyecto, haz clic en el bucket de almacenamiento que creaste antes.

    En el directorio wordcount, se muestran los archivos de salida que creó tu trabajo.

Terminal local

Para consultar los resultados de tu terminal, usa la herramienta de gsutil. También puedes ejecutar los comandos desde Cloud Shell.

  1. Enumera los archivos de salida:
    gsutil ls -lh "gs://STORAGE_BUCKET/results/outputs*"  
  2. Reemplaza STORAGE_BUCKET por el nombre del bucket de Cloud Storage que se usó en el programa de canalización.

  3. Visualiza los resultados en los archivos de salida:
    gsutil cat "gs://STORAGE_BUCKET/results/outputs*"

Modifica el código de canalización

La canalización de wordcount en los ejemplos anteriores distingue entre palabras en mayúsculas y minúsculas. En los siguientes pasos, se muestra cómo modificar la canalización para que la canalización wordcount no distinga entre mayúsculas y minúsculas.
  1. En tu máquina local, descarga la copia más reciente del código wordcount del repositorio de GitHub de Apache Beam.
  2. Desde la terminal local, ejecuta la canalización:
    python wordcount.py --output outputs
  3. Observa los resultados.
    more outputs*
  4. Para salir, presiona q.
  5. En el editor que prefieras, abre el archivo wordcount.py.
  6. Dentro de la función run, examina los pasos de la canalización:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    Después de split, las líneas se dividen en palabras como strings.

  7. Para aplicar minúscula a las strings, modifica la línea después de split:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    Esta modificación asigna la función str.lower a cada palabra. Esta línea es equivalente a beam.Map(lambda word: str.lower(word)).
  8. Guarda el archivo y ejecuta el trabajo wordcount modificado:
    python wordcount.py --output outputs
  9. Visualiza los resultados de la canalización modificada:
    more outputs*
  10. Para salir, presiona q.

Limpia

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta página.

  1. En Cloud Console, ve a la página Navegador de Cloud Storage.

    Ir al navegador

  2. Haz clic en la casilla de verificación del bucket que deseas borrar.
  3. Para borrar el bucket, haz clic en Borrar y, luego, sigue las instrucciones.

¿Qué sigue?

Apache Beam™ es una marca de The Apache Software Foundation o sus afiliados de Estados Unidos y otros países.