Flujo de trabajo con Cloud Composer

En este documento, se utilizan los siguientes componentes facturables de Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.

Los usuarios nuevos Google Cloud pueden disfrutar de una prueba gratuita.

Antes de empezar

Configurar el proyecto

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  7. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  13. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  14. Crear una plantilla de flujo de trabajo de Dataproc

    Copia y ejecuta los siguientes comandos en una ventana de terminal local o en Cloud Shell para crear y definir una plantilla de flujo de trabajo.

    1. Crea la plantilla de flujo de trabajo sparkpi.
      gcloud dataproc workflow-templates create sparkpi \
          --region=us-central1
            
    2. Añade el trabajo de Spark a la sparkpi plantilla de flujo de trabajo. La marca "compute" step-id identifica la tarea SparkPi.
      gcloud dataproc workflow-templates add-job spark \
          --workflow-template=sparkpi \
          --step-id=compute \
          --class=org.apache.spark.examples.SparkPi \
          --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
          --region=us-central1 \
          -- 1000
            
    3. Usa un clúster gestionado de un solo nodo para ejecutar el flujo de trabajo. Dataproc creará el clúster, ejecutará el flujo de trabajo en él y, a continuación, eliminará el clúster cuando se complete el flujo de trabajo.
      gcloud dataproc workflow-templates set-managed-cluster sparkpi \
          --cluster-name=sparkpi \
          --single-node \
          --region=us-central1
            
    4. Confirma la creación de la plantilla de flujo de trabajo.

      Consola

      Haz clic en el nombre de sparkpi en la página Flujos de trabajo de Dataproc en la consola de Google Cloud para abrir la página Detalles de la plantilla de flujo de trabajo. Haz clic en el nombre de la plantilla de flujo de trabajo para confirmar los atributos de la plantilla sparkpi.

      Comando gcloud

      Ejecuta el siguiente comando:

      gcloud dataproc workflow-templates describe sparkpi --region=us-central1
          

    Crear y subir un DAG a Cloud Storage

    1. Crea o usa un entorno de Cloud Composer.
    2. Define las variables de entorno.

      Interfaz de usuario de Airflow

      1. En la barra de herramientas, haga clic en Administrar > Variables.
      2. Haz clic en Crear.
      3. Introduce la siguiente información:
        • Tecla:project_id
        • Valor: PROJECT_ID (tu ID de proyecto) Google Cloud
      4. Haz clic en Guardar.

      Comando gcloud

      Introduce los comandos siguientes:

      • ENVIRONMENT es el nombre del entorno de Cloud Composer.
      • LOCATION es la región en la que se encuentra el entorno de Cloud Composer.
      • PROJECT_ID es el ID del proyecto que contiene el entorno de Cloud Composer.
          gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
          
    3. Copia el siguiente código DAG de forma local en un archivo llamado "composer-dataproc-dag.py", que usa el DataprocInstantiateWorkflowTemplateOperator.

      Airflow 2

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.providers.google.cloud.operators.dataproc import (
          DataprocInstantiateWorkflowTemplateOperator,
      )
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = DataprocInstantiateWorkflowTemplateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              region="us-central1",
          )
      

      Airflow 1

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.contrib.operators import dataproc_operator
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              # For more info on regions where Dataflow is available see:
              # https://cloud.google.com/dataflow/docs/resources/locations
              region="us-central1",
          )
      
    4. Sube tu DAG a la carpeta de tu entorno en Cloud Storage. Una vez que se haya completado la subida correctamente, haga clic en el enlace Carpeta DAGs de la página Entorno de Cloud Composer.

    Ver el estado de una tarea

    Interfaz de usuario de Airflow

    1. Abre la interfaz web de Airflow.
    2. En la página DAGs, haz clic en el nombre del DAG (por ejemplo, dataproc_workflow_dag).
    3. En la página Detalles de los DAGs, haz clic en Vista de gráfico.
    4. Comprobar el estado:
      • Error: la tarea tiene un cuadro rojo alrededor. También puedes mantener el puntero sobre la tarea y buscar Estado: Fallido. La tarea tiene un recuadro rojo, lo que indica que ha fallado
      • Éxito: la tarea tiene un recuadro verde. También puedes mantener el puntero sobre la tarea y comprobar si aparece el mensaje Estado: Éxito. La tarea tiene un recuadro verde a su alrededor, lo que indica que se ha completado correctamente.

    Consola

    Haz clic en la pestaña Flujos de trabajo para ver el estado de los flujos de trabajo.

    Comando gcloud

    gcloud dataproc operations list \
        --region=us-central1 \
        --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
        

    Limpieza

    Para evitar que se apliquen cargos en tu cuenta, puedes eliminar los recursos utilizados en este tutorial: Google Cloud

    1. Elimina el entorno de Cloud Composer.

    2. Elimina la plantilla de flujo de trabajo.

    Siguientes pasos