Automatiza la infraestructura con Cloud Composer

En este instructivo, se demuestra una forma de automatizar la infraestructura de nube mediante Cloud Composer. En el ejemplo, se muestra cómo programar copias de seguridad automáticas de instancias de máquina virtual (VM) de Compute Engine.

Cloud Composer es un servicio de organización de flujos de trabajo completamente administrado en Google Cloud. Cloud Composer te permite crear flujos de trabajo mediante una API de Python, programarlos para que se ejecuten de forma automática o iniciarlos de forma manual, y supervisar la ejecución de sus tareas en tiempo real a través de una IU gráfica.

Cloud Composer se basa en Apache Airflow. Google ejecuta esta plataforma de organización de código abierto sobre un clúster de Google Kubernetes Engine (GKE). Este clúster administra los trabajadores de Airflow y proporciona una gran cantidad de oportunidades de integración con otros productos de Google Cloud.

Este instructivo está dirigido a los operadores, los administradores de TI y los desarrolladores interesados en automatizar la infraestructura y conocer los detalles técnicos de las características principales de Cloud Composer. El instructivo no se diseñó con el fin de ser una guía de recuperación ante desastres (DR) a nivel empresarial ni una guía de prácticas recomendadas para las copias de seguridad. Si deseas obtener más información sobre cómo crear un plan de DR destinado a tu empresa, consulta la guía de planificación para la recuperación ante desastres.

Define la arquitectura

Los flujos de trabajo de Cloud Composer se definen mediante la creación de un grafo acíclico dirigido (DAG). Desde la perspectiva de Airflow, un DAG es una colección de tareas organizadas con el fin de reflejar sus interdependencias direccionales. En este instructivo, aprenderás cómo definir un flujo de trabajo de Airflow que se ejecute con regularidad para crear una copia de seguridad de una instancia de máquina virtual de Compute Engine mediante el uso de instantáneas de discos persistentes.

La VM de Compute Engine que se usa en este ejemplo consta de una instancia con un disco persistente de arranque asociado. Si se siguen los lineamientos de instantáneas, que se describen más adelante, el flujo de trabajo de copia de seguridad de Cloud Composer llama a la API de Compute Engine con el fin de detener la instancia, tomar una instantánea del disco persistente y reiniciar la instancia. Entre estas tareas, el flujo de trabajo espera a que se complete cada operación antes de continuar.

En el siguiente diagrama, se resume la arquitectura:

Arquitectura para la automatización de la infraestructura

En la siguiente sección, antes de comenzar el instructivo, se muestra cómo crear un entorno de Cloud Composer. La ventaja de este entorno es que usa varios productos de Google Cloud, pero no es necesario que configures cada uno de forma individual.

  • Cloud Storage: Los registros, el complemento y el DAG de Airflow se almacenan en un depósito de Cloud Storage.
  • Google Kubernetes Engine: La plataforma de Airflow se basa en una arquitectura de microservicios y es adecuada para ejecutarse en GKE.
    • Los trabajadores de Airflow cargan las definiciones de los complementos y los flujos de trabajo desde Cloud Storage y ejecutan cada tarea mediante la API de Compute Engine.
    • El programador de Airflow se asegura de que las copias de seguridad se ejecuten en la cadencia configurada y en el orden de tareas adecuado.
    • Redis se usa como agente de mensajes entre los componentes de Airflow.
    • El proxy de Cloud SQL se usa para la comunicación con el repositorio de metadatos.
  • Cloud SQL y App Engine Flex: Cloud Composer también usa una instancia de Cloud SQL para los metadatos y una app de App Engine Flex que entrega la IU de Airflow. Estos recursos no se muestran en el diagrama porque se encuentran en un proyecto independiente que administra Google.

Para obtener más detalles, consulta la Descripción general de Cloud Composer.

Escala el flujo de trabajo

El caso práctico que se presenta en este instructivo es simple: toma una instantánea de una sola máquina virtual con un programa fijo. Sin embargo, en una situación real, se pueden incluir cientos de VM que pertenezcan a distintas partes de la organización o a distintos niveles de un sistema, y cada una requiere distintos programas de copia de seguridad. El escalamiento no solo se aplica a este ejemplo con VM de Compute Engine, sino también a cualquier componente de una infraestructura para el que se deba ejecutar un proceso programado.

Cloud Composer se destaca en estas situaciones complejas debido a que es un motor completo de flujo de trabajo basado en Apache Airflow y alojado en la nube. No es solo una alternativa a Cloud Scheduler o cron.

Los DAG de Airflow, que son representaciones flexibles de un flujo de trabajo, se adaptan a las necesidades del mundo real, a la vez que se ejecutan desde una sola base de código. Si deseas crear DAG adecuados para tu caso práctico, puedes usar una combinación de los siguientes dos enfoques:

  • Crear una instancia de DAG para los grupos de componentes de infraestructura en los que se pueda usar el mismo programa a fin de iniciar el proceso
  • Crear instancias de DAG independientes para los grupos de componentes de infraestructura que requieran sus propios programas

Un DAG puede procesar componentes en paralelo. Una tarea debe iniciar una operación asíncrona para cada componente, o bien debes crear una rama a fin de procesar cada uno. Puedes compilar DAG de forma dinámica a partir de código con el fin de agregar o quitar ramas y tareas según sea necesario.

Además, puedes modelar dependencias entre los niveles de la aplicación dentro del mismo DAG. Por ejemplo, se recomienda detener todas las instancias del servidor web antes de detener cualquier instancia del servidor de apps.

Estas optimizaciones están fuera del alcance de este instructivo.

Usa prácticas recomendadas para las instantáneas

El disco persistente (PD) es un almacenamiento en bloque duradero que se puede conectar a una instancia de máquina virtual y usar como disco de arranque principal para la instancia o como disco secundario que no es de arranque para los datos críticos. Los PD tienen una alta disponibilidad, ya que, por cada escritura, se escriben tres réplicas. Sin embargo, a los clientes de Google Cloud se les cobra solo una.

Una instantánea es una copia exacta de un disco persistente en un momento determinado. Las instantáneas son incrementales, están comprimidas y se almacenan con transparencia en Cloud Storage.

Es posible tomar instantáneas de cualquier disco persistente mientras se ejecutan las apps. Ninguna instantánea contendrá un bloque escrito de forma parcial. Sin embargo, si una operación de escritura que abarca varios bloques se encuentra en proceso cuando el backend recibe la solicitud de creación de la instantánea, esta podría contener solo algunos de los bloques actualizados. Puedes solucionar estas inconsistencias de la misma forma en que lo harías en el caso de los apagados poco claros.

