Migre ambientes para o Airflow 2

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página explica como transferir DAGs, dados e configuração dos seus ambientes do Airflow 1.10.* existentes para ambientes com o Airflow 2 e versões posteriores do Airflow.

Outros guias de migração

De Para Método Guia
Cloud Composer 2 Cloud Composer 3 Paralelamente, usando o guião de migração Guia de migração de scripts
Cloud Composer 2 Cloud Composer 3 Lado a lado, usando instantâneos Guia de migração do Snapshots
Cloud Composer 1, Airflow 2 Cloud Composer 3 Lado a lado, usando instantâneos Guia de migração do Snapshots
Cloud Composer 1, Airflow 2 Cloud Composer 2 Lado a lado, usando instantâneos Guia de migração do Snapshots
Cloud Composer 1, Airflow 2 Cloud Composer 2 Transferência manual lado a lado Guia de migração manual
Cloud Composer 1, Airflow 1 Cloud Composer 2, Airflow 2 Lado a lado, usando instantâneos Guia de migração do Snapshots
Cloud Composer 1, Airflow 1 Cloud Composer 2, Airflow 2 Transferência manual lado a lado Guia de migração manual
Cloud Composer 1, Airflow 1 Cloud Composer 1, Airflow 2 Transferência manual lado a lado Este guia

Atualizações paralelas

O Cloud Composer fornece o script de transferência da base de dados do Cloud Composer para migrar a base de dados de metadados, DAGs, dados e plug-ins de ambientes do Cloud Composer com o Airflow 1.10.14 e o Airflow 1.10.15 para ambientes do Cloud Composer existentes com o Airflow 2.0.1 e versões posteriores do Airflow.

Este é um caminho alternativo ao descrito neste guia. Algumas partes deste guia continuam a aplicar-se quando usa o script fornecido. Por exemplo, pode querer verificar a compatibilidade dos seus DAGs com o Airflow 2 antes de os migrar ou certificar-se de que não ocorrem execuções de DAGs em simultâneo e que não existem execuções de DAGs adicionais ou em falta.

Antes de começar

Antes de começar a usar ambientes do Cloud Composer com o Airflow 2, considere as alterações que o Airflow 2 traz aos ambientes do Cloud Composer.

Vários agendadores

Pode usar mais do que um programador do Airflow no seu ambiente. Pode definir o número de programadores quando criar um ambiente ou atualizar um ambiente existente.

Celery+Kubernetes Executor

O Celery+Kubernetes Executor do Airflow 2 é compatível no Cloud Composer 3.

Alterações interruptivas

O Airflow 2 introduz muitas alterações importantes, algumas das quais são disruptivas:

Diferenças entre ambientes com o Airflow 2 e o Airflow 1.10.*

Principais diferenças entre os ambientes do Cloud Composer com o Airflow 1.10.* e os ambientes com o Airflow 2:

  • Os ambientes com o Airflow 2 usam o Python 3.8. Esta é uma versão mais recente do que a usada em ambientes do Airflow 1.10.*. O Python 2, o Python 3.6 e o Python 3.7 não são suportados.
  • O Airflow 2 usa um formato de CLI diferente. O Cloud Composer suporta o novo formato em ambientes com o Airflow 2 através do comando gcloud composer environments run.
  • Os pacotes PyPI pré-instalados são diferentes nos ambientes do Airflow 2. Para uma lista de pacotes PyPI pré-instalados, consulte a lista de versões do Cloud Composer.
  • A serialização de DAGs está sempre ativada no Airflow 2. Como resultado, o carregamento assíncrono de DAGs já não é necessário e não é suportado no Airflow 2. Como resultado, a configuração dos parâmetros [core]store_serialized_dags e [core]store_dag_code não é suportada para o Airflow 2, e as tentativas de os definir são comunicadas como erros.
  • Os plug-ins do servidor Web do Airflow não são suportados. Isto não afeta os plug-ins do programador nem do trabalhador, incluindo os operadores e os sensores do Airflow.
  • Nos ambientes do Airflow 2, a função de utilizador do Airflow predefinida é Op. Para ambientes com o Airflow 1.10.*, a função predefinida é Admin.

Passo 1: verifique a compatibilidade com o Airflow 2

Para verificar potenciais conflitos com o Airflow 2, consulte o guia de atualização para o Airflow 2.0+, na secção sobre a atualização de DAGs.

Um problema comum que pode encontrar está relacionado com caminhos de importação incompatíveis. Para mais informações sobre como resolver este problema de compatibilidade, no guia Atualizar para o Airflow 2.0 ou superior, consulte a secção sobre fornecedores de backport.

Passo 2: crie um ambiente do Airflow 2, transfira substituições de configuração e variáveis de ambiente

