Fluxo de trabalho com o Cloud Composer

Neste documento, usa os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Para gerar uma estimativa de custos com base na sua utilização projetada, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação gratuita.

Antes de começar

Configure o seu projeto

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  7. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  13. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  14. Crie um modelo de fluxo de trabalho do Dataproc

    Copie e execute os seguintes comandos numa janela de terminal local ou no Cloud Shell para criar e definir um modelo de fluxo de trabalho.

    1. Crie o modelo de fluxo de trabalho sparkpi.
      gcloud dataproc workflow-templates create sparkpi \
          --region=us-central1
            
    2. Adicione a tarefa Spark ao modelo de fluxo de trabalho sparkpi. O indicador "compute" step-id identifica a tarefa SparkPi.
      gcloud dataproc workflow-templates add-job spark \
          --workflow-template=sparkpi \
          --step-id=compute \
          --class=org.apache.spark.examples.SparkPi \
          --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
          --region=us-central1 \
          -- 1000
            
    3. Use um cluster gerido, de nó único para executar o fluxo de trabalho. O Dataproc cria o cluster, executa o fluxo de trabalho no mesmo e, em seguida, elimina o cluster quando o fluxo de trabalho estiver concluído.
      gcloud dataproc workflow-templates set-managed-cluster sparkpi \
          --cluster-name=sparkpi \
          --single-node \
          --region=us-central1
            
    4. Confirme a criação do modelo de fluxo de trabalho.

      Consola

      Clique no nome do sparkpi na página Fluxos de trabalho do Dataproc na Google Cloud consola para abrir a página Detalhes do modelo de fluxo de trabalho. Clique no nome do modelo do fluxo de trabalho para confirmar os atributos do modelo sparkpi.

      comando gcloud

      Execute o seguinte comando:

      gcloud dataproc workflow-templates describe sparkpi --region=us-central1
          

    Crie e carregue um DAG para o Cloud Storage

    1. Crie ou use um ambiente do Cloud Composer existente.
    2. Defina variáveis de ambiente.

      IU do Airflow

      1. Na barra de ferramentas, clique em Admin > Variáveis.
      2. Clique em Criar.
      3. Introduza as seguintes informações:
        • Tecla:project_id
        • Val: PROJECT_ID – o seu Google Cloud ID do projeto
      4. Clique em Guardar.

      comando gcloud

      Introduza os seguintes comandos:

      • ENVIRONMENT é o nome do ambiente do Cloud Composer
      • LOCATION é a região onde o ambiente do Cloud Composer está localizado
      • PROJECT_ID é o ID do projeto que contém o ambiente do Cloud Composer
          gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
          
    3. Copie o seguinte código DAG localmente para um ficheiro com o título "composer-dataproc-dag.py", que usa o DataprocInstantiateWorkflowTemplateOperator.

      Airflow 2

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.providers.google.cloud.operators.dataproc import (
          DataprocInstantiateWorkflowTemplateOperator,
      )
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = DataprocInstantiateWorkflowTemplateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              region="us-central1",
          )
      

      Fluxo de ar 1

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.contrib.operators import dataproc_operator
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              # For more info on regions where Dataflow is available see:
              # https://cloud.google.com/dataflow/docs/resources/locations
              region="us-central1",
          )
      
    4. Carregue o seu DAG para a pasta do ambiente no Cloud Storage. Depois de o carregamento ser concluído com êxito, clique no link Pasta DAGs na página do ambiente do Cloud Composer.

    Veja o estado de uma tarefa

    IU do Airflow

    1. Abra a interface Web do Airflow.
    2. Na página DAGs, clique no nome do DAG (por exemplo, dataproc_workflow_dag).
    3. Na página de detalhes dos DAGs, clique em Vista de gráfico.
    4. Verifique o estado:
      • Falha: a tarefa tem uma caixa vermelha à volta. Também pode manter o ponteiro sobre a tarefa e procurar Estado: falhou. A tarefa tem uma caixa vermelha à volta, o que indica que falhou
      • Êxito: a tarefa tem uma caixa verde à volta. Também pode passar o ponteiro sobre a tarefa e verificar se o Estado: Concluído. A tarefa tem uma caixa verde à volta, o que indica que foi bem-sucedida

    Consola

    Clique no separador Fluxos de trabalho para ver o estado do fluxo de trabalho.

    comando gcloud

    gcloud dataproc operations list \
        --region=us-central1 \
        --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
        

    Limpar

    Para evitar incorrer em custos na sua conta Google Cloud , pode eliminar os recursos usados neste tutorial:

    1. Elimine o ambiente do Cloud Composer.

    2. Elimine o modelo de fluxo de trabalho.

    O que se segue?