Te recomendamos seguir estos lineamientos para asegurarte de que las instantáneas sean coherentes:

  • Minimiza o evita las escrituras en los discos durante el proceso de creación de instantáneas. Programar copias de seguridad durante las horas de menor demanda es un buen comienzo.
  • En el caso de los discos secundarios que no sean de arranque, pausa las apps y los procesos que escriban datos, y desactiva o inmoviliza el sistema de archivos.
  • En el caso de los discos de arranque, no es seguro ni factible inmovilizar el volumen raíz. Detener la instancia de máquina virtual antes de tomar una instantánea puede ser un enfoque adecuado.

    Para evitar el tiempo de inactividad del servicio ocasionado por la inmovilización o la detención de una máquina virtual, recomendamos el uso de una arquitectura con alta disponibilidad. Si deseas obtener más información, consulta Situaciones de recuperación ante desastres para aplicaciones.

  • Usa una convención coherente de asignación de nombres para las instantáneas. Por ejemplo, usa una marca de tiempo que tenga un nivel de detalle adecuado y esté concatenada con el nombre de la instancia, el disco y la zona.

Si deseas obtener más información sobre cómo crear instantáneas coherentes, consulta las prácticas recomendadas para las instantáneas.

Objetivos

  • Crear operadores personalizados de Airflow y un sensor para Compute Engine
  • Crear un flujo de trabajo de Cloud Composer mediante los operadores de Airflow y un sensor
  • Programar el flujo de trabajo para crear una copia de seguridad de una instancia de Compute Engine en intervalos regulares

Costos

Puedes usar la calculadora de precios para generar una estimación de costos según el uso previsto.

Antes de comenzar

  1. Accede a tu Cuenta de Google.

    Si todavía no tienes una cuenta, regístrate para obtener una nueva.

  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir a la página del selector de proyectos

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Descubre cómo confirmar que tienes habilitada la facturación en un proyecto.

  4. Crea un entorno de Cloud Composer. Para minimizar el costo, elige un tamaño de disco de 20 GB.

    IR A LA PÁGINA CREAR ENTORNO

    Por lo general, el aprovisionamiento del entorno de Cloud Composer toma alrededor de 15 minutos, pero puede demorar hasta una hora.

  5. El código completo de este instructivo está disponible en GitHub. Para examinar los archivos a medida que avanzas, abre el repositorio en Cloud Shell:

    IR A Cloud Shell
  6. En el directorio principal de la consola de Cloud Shell, ejecuta el siguiente comando:
    git clone https://github.com/GoogleCloudPlatform/composer-infra-python.git

Cuando finalices este instructivo, podrás borrar los recursos creados para evitar que se te siga facturando. Para obtener más información, consulta cómo hacer una limpieza.

Configura una instancia de muestra de Compute Engine

El primer paso consiste en crear la instancia de máquina virtual de muestra de Compute Engine para crear una copia de seguridad. Esta instancia ejecuta WordPress, un sistema de administración de contenido de código abierto.

Sigue estos pasos para crear la instancia de WordPress en Compute Engine:

  1. En Google Cloud Marketplace, ve a la página de inicio WordPress certificado por Bitnami.
  2. Haz clic en Iniciar.
  3. Aparecerá una ventana emergente que contiene una lista de tus proyectos. Selecciona el proyecto que creaste antes para este instructivo.

    Google Cloud configura las API necesarias en tu proyecto y, luego de una breve espera, se muestra una pantalla en la que se incluyen las diferentes opciones de configuración para la instancia de Compute Engine de WordPress.

  4. De forma opcional, puedes cambiar el tipo de disco de arranque a SSD para aumentar la velocidad de arranque de la instancia.

  5. Haz clic en Implementar.

    Se te dirigirá a la pantalla de Deployment Manager, en la que podrás ver el estado de la implementación.

    Mediante la secuencia de comandos de Deployment Manager de WordPress, se crea la instancia de Compute Engine de WordPress y dos reglas de firewall para permitir que el tráfico de TCP ingrese a la instancia a través de los puertos 80 y 443. Este proceso puede tomar varios minutos. Se implementará cada elemento y se mostrará un ícono de rueda de progreso.

    Cuando se haya completado el proceso, la instancia de WordPress estará lista y entregará el contenido predeterminado en la URL del sitio web. En la pantalla de Deployment Manager, se muestra la URL del sitio web [Dirección del sitio (Site address)], la URL de la Consola del administrador (Admin URL) con el usuario y la contraseña, los vínculos de la documentación y los próximos pasos sugeridos.

    Se muestra la instancia implementada en Deployment Manager

  6. Haz clic en la dirección del sitio para verificar que la instancia de WordPress se encuentre en funcionamiento. Deberías ver una página de blog predeterminada de WordPress.

La instancia de muestra de Compute Engine ya está lista. El siguiente paso es configurar un proceso automático de copia de seguridad incremental del disco persistente de esa instancia.

Crea operadores de Airflow personalizados

Para crear una copia de seguridad del disco persistente de la instancia de prueba, puedes crear un flujo de trabajo de Airflow que detenga la instancia, tome una instantánea de su disco persistente y reinicie la instancia. Cada una de estas tareas se define como código con un operador personalizado de Airflow. Luego, el código de los operadores se agrupa en un complemento de Airflow.

En esta sección, aprenderás a compilar operadores personalizados de Airflow que llamen a la biblioteca cliente de Python de Compute Engine con el fin de controlar el ciclo de vida de la instancia. Para ello, tienes otras opciones, como las siguientes:

  • Usar el BashOperator de Airflow para ejecutar comandos de gcloud compute
  • Usar el HTTPOperator de Airflow para ejecutar llamadas HTTP directamente a la API de REST de Compute Engine
  • Usar el PythonOperator de Airflow para llamar a funciones arbitrarias de Python sin definir operadores personalizados

En este instructivo, no se exploran estas alternativas.

Autoriza llamadas a la API de Compute Engine

Los operadores personalizados que creas en este instructivo usan la biblioteca cliente de Python para llamar a la API de Compute Engine. Las solicitudes a la API se deben autenticar y autorizar. La forma recomendada de hacerlo consiste en usar una estrategia llamada credenciales predeterminadas de la aplicación (ADC).

La estrategia ADC se aplica cada vez que se realiza una llamada desde una biblioteca cliente:

  1. La biblioteca verifica si se especifica una cuenta de servicio en la variable de entorno GOOGLE_APPLICATION_CREDENTIALS.
  2. Si no se especifica, la biblioteca usa la cuenta de servicio predeterminada que proporcionan Compute Engine o GKE.

Si estos dos métodos fallan, se produce un error.

Los operadores de Airflow de este instructivo son parte del segundo método. Cuando creas el entorno de Cloud Composer, se aprovisiona un clúster de GKE. Los nodos de este clúster ejecutan pods trabajadores de Airflow. A su vez, estos trabajadores ejecutan el flujo de trabajo con los operadores personalizados que defines. Debido a que no especificaste una cuenta de servicio cuando creaste el entorno, lo que se usa en la estrategia ADC es la cuenta de servicio predeterminada para los nodos del clúster de GKE.

