Flujo de trabajo con Cloud Composer

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

Configura el proyecto

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Crea una plantilla de flujo de trabajo de Dataproc

Copia y ejecuta los comandos que se indican a continuación en una ventana de la terminal local o en Cloud Shell para crear y definir una plantilla de flujo de trabajo.

  1. Crea la plantilla de flujo de trabajo sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Agrega el trabajo de Spark a la plantilla de flujo de trabajo.sparkpi La marca step-id de “compute” identifica el trabajo de SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Usa un clúster administrado y de un solo nodo para ejecutar el flujo de trabajo. Dataproc creará el clúster, ejecutará el flujo de trabajo en él y, luego, borrará el clúster cuando este se complete.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Confirma la creación de la plantilla de flujo de trabajo.

    Console

    Haz clic en el nombre de sparkpi en Dataproc Flujos de trabajo en la consola de Google Cloud para abrir la Página Detalles de la plantilla de flujo de trabajo. Haz clic en el nombre de tu plantilla de flujo de trabajo para confirmar los atributos de la plantilla sparkpi.

    Comando de gcloud

    Ejecuta el siguiente comando:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Crea y sube un DAG a Cloud Storage

  1. Crea o usa un entorno existente de Cloud Composer.
  2. Configurar variables de entorno

    IU de Airflow

    1. En la barra de herramientas, haz clic en Administrador > Variables.
    2. Haga clic en Crear.
    3. Ingresa la siguiente información:
      • Key: project_id
      • Val: PROJECT_ID: El ID de tu proyecto de Google Cloud
    4. Haga clic en Save.

    Comando de gcloud

    Ingresa los siguientes comandos:

    • ENVIRONMENT es el nombre del entorno de Cloud Composer.
    • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.
    • PROJECT_ID es el ID del proyecto que contiene el entorno de Cloud Composer.
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Copia el siguiente código de DAG de forma local en un archivo llamado “composer-dataproc-dag.py”, que usa DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Sube el DAG a tu carpeta de entorno en Cloud Storage. Una vez que la carga se haya completado de forma correcta, haz clic en el vínculo Carpeta de DAG en la página del entorno de Cloud Composer.

Visualiza el estado de una tarea

IU de Airflow

  1. Abre la interfaz web de Airflow.
  2. En la página de los DAG, haz clic en el nombre del DAG (por ejemplo, dataproc_workflow_dag).
  3. En la página de detalles de los DAG, haz clic en Graph View.
  4. Verifica el estado:
    • Tarea con errores: la tarea estará encerrada en un cuadro rojo. También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje Estado: con errores. La tarea tiene un cuadro rojo a su alrededor, lo que indica que falló
    • Éxito: La tarea tiene un cuadro verde a su alrededor. También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje Estado: correcto. La tarea tiene un cuadro verde a su alrededor, lo que indica que se realizó correctamente.

Console

Haz clic en la pestaña Flujos de trabajo para ver el estado del flujo de trabajo.

Comando de gcloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Realice una limpieza

Para evitar que se apliquen cargos a tu cuenta de Google Cloud, puedes borrar los recursos que usaste en este instructivo:

  1. Borra el entorno de Cloud Composer.

  2. Borra la plantilla de flujo de trabajo.

¿Qué sigue?