Crie um ambiente do Airflow 2 e transfira substituições de configuração e variáveis de ambiente:

  1. Siga os passos para criar um ambiente. Antes de criar um ambiente, especifique também substituições de configuração e variáveis de ambiente, conforme explicado mais adiante.

  2. Quando selecionar uma imagem, escolha uma imagem com Airflow 2.

  3. Transfira manualmente os parâmetros de configuração do seu ambiente do Airflow 1.10.* para o novo ambiente do Airflow 2.

    Consola

    1. Quando cria um ambiente, expanda a secção Rede, substituições da configuração do Airflow e funcionalidades adicionais.

    2. Em Substituições da configuração do Airflow, clique em Adicionar substituição da configuração do Airflow.

    3. Copie todas as substituições de configuração do seu ambiente do Airflow 1.10.* .

      Algumas opções de configuração usam um nome e uma secção diferentes no Airflow 2. Para mais informações, consulte o artigo Alterações de configuração.

    4. Em Variáveis de ambiente, clique em Adicionar variável de ambiente

    5. Copie todas as variáveis de ambiente do seu ambiente do Airflow 1.10.*.

    6. Clique em Criar para criar um ambiente.

Passo 3: instale pacotes PyPI no ambiente do Airflow 2

Depois de criar o ambiente do Airflow 2, instale-lhe pacotes PyPI:

Consola

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Selecione o seu ambiente do Airflow 2.

  3. Aceda ao separador Pacotes PyPI e clique em Editar.

  4. Copie os requisitos do pacote PyPI do seu ambiente do Airflow 1.10.*. Clique em Guardar e aguarde até que o ambiente seja atualizado.

    Uma vez que os ambientes do Airflow 2 usam um conjunto diferente de pacotes pré-instalados e uma versão diferente do Python, pode encontrar conflitos de pacotes do PyPI difíceis de resolver.

Passo 4: transfira variáveis e pools para o Airflow 2

O Airflow 1.10.* suporta a exportação de variáveis e pools para ficheiros JSON. Em seguida, pode importar estes ficheiros para o seu ambiente do Airflow 2.

Só precisa de transferir conjuntos se tiver conjuntos personalizados que não sejam default_pool. Caso contrário, ignore os comandos que exportam e importam conjuntos.

gcloud

  1. Exporte variáveis do seu ambiente do Airflow 1.10.*:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    Substituir:

    • AIRFLOW_1_ENV com o nome do seu ambiente do Airflow 1.10.*.
    • AIRFLOW_1_LOCATION com a região onde o ambiente está localizado.
  2. Exporte pools do seu ambiente do Airflow 1.10.*:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. Obtenha o URI do contentor do ambiente do Airflow 2.

    1. Execute o seguinte comando:

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      Substituir:

      • AIRFLOW_2_ENV com o nome do seu ambiente do Airflow 2.
      • AIRFLOW_2_LOCATION com a região onde o ambiente está localizado.
    2. Na saída, remova a pasta /dags. O resultado é o URI do contentor do ambiente do Airflow 2.

      Por exemplo, altere gs://us-central1-example-916807e1-bucket/dags para gs://us-central1-example-916807e1-bucket.

  4. Transfira ficheiros JSON com variáveis e pools para o seu ambiente do Airflow 2:

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    Substitua AIRFLOW_2_BUCKET pelo URI do seu bucket do ambiente do Airflow 2, obtido no passo anterior.

  5. Importe variáveis e pools para o Airflow 2:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. Verifique se as variáveis e os conjuntos foram importados:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. Remova os ficheiros JSON dos contentores:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

Passo 5: transfira outros dados do seu contentor do ambiente Airflow 1.10.*

gcloud

  1. Transfira plug-ins para o seu ambiente do Airflow 2. Para o fazer, exporte os plug-ins do contentor do ambiente do Airflow 1.10.* para a pasta /plugins no contentor do ambiente do Airflow 2:

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. Verifique se a pasta /plugins foi importada com êxito:

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. Exporte a pasta /data do seu ambiente do Airflow 1.10.* para o ambiente do Airflow 2:

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. Verifique se a pasta /data foi importada com êxito:

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

Passo 6: transfira ligações e utilizadores

O Airflow 1.10.* não suporta a exportação de utilizadores nem de ligações. Para transferir utilizadores e associações, crie manualmente novas contas de utilizador e associações no seu ambiente do Airflow 2.

gcloud

  1. Para obter uma lista de ligações no seu ambiente do Airflow 1.10.*, execute o seguinte comando:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. Para criar uma nova associação no seu ambiente do Airflow 2, execute o connectionscomando da CLI do Airflow através do gcloud. Por exemplo:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. Para ver uma lista de utilizadores no seu ambiente do Airflow 1.10.*:

    1. Abra a interface Web do Airflow para o seu ambiente do Airflow 1.10.*.

    2. Aceda a Administração > Utilizadores.

  4. Para criar uma nova conta de utilizador no seu ambiente do Airflow 2, execute o users create comando da CLI do Airflow através do gcloud. Por exemplo:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Admin
    

Passo 7: certifique-se de que os seus DAGs estão prontos para o Airflow 2

