Fluxo de trabalho usando o Cloud Composer

Neste tutorial, usamos os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Criar o projeto

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative as APIs Dataproc, Compute Engine, and Cloud Composer .

    Ative as APIs

  5. Instale e inicialize o SDK do Cloud..
  6. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  7. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  8. Ative as APIs Dataproc, Compute Engine, and Cloud Composer .

    Ative as APIs

  9. Instale e inicialize o SDK do Cloud..

Criar um modelo de fluxo de trabalho do Dataproc

Copie e execute os comandos listados abaixo em uma janela de terminal local ou no Cloud Shell para criar e definir um modelo de fluxo de trabalho.

  1. Crie o modelo de fluxo de trabalho sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Adicione o job do Spark ao modelo de fluxo de trabalho sparkpi. A sinalização step-id "compute" identifica o job do SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Use um cluster gerenciado de nó único para executar o fluxo de trabalho. O Dataproc criará o cluster, executará o fluxo de trabalho nele e excluirá o cluster quando o fluxo de trabalho for concluído.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Confirme a criação do modelo de fluxo de trabalho.

    Console

    Clique no nome do sparkpi na página Fluxos de trabalho do Dataproc no Console do Cloud para abrir a página Detalhes do modelo de fluxo de trabalho. Clique no nome do seu modelo de fluxo de trabalho para confirmar os atributos do modelo sparkpi.

    Comando gcloud

    Execute este comando:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Criar e fazer upload de um DAG para o Cloud Storage

  1. Crie ou use um ambiente atual do Cloud Composer.
  2. Defina variáveis de ambiente.

    IU do Airflow

    1. Na barra de ferramentas, clique em Administrador > Variáveis.
    2. Clique em Criar.
    3. Digite as seguintes informações:
      • Chave: project_id
      • Val: PROJECT_ID: ID do projeto do Google Cloud
    4. Clique em Save.

    Comando gcloud

    Digite os seguintes comandos:

    • ENVIRONMENT é o nome do ambiente do Cloud Composer
    • LOCATION é a região em que o ambiente do Cloud Composer está localizado
    gcloud composer environments run ENVIRONMENT \
        --location LOCATION
        
  3. Copie localmente o seguinte código DAG para um arquivo chamado "composer-dataproc-dag.py", que usa o DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import DataprocInstantiateWorkflowTemplateOperator
    from airflow.utils.dates import days_ago
    
    project_id = models.Variable.get("project_id")
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
    
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = models.Variable.get("project_id")
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
    
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Faça o upload do DAG para a pasta do ambiente no Cloud Storage. Depois que o upload for concluído, clique no link Pasta de DAGs na página do ambiente do Cloud Composer.

Como visualizar o status de uma tarefa

IU do Airflow

  1. Abra a interface da Web do Airflow.
  2. Na página DAGs, clique no nome do DAG, por exemplo, dataproc_workflow_dag.
  3. Na página "Detalhes dos DAGs", clique em Visualizar gráfico.
  4. Verificar status:
    • Falha: a tarefa tem uma caixa vermelha ao redor. Você também pode manter o ponteiro do mouse sobre a tarefa e verificar se há a mensagem Estado: Falha. A tarefa tem uma caixa vermelha ao redor, indicando que ela falhou
    • Bem-sucedida: a tarefa tem uma caixa verde ao redor. Você também pode manter o ponteiro do mouse sobre a tarefa e verificar se há a mensagem Estado: sucesso. a tarefa tem uma caixa verde ao redor dela, indicando que foi bem-sucedida

Console

Clique na guia "Fluxos de trabalho" para ver o status do fluxo de trabalho.

Comando gcloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Limpeza

Para evitar cobranças na sua conta do Google Cloud, exclua os recursos usados neste tutorial:

  1. Exclua o ambiente do Cloud Composer.

  2. Exclua os modelos de fluxo de trabalho.

A seguir