Fluxo de trabalho usando o Cloud Composer

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Criar o projeto

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Criar um modelo de fluxo de trabalho do Dataproc

Copie e execute os comandos listados abaixo em uma janela de terminal local ou no Cloud Shell para criar e definir um modelo de fluxo de trabalho.

  1. Crie o modelo de fluxo de trabalho sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Adicione o job do Spark ao modelo de fluxo de trabalho sparkpi. A sinalização step-id "compute" identifica o job do SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Use um cluster gerenciado de nó único para executar o fluxo de trabalho. O Dataproc criará o cluster, executará o fluxo de trabalho nele e excluirá o cluster quando o fluxo de trabalho for concluído.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Confirme a criação do modelo de fluxo de trabalho.

    Console

    Clique no nome do sparkpi na página Fluxos de trabalho Dataproc no console do Google Cloud para abrir a página Detalhes do modelo de fluxo de trabalho. Clique no nome do seu modelo de fluxo de trabalho para confirmar os atributos do modelo sparkpi.

    Comando gcloud

    Execute este comando:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Criar e fazer upload de um DAG para o Cloud Storage

  1. Crie ou use um ambiente atual do Cloud Composer.
  2. Defina variáveis de ambiente.

    IU do Airflow

    1. Na barra de ferramentas, clique em Administrador > Variáveis.
    2. Clique em Criar.
    3. Digite as seguintes informações:
      • Chave: project_id
      • Val: PROJECT_ID: ID do projeto do Google Cloud
    4. Clique em Save.

    Comando gcloud

    Digite os seguintes comandos:

    • ENVIRONMENT é o nome do ambiente do Cloud Composer
    • LOCATION é a região em que o ambiente do Cloud Composer está localizado
    • PROJECT_ID é o ID do projeto que contém o ambiente do Cloud Composer
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Copie o código do DAG a seguir localmente em um arquivo chamado "composer-dataproc-dag.py", que usa o DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Faça o upload do DAG para a pasta do ambiente no Cloud Storage. Depois que o upload for concluído, clique no link Pasta de DAGs na página do ambiente do Cloud Composer.

Como visualizar o status de uma tarefa

IU do Airflow

  1. Abra a interface da Web do Airflow.
  2. Na página DAGs, clique no nome do DAG, por exemplo, dataproc_workflow_dag.
  3. Na página "Detalhes dos DAGs", clique em Visualizar gráfico.
  4. Verificar status:
    • Falha: a tarefa tem uma caixa vermelha ao redor. Você também pode manter o ponteiro do mouse sobre a tarefa e verificar se há a mensagem Estado: Falha. A tarefa tem uma caixa vermelha ao redor, indicando que ela falhou
    • Bem-sucedida: a tarefa tem uma caixa verde ao redor. Você também pode manter o ponteiro do mouse sobre a tarefa e verificar se há a mensagem Estado: sucesso. a tarefa tem uma caixa verde ao redor dela, indicando que foi bem-sucedida

Console

Clique na guia "Fluxos de trabalho" para ver o status do fluxo de trabalho.

Comando gcloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Limpeza

Para evitar cobranças na sua conta do Google Cloud, exclua os recursos usados neste tutorial:

  1. Exclua o ambiente do Cloud Composer.

  2. Exclua os modelos de fluxo de trabalho.

A seguir