Antes de transferir DAGs para o seu ambiente do Airflow 2, certifique-se de que:

  1. Os DAGs são executados com êxito e não existem problemas de compatibilidade restantes.

  2. Os seus DAGs usam declarações de importação corretas.

    Por exemplo, a nova declaração de importação para BigQueryCreateDataTransferOperator pode ter este aspeto:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. Os seus DAGs são atualizados para o Airflow 2. Esta alteração é compatível com o Airflow 1.10.14 e versões posteriores.

Passo 8: transfira os DAGs para o ambiente do Airflow 2

Os seguintes problemas potenciais podem ocorrer quando transfere DAGs entre ambientes:

  • Se um DAG estiver ativado (não pausado) em ambos os ambientes, cada ambiente executa a sua própria cópia do DAG, conforme agendado. Isto pode levar a execuções de DAG simultâneas para os mesmos dados e hora de execução.

  • Devido à sincronização do DAG, o Airflow agenda execuções de DAG adicionais, a partir da data de início especificada nos seus DAGs. Isto acontece porque a nova instância do Airflow não tem em conta o histórico de execuções de DAGs do ambiente 1.10.*. Isto pode levar a um grande número de execuções de DAG agendadas a partir da data de início especificada.

Impeça execuções de DAGs simultâneas

No seu ambiente do Airflow 2, substitua a opção de configuração do dags_are_paused_at_creation Airflow. Depois de fazer esta alteração, todos os novos DAGs são pausados por predefinição.

Secção Chave Valor
core dags_are_paused_at_creation True

Evite execuções de DAG adicionais ou em falta

Especifique uma nova data de início estática nos DAGs que transfere para o seu ambiente do Airflow 2.

Para evitar lacunas e sobreposições nas datas de execução, a primeira execução do DAG deve ocorrer no ambiente do Airflow 2 na próxima ocorrência do intervalo de agendamento. Para o fazer, defina a nova data de início no seu DAG para que seja anterior à data da última execução no ambiente do Airflow 1.10.*.

Por exemplo, se o seu DAG for executado às 15:00, 17:00 e 21:00 todos os dias no ambiente do Airflow 1.10.*, a última execução do DAG ocorreu às 15:00 e planeia transferir o DAG às 15:15. Nesse caso, a data de início do ambiente do Airflow 2 pode ser hoje às 14:45. Depois de ativar o DAG no ambiente do Airflow 2, o Airflow agenda uma execução do DAG para as 17:00.

Como outro exemplo, se o seu DAG for executado às 00:00 todos os dias no ambiente do Airflow 1.10.*, a última execução do DAG ocorreu às 00:00 de 26 de abril de 2021 e planeia transferir o DAG às 13:00 de 26 de abril de 2021, a data de início do ambiente do Airflow 2 pode ser às 23:45 de 25 de abril de 2021. Depois de ativar o DAG no ambiente do Airflow 2, o Airflow agenda uma execução do DAG para as 00:00 do dia 27 de abril de 2021.

Transfira os seus DAGs um a um para o ambiente do Airflow 2

Para cada DAG, siga este procedimento para o transferir:

  1. Certifique-se de que a nova data de início no DAG está definida conforme descrito na secção anterior.

  2. Carregue o DAG atualizado para o ambiente do Airflow 2. Este DAG está em pausa no ambiente do Airflow 2 devido à substituição da configuração, pelo que ainda não estão agendadas execuções de DAG.

  3. Na interface Web do Airflow, aceda a DAGs e verifique se existem erros de sintaxe de DAG comunicados.

  4. No momento em que planeia transferir o DAG:

    1. Pause o DAG no seu ambiente do Airflow 1.10.*.

    2. Despause o DAG no seu ambiente do Airflow 2.

    3. Verifique se a nova execução do DAG está agendada para a hora correta.

    4. Aguarde a execução do DAG no ambiente do Airflow 2 e verifique se a execução foi bem-sucedida.

  5. Consoante a execução do DAG seja bem-sucedida ou não:

    • Se a execução do DAG for bem-sucedida, pode continuar e usar o DAG a partir do seu ambiente do Airflow 2. Eventualmente, considere eliminar a versão 1.10.* do DAG do Airflow.

    • Se a execução do DAG falhou, tente resolver os problemas do DAG até que seja executado com êxito no Airflow 2.

      Se necessário, pode sempre voltar à versão 1.10.* do DAG do Airflow:

      1. Pause o DAG no seu ambiente do Airflow 2.

      2. Despause o DAG no seu ambiente do Airflow 1.10.*. Isto agenda uma nova execução do DAG para a mesma data e hora da execução do DAG com falha.

      3. Quando estiver tudo pronto para continuar com a versão 2 do Airflow do DAG, ajuste a data de início, carregue a nova versão do DAG para o seu ambiente do Airflow 2 e repita o procedimento.

Passo 9: monitorize o seu ambiente do Airflow 2

Depois de transferir todos os DAGs e a configuração para o ambiente do Airflow 2, monitorize-o para detetar potenciais problemas, execuções de DAGs com falhas e o estado geral do ambiente. Se o ambiente do Airflow 2 for executado sem problemas durante um período suficiente, pode remover o ambiente do Airflow 1.10.*.

O que se segue?