Flujo de trabajo con Cloud Composer

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud sean aptos para obtener una prueba gratuita.

Antes de comenzar

Configura el proyecto

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyecto

  3. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  4. Habilita las API de Dataproc, Compute Engine, and Cloud Composer .

    Habilita las API

  5. Instala e inicializa el SDK de Cloud.

Crea una plantilla de flujo de trabajo de Dataproc

Copia y ejecuta los comandos que se muestran a continuación en una ventana de la terminal local o en Cloud Shell para crear y definir una plantilla de flujo de trabajo.

  1. Crea la plantilla de flujo de trabajo sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Agrega el trabajo de Spark a la plantilla de flujo de trabajo.sparkpi La marca step-id de “compute” identifica el trabajo de SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Usa un clúster de un solo nodo administrado para ejecutar el flujo de trabajo. Dataproc creará el clúster, ejecutará el flujo de trabajo en él y, luego, lo borrará cuando se complete el flujo de trabajo.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Confirma la creación de la plantilla de flujo de trabajo.

    Console

    Haz clic en elsparkpi nombre en DataprocFlujos de trabajo en Cloud Console para abrir la página deDetalles de la plantilla de flujo de trabajo página. Haz clic en el nombre de tu plantilla de flujo de trabajo para confirmar los atributos de la plantilla sparkpi.

    Comando de gcloud

    Ejecuta el siguiente comando:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Cree y suba un DAG a Cloud Storage

  1. Crear o usar un entorno de Cloud Composer existente.
  2. Configurar variables de entorno

    IU de Airflow

    1. En la barra de herramientas, haz clic en Administrador > Variables.
    2. Haga clic en Crear.
    3. Ingresa la siguiente información:
      • Key: project_id
      • Val: PROJECT_ID: el ID de tu proyecto de Google Cloud
    4. Haga clic en Save.

    Comando de gcloud

    Ingresa los siguientes comandos:

    • ENVIRONMENT es el nombre del entorno de Cloud Composer.
    • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.
    gcloud composer environments run ENVIRONMENT \
        --location LOCATION
        
  3. Copia el siguiente código de DAG de forma local en un archivo llamado “composer-dataproc-dag.py”, que usa DataprocInstantiateWorkflowTemplateOperator.

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/concepts.html#variables
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = models.Variable.get("project_id")
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
    
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    
  4. Sube el DAG a tu carpeta de entorno en Cloud Storage. Una vez que la carga se haya completado con éxito, haz clic en el vínculo Carpeta DAG en la página del entorno de Cloud Composer.

Visualiza el estado de una tarea

IU de Airflow

  1. Abre la interfaz web de Airflow.
  2. En la página de los DAG, haz clic en el nombre del DAG (por ejemplo, dataproc_workflow_dag).
  3. En la página de detalles de los DAG, haz clic en Graph View.
  4. Verifica el estado:
    • Tarea con errores: la tarea estará encerrada en un cuadro rojo. También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje Estado: con errores. La tarea tiene un cuadro rojo a su alrededor, lo que indica que falló
    • Éxito: La tarea tiene un cuadro verde a su alrededor. También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje Estado: correcto. La tarea tiene un cuadro verde a su alrededor, lo que indica que se realizó correctamente.

Console

Haz clic en la pestaña Flujos de trabajo para ver el estado del flujo de trabajo.

Comando de gcloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Realice una limpieza

Para evitar que se apliquen cargos a tu cuenta de Google Cloud, puedes borrar los recursos que usaste en este instructivo:

  1. Borra el entorno de Cloud Composer.

  2. Borra la plantilla de flujo de trabajo.

¿Qué sigue?