Los nodos del clúster de GKE son instancias de Compute Engine. Por lo tanto, es sencillo obtener las credenciales asociadas con la cuenta de servicio predeterminada de Compute Engine en el código del operador.

def get_compute_api_client(self):
  credentials = GoogleCredentials.get_application_default()
  return googleapiclient.discovery.build(
      'compute', 'v1', cache_discovery=False, credentials=credentials)

En este código, se usan las credenciales de la aplicación predeterminada para crear un cliente de Python que enviará solicitudes a la API de Compute Engine. En las siguientes secciones, deberás hacer referencia a este código durante la creación de cada operador de Airflow.

Como alternativa al uso de la cuenta de servicio predeterminada de Compute Engine, es posible crear una cuenta de servicio y configurarla como una conexión en la Consola del administrador de Airflow. Este método se describe en la página Administra conexiones de Airflow y permite un control más detallado del acceso a los recursos de Google Cloud. En este instructivo, no se explora esta alternativa.

Cierra la instancia de Compute Engine de forma segura

En esta sección, se analiza la creación del primer operador personalizado de Airflow, StopInstanceOperator. Este operador llama a la API de Compute Engine para detener la instancia de Compute Engine que ejecuta WordPress:

  1. En Cloud Shell, usa un editor de texto, como nano o vim, para abrir el archivo gce_commands_plugin.py:

    vi $HOME/composer-infra-python/no_sensor/plugins/gce_commands_plugin.py
    
  2. Examina las importaciones en la parte superior del archivo:

    import datetime
    import logging
    import time
    from airflow.models import BaseOperator
    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils.decorators import apply_defaults
    import googleapiclient.discovery
    from oauth2client.client import GoogleCredentials

    Las importaciones destacadas son las siguientes:

    • BaseOperator: Es la clase básica que todos los operadores personalizados de Airflow deben heredar.
    • AirflowPlugin: Es la clase básica para crear un grupo de operadores y formar un complemento.
    • apply_defaults: Es el decorador de funciones que completa los argumentos con valores predeterminados si no se especifican en el constructor del operador.
    • GoogleCredentials: Es la clase que se usa para recuperar las credenciales predeterminadas de la app.
    • googleapiclient.discovery: Es el punto de entrada de la biblioteca cliente que permite el descubrimiento de las API de Google subyacentes. En este caso, la biblioteca cliente compila un recurso para interactuar con la API de Compute Engine.
  3. A continuación, observa la clase StopInstanceOperator que se encuentra debajo de las importaciones:

    class StopInstanceOperator(BaseOperator):
      """Stops the virtual machine instance."""
    
      @apply_defaults
      def __init__(self, project, zone, instance, *args, **kwargs):
        self.compute = self.get_compute_api_client()
        self.project = project
        self.zone = zone
        self.instance = instance
        super(StopInstanceOperator, self).__init__(*args, **kwargs)
    
      def get_compute_api_client(self):
        credentials = GoogleCredentials.get_application_default()
        return googleapiclient.discovery.build(
            'compute', 'v1', cache_discovery=False, credentials=credentials)
    
      def execute(self, context):
        logging.info('Stopping instance %s in project %s and zone %s',
                     self.instance, self.project, self.zone)
        self.compute.instances().stop(
            project=self.project, zone=self.zone, instance=self.instance).execute()
        time.sleep(90)

    La clase StopInstanceOperator tiene tres métodos:

    • __init__: Es el constructor de la clase. Recibe el nombre del proyecto, la zona en la que se ejecuta la instancia y el nombre de la instancia que deseas detener. Además, inicializa la variable self.compute mediante una llamada a get_compute_api_client.
    • get_compute_api_client: Es el método auxiliar que muestra una instancia de la API de Compute Engine. Usa la ADC que proporciona GoogleCredentials para autenticarse con la API y autorizar las llamadas posteriores.
    • execute: Es el método del operador principal que se anula desde BaseOperator. Airflow llama a este método para ejecutar el operador. El método imprime un mensaje de información en los registros y, luego, llama a la API de Compute Engine para detener la instancia de Compute Engine que se especifica mediante los tres parámetros que se reciben en el constructor. La función sleep() al final espera hasta que la instancia se haya detenido. En un entorno de producción, debes usar un método más determinista, como la comunicación cruzada entre operadores. Esta técnica se describe más adelante en este instructivo.

Mediante el método stop() de la API de Compute Engine, se cierra la instancia de máquina virtual de forma correcta. El sistema operativo ejecuta las secuencias de comandos de apagado init.d, incluida la de WordPress en /etc/init.d/bitnami. Esta secuencia de comandos también controla el inicio de WordPress cuando se vuelve a iniciar la máquina virtual. Puedes examinar la definición del servicio con la configuración de inicio y apagado en /etc/systemd/system/bitnami.service..

Crea instantáneas de copia de seguridad incrementales con nombres únicos

En esta sección, se crea el segundo operador personalizado, SnapshotDiskOperator. Este operador toma una instantánea del disco persistente de la instancia.

En el archivo gce_commands_plugin.py que abriste en la sección anterior, observa la clase SnapshotDiskOperator:

