Fluxo de trabalho usando o Cloud Composer

Neste documento, você vai usar os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.

Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Criar o projeto

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. Se você estiver usando um provedor de identidade externo (IdP), primeiro faça login na CLI gcloud com sua identidade federada.

  7. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. Se você estiver usando um provedor de identidade externo (IdP), primeiro faça login na CLI gcloud com sua identidade federada.

  13. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  14. Criar um modelo de fluxo de trabalho do Dataproc

    Copie e execute os comandos a seguir em uma janela de terminal local ou no Cloud Shell para criar e definir um modelo de fluxo de trabalho.

    1. Crie o modelo de fluxo de trabalho sparkpi.
      gcloud dataproc workflow-templates create sparkpi \
          --region=us-central1
            
    2. Adicione o job do Spark ao modelo de fluxo de trabalho sparkpi. A sinalização step-id "compute" identifica o job do SparkPi.
      gcloud dataproc workflow-templates add-job spark \
          --workflow-template=sparkpi \
          --step-id=compute \
          --class=org.apache.spark.examples.SparkPi \
          --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
          --region=us-central1 \
          -- 1000
            
    3. Use um cluster gerenciado de nó único para executar o fluxo de trabalho. O Dataproc criará o cluster, executará o fluxo de trabalho nele e excluirá o cluster quando o fluxo de trabalho for concluído.
      gcloud dataproc workflow-templates set-managed-cluster sparkpi \
          --cluster-name=sparkpi \
          --single-node \
          --region=us-central1
            
    4. Confirme a criação do modelo de fluxo de trabalho.

      Console

      Clique no nome do sparkpi na página Fluxos de trabalho do Dataproc no console Google Cloud para abrir a página Detalhes do modelo de fluxo de trabalho. Clique no nome do seu modelo de fluxo de trabalho para confirmar os atributos do modelo sparkpi.

      Comando gcloud

      Execute este comando:

      gcloud dataproc workflow-templates describe sparkpi --region=us-central1
          

    Criar e fazer upload de um DAG para o Cloud Storage

    1. Crie ou use um ambiente atual do Cloud Composer.
    2. Defina variáveis de ambiente.

      IU do Airflow

      1. Na barra de ferramentas, clique em Administrador > Variáveis.
      2. Clique em Criar.
      3. Digite as seguintes informações:
        • Chave: project_id
        • Val: PROJECT_ID: ID do projeto Google Cloud
      4. Clique em Salvar.

      Comando gcloud

      Digite os seguintes comandos:

      • ENVIRONMENT é o nome do ambiente do Cloud Composer
      • LOCATION é a região em que o ambiente do Cloud Composer está localizado
      • PROJECT_ID é o ID do projeto que contém o ambiente do Cloud Composer.
          gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
          
    3. Copie o código do DAG a seguir localmente em um arquivo chamado "composer-dataproc-dag.py", que usa o DataprocInstantiateWorkflowTemplateOperator.

      Airflow 2

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.providers.google.cloud.operators.dataproc import (
          DataprocInstantiateWorkflowTemplateOperator,
      )
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = DataprocInstantiateWorkflowTemplateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              region="us-central1",
          )
      

      Airflow 1

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.contrib.operators import dataproc_operator
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              # For more info on regions where Dataflow is available see:
              # https://cloud.google.com/dataflow/docs/resources/locations
              region="us-central1",
          )
      
    4. Faça o upload do DAG para a pasta do ambiente no Cloud Storage. Depois que o upload for concluído, clique no link Pasta de DAGs na página do ambiente do Cloud Composer.

    Ver o status de uma tarefa

    IU do Airflow

    1. Abra a interface da Web do Airflow.
    2. Na página DAGs, clique no nome do DAG, por exemplo, dataproc_workflow_dag.
    3. Na página "Detalhes dos DAGs", clique em Visualizar gráfico.
    4. Verificar status:
      • Falha: a tarefa tem uma caixa vermelha ao redor. Você também pode manter o ponteiro do mouse sobre a tarefa e verificar se há a mensagem Estado: Falha. A tarefa tem uma caixa vermelha ao redor, indicando que ela falhou
      • Bem-sucedida: a tarefa tem uma caixa verde ao redor. Você também pode manter o ponteiro do mouse sobre a tarefa e verificar se há a mensagem Estado: sucesso. a tarefa tem uma caixa verde ao redor dela, indicando que foi bem-sucedida

    Console

    Clique na guia "Fluxos de trabalho" para ver o status do fluxo de trabalho.

    Comando gcloud

    gcloud dataproc operations list \
        --region=us-central1 \
        --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
        

    Limpar

    Para evitar cobranças na sua conta do Google Cloud , exclua os recursos usados neste tutorial:

    1. Exclua o ambiente do Cloud Composer.

    2. Exclua o modelo de fluxo de trabalho.

    A seguir