class SnapshotDiskOperator(BaseOperator):
  """Takes a snapshot of a persistent disk."""

  @apply_defaults
  def __init__(self, project, zone, instance, disk, *args, **kwargs):
    self.compute = self.get_compute_api_client()
    self.project = project
    self.zone = zone
    self.instance = instance
    self.disk = disk
    super(SnapshotDiskOperator, self).__init__(*args, **kwargs)

  def get_compute_api_client(self):
    credentials = GoogleCredentials.get_application_default()
    return googleapiclient.discovery.build(
        'compute', 'v1', cache_discovery=False, credentials=credentials)

  def generate_snapshot_name(self, instance):
    # Snapshot name must match regex '(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)'
    return ('' + self.instance + '-' +
            datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S'))

  def execute(self, context):
    snapshot_name = self.generate_snapshot_name(self.instance)
    logging.info(
        ("Creating snapshot '%s' from: {disk=%s, instance=%s, project=%s, "
         "zone=%s}"),
        snapshot_name, self.disk, self.instance, self.project, self.zone)
    self.compute.disks().createSnapshot(
        project=self.project, zone=self.zone, disk=self.disk,
        body={'name': snapshot_name}).execute()
    time.sleep(120)

La clase SnapshotDiskOperator tiene los siguientes métodos:

  • __init__: Es el constructor de la clase. Es similar al constructor de la clase StopInstanceOperator, pero, además del proyecto, la zona y el nombre de la instancia, recibe el nombre del disco desde el que se debe crear la instantánea. Esto se debe a que una instancia puede tener más de un disco persistente conectado a ella.
  • generate_snapshot_name: A través de este método de muestra, se crea un nombre único y simple para cada instantánea mediante el uso del nombre de la instancia, la fecha y la hora con un nivel de detalle de un segundo. Debes ajustar el nombre a tus necesidades, por ejemplo: mediante el agregado del nombre del disco cuando se conectan varios discos a una instancia o el aumento del nivel de detalle de la hora para admitir solicitudes de creación de instantáneas ad hoc.
  • execute: Es el método del operador principal que se anula desde BaseOperator. Cuando el trabajador de Airflow lo ejecuta, genera un nombre de instantánea mediante el método generate_snapshot_name. Luego, imprime un mensaje de información y llama a la API de Compute Engine para crear la instantánea con los parámetros que se reciben en el constructor.

Inicia la instancia de Compute Engine

En esta sección, deberás crear el tercer y último operador personalizado, StartInstanceOperator. Este operador reinicia una instancia de Compute Engine.

En el archivo gce_commands_plugin.py que abriste antes, observa la clase SnapshotDiskOperator que se encuentra en la parte inferior:

class StartInstanceOperator(BaseOperator):
  """Starts a virtual machine instance."""

  @apply_defaults
  def __init__(self, project, zone, instance, *args, **kwargs):
    self.compute = self.get_compute_api_client()
    self.project = project
    self.zone = zone
    self.instance = instance
    super(StartInstanceOperator, self).__init__(*args, **kwargs)

  def get_compute_api_client(self):
    credentials = GoogleCredentials.get_application_default()
    return googleapiclient.discovery.build(
        'compute', 'v1', cache_discovery=False, credentials=credentials)

  def execute(self, context):
    logging.info('Starting instance %s in project %s and zone %s',
                 self.instance, self.project, self.zone)
    self.compute.instances().start(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    time.sleep(20)

La clase StartInstanceOperator tiene los siguientes métodos:

  • __init__: Es el constructor de la clase. Es similar al constructor de la clase StopInstanceOperator.
  • execute: Es el método del operador principal que se anula desde BaseOperator. Se diferencia de los operadores anteriores mediante la invocación de la API de Compute Engine adecuada con el fin de iniciar la instancia que se indica en los parámetros de entrada del constructor.

Define el flujo de trabajo de Airflow

Antes, definiste un complemento de Airflow que contenía tres operadores. Estos definen las tareas que forman parte de un flujo de trabajo de Airflow. El flujo de trabajo que se presenta en esta página es simple y lineal, pero los de Airflow pueden ser grafos acíclicos dirigidos complejos.

En esta sección, se crea la clase del complemento que expone los tres operadores; luego, se crea el DAG mediante el uso de estos, se implementa en Cloud Composer y se ejecuta el DAG.

Crea el complemento

Hasta ahora, en el archivo gce_commands_plugin.py, se incluyen los operadores de inicio, instantáneas y detención. Para usar estos operadores en un flujo de trabajo, debes incluirlos en una clase de complemento.

  1. Ten en cuenta la clase GoogleComputeEnginePlugin que se encuentra en la parte inferior del archivo gce_commands_plugin.py:

    class GoogleComputeEnginePlugin(AirflowPlugin):
      """Expose Airflow operators."""
    
      name = 'gce_commands_plugin'
      operators = [StopInstanceOperator, SnapshotDiskOperator,
                   StartInstanceOperator]

    Esta clase, que se hereda de AirflowPlugin, le otorga al complemento el nombre interno gce_commands_plugin y le agrega los tres operadores.

  2. Cierra el archivo gce_commands_plugin.py.

Configura el grafo acíclico dirigido

El DAG define el flujo de trabajo que ejecuta Airflow. Para que el DAG sepa en qué disco debe crear la copia de seguridad, debes definir algunas variables: la instancia de Compute Engine a la que se conecta el disco, la zona en la que se ejecuta la instancia y el proyecto en el que están disponibles todos los recursos.

Podrías integrar estas variables como parte del código fuente del DAG, pero se recomienda definirlas como variables de Airflow. De esta manera, cualquier cambio en la configuración se puede administrar de forma centralizada e independiente de las implementaciones de código.

Define la configuración del DAG:

  1. En Cloud Shell, establece la ubicación de tu entorno de Cloud Composer:

    LOCATION=[CLOUD_ENV_LOCATION]
    

    La ubicación es la región de Compute Engine en la que se encuentra el entorno de Cloud Composer, por ejemplo: us-central1 o europe-west1. Se estableció durante la creación del entorno y está disponible en la página de la consola de Cloud Composer.

  2. Establece el nombre del entorno de Cloud Composer:

    ENVIRONMENT=$(gcloud composer environments list \
        --format="value(name)" --locations $LOCATION)
    

    El parámetro --format se usa para seleccionar solo la columna name de la tabla resultante. Puedes suponer que se creó un solo entorno.

  3. Crea la variable PROJECT en Airflow mediante el nombre del proyecto actual de Google Cloud:

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set PROJECT $(gcloud config get-value project)
    

    En el ejemplo anterior, se ilustra lo siguiente:

    • gcloud composer environments run se usa para ejecutar comandos de la CLI de Airflow.
    • El comando variables de Airflow configura la variable PROJECT de Airflow como el valor que muestra gcloud config.
  4. Crea la variable INSTANCE en Airflow con el nombre de la instancia de WordPress:

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set INSTANCE \
        $(gcloud compute instances list \
        --format="value(name)" --filter="name~'.*wordpress.*'")
    

    En este comando, se usa el parámetro --filter para seleccionar solo la instancia cuyo name coincide con una expresión regular que contiene la string wordpress. En este enfoque, se supone que solo hay una instancia de este tipo y que “wordpress” forma parte del nombre de la instancia y el disco, lo cual es verdadero si aceptaste los valores predeterminados.

  5. Crea la variable ZONE en Airflow mediante la zona de la instancia de WordPress:

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set ZONE \
        $(gcloud compute instances list \
        --format="value(zone)" --filter="name~'.*wordpress.*'")
    
  6. Crea la variable DISK en Airflow con el nombre del disco persistente conectado a la instancia de WordPress:

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set DISK \
        $(gcloud compute disks list \
        --format="value(name)" --filter="name~'.*wordpress.*'")
    
  7. Verifica que las variables de Airflow se hayan creado de forma adecuada:

    1. En Cloud Console, ve a la página de Cloud Composer.

      Ir a la página de Cloud Composer

    2. En la columna Airflow web server, haz clic en el vínculo de Airflow. Se abrirá una pestaña nueva en la que se muestra la página principal del servidor web de Airflow.

    3. Haz clic en Administrador (Admin) y, luego, en Variables (Variables).

      En la lista, se muestran las variables de configuración del DAG.

      Variables de configuración del DAG

Crea el grafo acíclico dirigido

La definición del DAG se encuentra en un archivo dedicado de Python. El siguiente paso es crear el DAG y así encadenar los tres operadores del complemento.

  1. En Cloud Shell, usa un editor de texto, como nano o vim, para abrir el archivo backup_vm_instance.py:

    vi $HOME/composer-infra-python/no_sensor/dags/backup_vm_instance.py
    
  2. Examina las importaciones en la parte superior del archivo:

    import datetime
    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators import SnapshotDiskOperator
    from airflow.operators import StartInstanceOperator
    from airflow.operators import StopInstanceOperator
    from airflow.operators.dummy_operator import DummyOperator

    A continuación, se resumen estas importaciones:

    • DAG es la clase del grafo acíclico dirigido que define Airflow.
    • DummyOperator se usa para crear los operadores no-ops iniciales y finales con el fin de mejorar la visualización del flujo de trabajo. En los DAG más complejos, DummyOperator se puede usar para unir ramas y crear SubDAG.
    • El DAG usa los tres operadores que definiste en las secciones anteriores.
  3. Define los valores de los parámetros que se pasarán a los constructores del operador:

    INTERVAL = '@daily'
    START_DATE = datetime.datetime(2018, 7, 16)
    PROJECT = Variable.get('PROJECT')
    ZONE = Variable.get('ZONE')
    INSTANCE = Variable.get('INSTANCE')
    DISK = Variable.get('DISK')

    En el ejemplo anterior, se ilustra lo siguiente:

    • INTERVAL define con qué frecuencia se ejecuta el flujo de trabajo de copia de seguridad. En el código anterior, se especifica una recurrencia diaria mediante un ajuste predeterminado del cron de Airflow. Si deseas usar un intervalo diferente, consulta la página de referencia Ejecuciones de DAG. También puedes activar el flujo de trabajo de forma manual e independiente de este programa.
    • START_DATE define el momento en el que las copias de seguridad están programadas para iniciarse. No es necesario cambiar este valor.
    • El resto de los valores se recuperan de las variables de Airflow que configuraste en la sección anterior.
  4. Usa el siguiente código para crear el DAG con algunos de los parámetros definidos antes. Mediante este código, también se le otorga al DAG un nombre y una descripción, que se muestran en la IU de Cloud Composer.

    dag1 = DAG('backup_vm_instance',
               description='Backup a Compute Engine instance using an Airflow DAG',
               schedule_interval=INTERVAL,
               start_date=START_DATE,
               catchup=False)
  5. Propaga el DAG con tareas, que son instancias del operador:

    ## Dummy tasks
    begin = DummyOperator(task_id='begin', retries=1, dag=dag1)
    end = DummyOperator(task_id='end', retries=1)
    
    ## Compute Engine tasks
    stop_instance = StopInstanceOperator(
        project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance')
    snapshot_disk = SnapshotDiskOperator(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        disk=DISK, task_id='snapshot_disk')
    start_instance = StartInstanceOperator(
        project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='start_instance')

    Mediante este código, se crea una instancia de todas las tareas necesarias para el flujo de trabajo y se pasan los parámetros definidos a los constructores correspondientes del operador.

    • Los valores task_id son los ID únicos que se mostrarán en la IU de Cloud Composer. Los usarás más tarde para pasar datos entre las tareas.
    • retries establece la cantidad de veces que se debe reintentar una tarea antes de que falle. En las tareas DummyOperator, estos valores se ignoran.
    • dag=dag indica que una tarea se encuentra adjunta al DAG que se creó antes. Este parámetro solo es necesario en la primera tarea del flujo de trabajo.
  6. Define la secuencia de tareas que componen el DAG del flujo de trabajo:

    # Airflow DAG definition
    begin >> stop_instance >> snapshot_disk >> start_instance >> end
    
  7. Cierra el archivo gce_commands_plugin.py.

Ejecuta el flujo de trabajo

El flujo de trabajo que representa el operador del DAG ya está listo para que Cloud Composer lo ejecute. Cloud Composer lee las definiciones del DAG y del complemento desde un depósito asociado de Cloud Storage. Este depósito y los directorios dags y plugins correspondientes se crearon de forma automática cuando creaste el entorno de Cloud Composer.

Mediante el uso de Cloud Shell, puedes copiar el DAG y el complemento en el depósito asociado de Cloud Storage:

  1. En Cloud Shell, obtén el nombre del depósito:

    BUCKET=$(gsutil ls)
    echo $BUCKET
    

    Debe haber un solo depósito con el nombre del formulario: gs://[REGION]-{ENVIRONMENT_NAME]-{ID}-bucket/.

  2. Ejecuta la siguiente secuencia de comandos para copiar los archivos del DAG y del complemento en los directorios correspondientes de los depósitos:

    gsutil cp $HOME/composer-infra-python/no_sensor/plugins/gce_commands_plugin.py "$BUCKET"plugins
    gsutil cp $HOME/composer-infra-python/no_sensor/dags/backup_vm_instance.py "$BUCKET"dags
    

    En el nombre del depósito, ya se incluye una barra diagonal final, por lo cual la variable $BUCKET lleva comillas dobles.

  3. En Cloud Console, ve a la página de Cloud Composer.

    Ir a la página de Cloud Composer

  4. En la columna Airflow web server, haz clic en el vínculo de Airflow. Se abrirá una pestaña nueva en la que se muestra la página principal del servidor web de Airflow. Espera entre dos y tres minutos, y vuelve a cargar la página. Es posible que debas repetir esto un par de veces hasta que la página esté lista.

    Se muestra una lista en la que aparece el DAG recién creado, que es similar al siguiente:

    Lista de los DAG

    Si hay errores de sintaxis en el código, se mostrará un mensaje en la parte superior de la tabla de los DAG. Si hay errores del entorno de ejecución, se marcarán en Ejecuciones de DAG (DAG Runs). Corrige los errores antes de continuar. La forma más sencilla de hacer esto es volver a copiar los archivos del repositorio de GitHub en el depósito.

  5. Para ver un seguimiento de pila más detallado, ejecuta el siguiente comando en Cloud Shell:

    gcloud composer environments run $ENVIRONMENT --location $LOCATION list_dags
    
  6. Airflow comienza a ejecutar el flujo de trabajo de inmediato, que se muestra en la columna Ejecuciones de DAG (DAG Runs).

    El flujo de trabajo ya está en progreso, pero si necesitas volver a ejecutarlo, puedes activarlo de forma manual con los siguientes pasos:

    1. En la columna Vínculos (Links), haz clic en el primer ícono, Trigger DAG, que está marcado con una flecha en la captura de pantalla anterior.
    2. En la ventana emergente de confirmación ¿Estás seguro?, haz clic en Aceptar.

      Luego de unos segundos, se inicia el flujo de trabajo y aparece una ejecución nueva como un círculo de color verde claro en Ejecuciones de DAG (DAG Runs).

  7. En la columna Vínculos (Links), haz clic en el ícono Vista de gráfico (Graph View), que está marcado con una flecha en la captura de pantalla anterior.

    Captura de pantalla de Cloud Composer

    En la Vista de gráfico (Graph View), se muestra el flujo de trabajo, las tareas ejecutadas de forma adecuada con un borde de color verde oscuro, la tarea que se está ejecutando con un borde de color verde claro y las tareas pendientes sin borde. Puedes hacer clic en la tarea para ver los registros y sus detalles, y realizar otras operaciones.

  8. Para continuar la ejecución, haz clic de forma periódica en el botón de actualización que se encuentra en la esquina superior derecha.

    Felicitaciones. Completaste la ejecución de tu primer flujo de trabajo de Cloud Composer. Cuando finalice el flujo de trabajo, crea una instantánea del disco persistente de la instancia de Compute Engine.

  9. En Cloud Shell, verifica que se haya creado la instantánea:

    gcloud compute snapshots list
    

    Como alternativa, puedes usar el menú de Cloud Console para ir a la página Instantáneas de Compute Engine.

    Página Instantáneas de Compute Engine

En este punto, deberías poder ver una instantánea. Las ejecuciones de los flujos de trabajo posteriores, que se activan de forma manual o automática según el programa especificado, crearán más instantáneas.

Las instantáneas son incrementales. La primera instantánea es la de mayor tamaño, ya que contiene de forma comprimida todos los bloques del disco persistente. Las instantáneas sucesivas solo contienen los bloques en los que hubo cambios desde la instantánea anterior y cualquier referencia a los bloques sin cambios. Por lo tanto, las instantáneas posteriores son más pequeñas que la primera, tardan menos en producirse y cuestan menos.

Si se borra una instantánea, sus datos se transfieren a la siguiente instantánea correspondiente para mantener la coherencia de los deltas consecutivos que se almacenan en la cadena de instantáneas. Solo cuando se hayan quitado todas las instantáneas, se quitarán todos los datos de copia de seguridad del disco persistente.

Crea el sensor de Airflow personalizado

Cuando ejecutaste el flujo de trabajo, es posible que hayas notado que lleva tiempo completar cada paso. Esta espera se debe a que los operadores incluyen una instrucción sleep() al final con el objetivo de darle tiempo a la API de Compute Engine para que finalice su trabajo antes de iniciar la siguiente tarea.

Sin embargo, este enfoque no es óptimo y puede causar problemas inesperados. Por ejemplo, durante la creación de instantáneas, el tiempo de espera puede ser demasiado largo en el caso de las instantáneas incrementales, lo que significa que pierdes tiempo en la espera de una tarea que ya finalizó. O bien, puede que el tiempo de espera sea demasiado corto. Esto puede generar que todo el flujo de trabajo falle o produzca resultados poco confiables debido a que la instancia no se detuvo por completo o el proceso de la instantánea no se completó cuando se inició la máquina.

Debes poder indicarle a la siguiente tarea que se completó la anterior. Una solución consiste en usar los sensores de Airflow, que pausan el flujo de trabajo hasta que se cumplan algunos criterios. En este caso, el criterio es que la operación anterior de Compute Engine finalice de forma adecuada.

Comparte datos de comunicación cruzada entre tareas

Cuando las tareas necesitan comunicarse entre sí, Airflow proporciona un mecanismo conocido como XCom o “comunicación cruzada”. XCom permite que las tareas intercambien mensajes que constan de una clave, un valor y una marca de tiempo.

La forma más sencilla de pasar un mensaje mediante XCom es que un operador muestre un valor de su método execute(). El valor puede ser cualquier objeto que Python pueda serializar mediante el módulo de serialización.

Los tres operadores descritos en las secciones anteriores llaman a la API de Compute Engine. Todas estas llamadas a la API muestran un objeto del recurso de la operación. Estos objetos se diseñaron para usarse en la administración de solicitudes asíncronas, como las de los operadores de Airflow. Cada objeto tiene un campo name que puedes usar para consultar el estado más reciente de la operación de Compute Engine.

Modifica los operadores para que muestren el name del objeto del recurso de la operación:

  1. En Cloud Shell, usa un editor de texto, como nano o vim, para abrir el archivo gce_commands_plugin.py, esta vez desde el directorio sensor/plugins:

    vi $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py
    
  2. En el método de ejecución de StopInstanceOperator, observa el siguiente código:

    self.compute.instances().stop(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    time.sleep(90)

    Este código se reemplazó por el siguiente:

    operation = self.compute.instances().stop(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    return operation['name']

    En el ejemplo anterior, se ilustra lo siguiente:

    • En la primera línea, se captura el valor de retorno de la llamada a la API en la variable operation.
    • En la segunda línea, se muestra el campo name de la operación del método execute(). A través de esta instrucción, se serializa el nombre mediante pickle y se lo envía al espacio compartido entre las tareas de XCom. El valor se extraerá más adelante en un orden según el cual el último en ingresar es el primero en ser extraído.

    Si una tarea necesita enviar varios valores, es posible otorgarle a XCom una key explícita si se llama a xcom_push() directamente, en lugar de mostrar el valor.

  3. Del mismo modo, en el método de ejecución de SnapshotDiskOperator, observa el siguiente código:

    self.compute.disks().createSnapshot(
        project=self.project, zone=self.zone, disk=self.disk,
        body={'name': snapshot_name}).execute()
    time.sleep(120)

    Este código se reemplazó por el siguiente:

    operation = self.compute.disks().createSnapshot(
        project=self.project, zone=self.zone, disk=self.disk,
        body={'name': snapshot_name}).execute()
    return operation['name']

    Hay dos nombres que no tienen relación en este código. El primero se refiere al nombre de la instantánea y el segundo es el nombre de la operación.

  4. Por último, en el método de ejecución del StartInstanceOperator, observa el siguiente código:

    self.compute.instances().start(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    time.sleep(20)

    Este código se reemplazó por el siguiente:

    operation = self.compute.instances().start(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    return operation['name']
  5. En este punto, no debería haber llamadas al método sleep() en el archivo gce_commands_plugin.py. Busca sleep en el archivo para asegurarte de que esto sea así. De lo contrario, vuelve a verificar los pasos anteriores de esta sección.

    Como no se realizan llamadas a sleep() a partir del código, se quitó la siguiente línea de la sección de importaciones en la parte superior del archivo:

    import time
    
  6. Cierra el archivo gce_commands_plugin.py.

Implementa y expón el sensor

En la sección anterior, modificaste cada operador para que muestre el nombre de una operación de Compute Engine. En esta sección, mediante el uso de este nombre, deberás crear un sensor de Airflow para sondear la API de Compute Engine a fin de completar cada operación.

  1. En Cloud Shell, usa un editor de texto, como nano o vim, para abrir el archivo gce_commands_plugin.py y asegúrate de usar el directorio sensor/plugins:

    vi $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py
    

    Observa la siguiente línea de código en la parte superior de la sección de importación, justo debajo de la línea from airflow.models import BaseOperator:

    from airflow.operators.sensors import BaseSensorOperator
    

    Todos los sensores se derivan de la clase BaseSensorOperator y deben anular su método poke().

  2. Examina la clase OperationStatusSensor nueva:

    class OperationStatusSensor(BaseSensorOperator):
      """Waits for a Compute Engine operation to complete."""
    
      @apply_defaults
      def __init__(self, project, zone, instance, prior_task_id, *args, **kwargs):
        self.compute = self.get_compute_api_client()
        self.project = project
        self.zone = zone
        self.instance = instance
        self.prior_task_id = prior_task_id
        super(OperationStatusSensor, self).__init__(*args, **kwargs)
    
      def get_compute_api_client(self):
        credentials = GoogleCredentials.get_application_default()
        return googleapiclient.discovery.build(
            'compute', 'v1', cache_discovery=False, credentials=credentials)
    
      def poke(self, context):
        operation_name = context['task_instance'].xcom_pull(
            task_ids=self.prior_task_id)
        result = self.compute.zoneOperations().get(
            project=self.project, zone=self.zone,
            operation=operation_name).execute()
    
        logging.info(
            "Task '%s' current status: '%s'", self.prior_task_id, result['status'])
        if result['status'] == 'DONE':
          return True
        else:
          logging.info("Waiting for task '%s' to complete", self.prior_task_id)
          return False

    La clase OperationStatusSensor tiene los siguientes métodos:

    • __init__: Es el constructor de la clase. Este constructor toma parámetros similares a los de los operadores, con una excepción: prior_task_id. Este parámetro es el ID de la tarea anterior.
    • poke: Es el método del sensor principal que se anula desde BaseSensorOperator. Airflow llama a este método cada 60 segundos hasta que el método muestra True. Solo en ese caso se permite la ejecución de las tareas descendentes.

      Si deseas configurar el intervalo de estos reintentos, pasa el parámetro poke_interval al constructor. También puedes definir un timeout. Para obtener más información, consulta la referencia de la API de BaseSensorOperator.

      En la implementación del método poke anterior, la primera línea es una llamada a xcom_pull(). Mediante este método, se obtiene el valor de XCom más reciente para la tarea que identifica prior_task_id. El valor es el nombre de una operación de Compute Engine y se almacena en la variable operation_name.

      Luego, mediante el código, se ejecuta el método zoneOperations.get() y se pasa operation_name como parámetro para obtener el estado más reciente de la operación. Si el estado es DONE, el método poke() muestra True. De lo contrario, muestra False. En el primer caso, se iniciarán las tareas descendentes. En el segundo caso, la ejecución del flujo de trabajo permanece detenida y se vuelve a llamar al método poke() después de poke_interval segundos.

  3. En la parte inferior del archivo, observa que se actualizó la clase GoogleComputeEnginePlugin para agregar OperationStatusSensor a la lista de operadores que exporta el complemento:

    class GoogleComputeEnginePlugin(AirflowPlugin):
      """Expose Airflow operators and sensor."""
    
      name = 'gce_commands_plugin'
      operators = [StopInstanceOperator, SnapshotDiskOperator,
                   StartInstanceOperator, OperationStatusSensor]
  4. Cierra el archivo gce_commands_plugin.py.

Actualiza el flujo de trabajo

Después de crear el sensor en el complemento, puedes agregarlo al flujo de trabajo. En esta sección, deberás actualizar el flujo de trabajo a su estado final, que incluye los tres operadores además de las tareas del sensor entre ellos. Luego, deberás ejecutar y verificar el flujo de trabajo actualizado.

  1. En Cloud Shell, usa un editor de texto, como nano o vim, para abrir el archivo backup_vm_instance.py, esta vez desde el directorio sensor/dags:

    vi $HOME/composer-infra-python/sensor/dags/backup_vm_instance.py
    
    
  2. En la sección de importaciones, observa que el sensor recién creado se importa debajo de la línea from airflow operators import StartInstanceOperator:

    from airflow.operators import OperationStatusSensor
    
  3. Examina las líneas que siguen al comentario ## Wait tasks.

    ## Wait tasks
    wait_for_stop = OperationStatusSensor(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        prior_task_id='stop_instance', poke_interval=15, task_id='wait_for_stop')
    wait_for_snapshot = OperationStatusSensor(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        prior_task_id='snapshot_disk', poke_interval=10,
        task_id='wait_for_snapshot')
    wait_for_start = OperationStatusSensor(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        prior_task_id='start_instance', poke_interval=5, task_id='wait_for_start')

    El código vuelve a usar OperationStatusSensor para definir tres “tareas de espera” intermedias. Cada una de estas espera a que se complete la operación anterior. Los siguientes parámetros se pasan al constructor del sensor:

    • El PROJECT, la ZONE y la INSTANCE de la instancia de WordPress, que ya están definidos en el archivo
    • prior_task_id: Es el ID de la tarea que espera el sensor. Por ejemplo, la tarea wait_for_stop espera a que se complete la tarea con el ID stop_instance

    • poke_interval: Es la cantidad de segundos que Airflow debe esperar entre las llamadas de reintento al método poke() del sensor. En otras palabras, es la frecuencia con la que se debe verificar si ya finalizó prior_task_id

    • task_id: Es el ID de la tarea de espera recién creada

  4. En la parte inferior del archivo, observa el siguiente código:

    begin >> stop_instance >> snapshot_disk >> start_instance >> end
    

    Este código se reemplazó por el siguiente:

    begin >> stop_instance >> wait_for_stop >> snapshot_disk >> wait_for_snapshot \
            >> start_instance >> wait_for_start >> end
    

    Estas líneas definen el flujo de trabajo de la copia de seguridad completa.

  5. Cierra el archivo backup_vm_instance.py.

Ahora, debes copiar el DAG y el complemento del depósito asociado de Cloud Storage:

  1. En Cloud Shell, obtén el nombre del depósito:

    BUCKET=$(gsutil ls)
    echo $BUCKET
    

    Deberías ver un solo depósito que tiene un nombre con el siguiente formato: gs://[REGION]-[ENVIRONMENT_NAME]-[ID]-bucket/.

  2. Ejecuta la siguiente secuencia de comandos para copiar los archivos del DAG y del complemento en los directorios correspondientes de los depósitos:

    gsutil cp $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py "$BUCKET"plugins
    gsutil cp $HOME/composer-infra-python/sensor/dags/backup_vm_instance.py "$BUCKET"dags
    

    En el nombre del depósito, ya se incluye una barra diagonal final, por lo cual la variable $BUCKET lleva comillas dobles.

  3. Sube el flujo de trabajo actualizado a Airflow:

    1. En Cloud Console, ve a la página de Cloud Composer.

      Ir a la página de Cloud Composer

    2. En la columna Airflow, haz clic en el vínculo de Airflow web server para que se muestre la página principal de Airflow.

    3. Espera dos o tres minutos hasta que Airflow actualice el complemento y el flujo de trabajo de forma automática. Es posible que observes que la tabla de los DAG se vacía de forma momentánea. Vuelve a cargar la página algunas veces hasta que aparezca la sección Vínculos de forma coherente.

    4. Asegúrate de que no se muestren errores y, en la sección Vínculos, haz clic en Vista de árbol (Tree View).

      Captura de pantalla de la página de la vista de árbol de Cloud Composer El flujo de trabajo se representa a la izquierda como un árbol ascendente. A la derecha, se muestra un gráfico de las ejecuciones de tareas en distintas fechas. Un cuadrado de color verde representa una ejecución exitosa de esa tarea y en esa fecha en específicas. Un cuadrado de color blanco representa una tarea que nunca se ejecutó. Debido a que actualizaste el DAG con tareas nuevas del sensor, todas estas se muestran en blanco, mientras que las tareas de Compute Engine se muestran en verde.

    5. Ejecuta el flujo de trabajo de copia de seguridad actualizado:

      1. En el menú superior, haz clic en DAG para volver a la página principal.
      2. En la columna Vínculos, haz clic en Trigger DAG.
      3. En la ventana emergente de confirmación ¿Estás seguro?, haz clic en Aceptar. Se iniciará la ejecución de un flujo de trabajo nuevo, que aparece como un círculo de color verde claro en la columna Ejecuciones de DAG.
    6. En Vínculos, haz clic en el ícono de Vista de gráfico (Graph View) para observar la ejecución del flujo de trabajo en tiempo real.

    7. Haz clic en el botón de actualización que se encuentra en el lado derecho para seguir la ejecución de la tarea. Observa cómo se detiene el flujo de trabajo en cada una de las tareas del sensor para esperar a que finalice la tarea anterior. El tiempo de espera se ajusta a las necesidades de cada tarea, en lugar de depender de un valor de suspensión integrado como parte del código.

    Captura de pantalla de la ejecución de la tarea de Cloud Composer

  4. De forma opcional, durante el flujo de trabajo, vuelve a Cloud Console, selecciona el menú Compute Engine y haz clic en Instancias de VM para ver cómo se detiene y se reinicia la máquina virtual. También puedes hacer clic en Instantáneas para ver la instantánea nueva que se está creando.

Ya ejecutaste un flujo de trabajo de copia de seguridad que crea una instantánea desde una instancia de Compute Engine. Esta instantánea sigue las prácticas recomendadas y optimiza el flujo con sensores.

Restablece una instancia a partir de una instantánea

Tener una instantánea disponible es solo una parte del proceso de copia de seguridad. La otra parte es poder restablecer tu instancia desde la instantánea.

Para crear una instancia mediante una instantánea, sigue estos pasos:

  1. En Cloud Shell, obtén una lista de las instantáneas disponibles:

    gcloud compute snapshots list
    

    El resultado es similar al siguiente ejemplo:

    NAME                              DISK_SIZE_GB  SRC_DISK                            STATUS
    wordpress-1-vm-2018-07-18-120044  10            us-central1-c/disks/wordpress-1-vm  READY
    wordpress-1-vm-2018-07-18-120749  10            us-central1-c/disks/wordpress-1-vm  READY
    wordpress-1-vm-2018-07-18-125138  10            us-central1-c/disks/wordpress-1-vm  READY
    
  2. Selecciona una instantánea y crea un disco persistente de arranque independiente a partir de ella. Reemplaza los marcadores de posición que se encuentran entre corchetes por tus propios valores.

    gcloud compute disks create [DISK_NAME] --source-snapshot [SNAPSHOT_NAME] \
        --zone=[ZONE]
    

    En el ejemplo anterior, se ilustra lo siguiente:

    • DISK_NAME es el nombre del nuevo disco persistente de arranque independiente.
    • SNAPSHOT_NAME es la instantánea seleccionada de la primera columna del resultado anterior.
    • ZONE es la zona de procesamiento en la que se creará el disco nuevo.
  3. Usa el disco de arranque para crear una instancia nueva. Reemplaza [INSTANCE_NAME] por el nombre de la instancia que deseas crear.

    gcloud compute instances create [INSTANCE_NAME] --disk name=[DISK_NAME],boot=yes \
        --zone=ZONE --tags=wordpress-1-tcp-443,wordpress-1-tcp-80
    

    Gracias las dos etiquetas especificadas en el comando, la instancia puede recibir de forma automática tráfico entrante en los puertos 443 y 80 debido a las reglas de firewall preexistentes que se crearon para la instancia inicial de WordPress.

    Toma nota de la IP externa de la instancia nueva que muestra el comando anterior.

  4. Verifica que WordPress se ejecute en la instancia recién creada. En una pestaña nueva del navegador, navega a la dirección IP externa. Se mostrará la página de destino predeterminada de WordPress.

  5. De forma alternativa, puedes crear una instancia mediante el uso de una instantánea desde consola:

    1. En Cloud Console, ve a la página Instantáneas.

      IR A LA PÁGINA INSTANTÁNEAS

    2. Haz clic en la instantánea más reciente.

    3. Haz clic en Crear instancia.

    4. En el formulario Nueva instancia de VM, haz clic en Administración, seguridad, discos, redes, usuario único y, luego, en Herramientas de redes.

    5. Agrega wordpress-1-tcp-443 y wordpress-1-tcp-80 en el campo Etiquetas de red y presiona Intro después de cada etiqueta. Consulta más arriba para obtener una explicación de estas etiquetas.

    6. Haz clic en Crear.

      Se crea una instancia nueva basada en la instantánea más reciente, que está lista para entregar contenido.

  6. Abre la página Instancias de Compute Engine y toma nota de la IP externa de la instancia nueva.

  7. Verifica que WordPress se ejecute en la instancia recién creada. Navega hacia la IP externa en una pestaña nueva del navegador.

Para obtener más detalles, consulta Crea una instancia a partir de una instantánea.

Realiza una limpieza

  1. En Cloud Console, ve a la página Administrar recursos.

    Ir a la página Administrar recursos

  2. En la lista de proyectos, selecciona el proyecto que deseas borrar y haz clic en Borrar .
  3. En el cuadro de diálogo, escribe el ID del proyecto y haz clic en Cerrar para borrar el proyecto.

Próximos pasos