Migrar ambientes para o Airflow 2

Nesta página, explicamos como transferir DAGs, dados e configurações dos ambientes atuais do Airflow 1.10.* para ambientes com o Airflow 2 e versões posteriores.

Upgrades lado a lado

O Cloud Composer fornece o script de transferência do banco de dados do Cloud Composer para migrar o banco de dados de metadados, os DAGs, dados e plug-ins dos ambientes do Cloud Composer com o Airflow 1.10.14 e o Airflow 1.10.15. para ambientes atuais do Cloud Composer com o Airflow 2.0.1 e versões posteriores.

Esse é um caminho alternativo para o descrito neste guia. Algumas partes deste guia ainda se aplicam ao usar o script fornecido. Por exemplo, talvez você queiraverificar a compatibilidade dos seus DAGs com o Airflow 2 antes de migrá-los ou garantir queexecuções simultâneas de DAGs não acontecem e não háextra ou ausente Execuções de DAG.

Antes de começar

Antes de começar a usar os ambientes do Cloud Composer com o Airflow 2, pense nas alterações que ele apresenta nos ambientes.

Programador de alta disponibilidade

É possível usar mais de um programador do Airflow no ambiente. É possível definir o número de programadores ao criar um ambiente ou atualizando um ambiente atual.

Executor do Kubernetes + Celery

O Celery+Kubernetes Executor do Airflow 2 não é compatível com a versão atual do Cloud Composer.

Alterações importantes

O Airflow 2 introduz muitas alterações importantes, sendo que algumas estão causando falhas:

Diferenças entre ambientes com o Airflow 2 e o Airflow 1.10.*

Principais diferenças entre os ambientes do Cloud Composer com o Airflow 1.10.* e os ambientes com o Airflow 2:

  • Ambientes com o Airflow 2 usam Python 3.8 (em inglês). Essa é uma versão mais recente do que a usada em ambientes do Airflow 1.10.*. Python 2, Python 3.6, Python 3.7 não são compatíveis.
  • O Airflow 2 usa um formato diferente de CLI. O Cloud Composer é compatível com o novo formato em ambientes com o Airflow 2 por meio do comando gcloud composer environments run.
  • Os pacotes PyPI pré-instalados são diferentes nos ambientes do Airflow 2. Para uma lista dos pacotes PyPI pré-instalados, consulte a Lista de versões do Cloud Composer.
  • A serialização de DAG está sempre ativada no Airflow 2. Como resultado, o carregamento assíncrono de DAGs não é mais necessário e não é compatível com o Airflow 2. Por isso, não é possível configurar os parâmetros [core]store_serialized_dags e [core]store_dag_code no Airflow 2, e as tentativas de defini-los serão informadas como erros.
  • Os plug-ins do servidor da Web do Airflow não são compatíveis. Isso não afeta os plug-ins do programador ou do worker, incluindo os operadores e sensores do Airflow.
  • Nos ambientes do Airflow 2, o papel do usuário do RBAC padrão é Op. Para ambientes com o Airflow 1.10.*, o papel padrão é Admin.

Etapa 1: verificar a compatibilidade com o Airflow 2

Para verificar possíveis conflitos com o Airflow 2, use os scripts de verificação de upgrade fornecidos pelo Airflow no ambiente atual do Airflow 1.10.*.

gcloud

  1. Se o ambiente usa o Airflow 1.10.14 e versões anteriores, faça upgrade do ambiente para uma versão do Cloud Composer que use o Airflow 1.10.15 e posterior. O Cloud Composer é compatível com comandos de verificação de upgrade a partir do Airflow 1.10.15.

  2. Execute verificações de upgrade por meio do comando gcloud composer environments run. Algumas verificações de upgrade relevantes para o Airflow autônomo 1.10.15 não são relevantes para o Cloud Composer. O comando a seguir exclui essas verificações.

    gcloud composer environments run \
        AIRFLOW_1_ENV  \
        --location=AIRFLOW_1_LOCATION \
        upgrade_check \
        -- --ignore VersionCheckRule --ignore LoggingConfigurationRule \
        --ignore PodTemplateFileRule --ignore SendGridEmailerMovedRule
    

    Substitua:

    • AIRFLOW_1_ENV pelo nome do ambiente do Airflow 1.10.*.
    • AIRFLOW_1_LOCATION pela região do Compute Engine em que o ambiente está localizado.
  3. Verifique a saída do comando. Os scripts de verificação de atualização informam possíveis problemas de compatibilidade em ambientes atuais.

  4. Implemente outras alterações nos DAGs, conforme descrito no guia "Como fazer upgrade para o Airflow 2.0 e versões posteriores", na seção sobre como fazer upgrade dos DAGs.

Etapa 2: criar um ambiente do Airflow 2, transferir substituições de configuração e variáveis de ambiente

Crie um ambiente do Airflow 2 e transfira as modificações de configuração e variáveis de ambiente:

  1. Siga as etapas para criar um ambiente. Antes de criar um ambiente, especifique também as modificações de configuração e as variáveis de ambiente, conforme explicado em mais detalhes.

  2. Ao selecionar uma imagem, escolha uma imagem com o Airflow 2.

  3. Transfira manualmente os parâmetros de configuração do seu ambiente do Airflow 1.10.* para o novo ambiente.

    Console

    1. Ao criar um ambiente, expanda a seção Redes, modificações de configuração do Airflow e outros recursos.

    2. Em Modificações de configuração do Airflow, clique em Adicionar modificação da configuração do Airflow.

    3. Copie todas as modificações de configuração do ambiente do Airflow 1.10.*.

      Algumas opções de configuração usam um nome e uma seção diferentes no Airflow 2. Para mais informações, consulte Alterações de configuração.

    4. Em Variáveis de ambiente, clique em Adicionar variável de ambiente.

    5. Copie todas as variáveis de ambiente do ambiente do Airflow 1.10.*.

    6. Clique em Criar para criar um ambiente.

Etapa 3: instalar pacotes PyPI para o ambiente do Airflow 2

Depois que o ambiente do Airflow 2 for criado, instale os pacotes PyPI nele:

Console

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Selecione o ambiente do Airflow 2.

  3. Acesse a guia Pacotes PyPI e clique em Editar.

  4. Copie os requisitos do pacote PyPI do ambiente do Airflow 1.10.*. Clique em Save e aguarde até que o ambiente seja atualizado.

    Como os ambientes do Airflow 2 usam um conjunto diferente de pacotes pré-instalados e uma versão diferente do Python, talvez você encontre conflitos de pacote PyPI que são difíceis de resolver. Uma maneira de diagnosticar problemas de dependência de pacotes é verificar se há erros de pacote do PyPI instalando pacotes em um pod de worker do Airflow.

Etapa 4: transferir variáveis e pools para o Airflow 2

O Airflow 1.10.* é compatível com a exportação de variáveis e pools para arquivos JSON. Em seguida, importe esses arquivos para seu ambiente do Airflow 2.

Você só precisa transferir pools se tiver pools personalizados diferentes de default_pool. Caso contrário, ignore os comandos que exportam e importam pools.

gcloud

  1. Exporte variáveis do ambiente do Airflow 1.10.*:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    Substitua:

    • AIRFLOW_1_ENV pelo nome do ambiente do Airflow 1.10.*.
    • AIRFLOW_1_LOCATION pela região do Compute Engine em que o ambiente está localizado.
  2. Exporte pools do seu ambiente do Airflow 1.10.*:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. Veja o URI do bucket do ambiente do Airflow 2.

    1. Execute este comando:

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      Substitua:

      • AIRFLOW_2_ENV pelo nome do ambiente do Airflow 2.
      • AIRFLOW_2_LOCATION pela região do Compute Engine em que o ambiente está localizado.
    2. Na saída, remova a pasta /dags. O resultado é o URI do bucket de ambiente Airflow 1.10.*.

      Por exemplo, altere gs://us-central1-example-916807e1-bucket/dags para gs://us-central1-example-916807e1-bucket.

  4. Transfira arquivos JSON com variáveis e pools para o ambiente do Airflow 2:

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    Substitua AIRFLOW_2_BUCKET pelo URI do bucket do ambiente do Airflow 2, recebido na etapa anterior.

  5. Importe variáveis e pools para o Airflow 2:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. Verifique se as variáveis e os pools são importados:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. Remova arquivos JSON dos buckets:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

Etapa 5: transferir outros dados do bucket do ambiente do Airflow 1.10.*

gcloud

  1. Transfira plug-ins para o ambiente do Airflow. Para fazer isso, exporte plug-ins do bucket do ambiente do Airflow 1.10.* para a pasta /plugins no bucket do ambiente do Airflow 2:

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. Verifique se a pasta /plugins foi importada com sucesso:

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. Exporte a pasta /data do ambiente do Airflow 1.10.* para o ambiente do Airflow 2:

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. Verifique se a pasta /data foi importada com sucesso:

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

Etapa 6: transferir conexões e usuários

O Airflow 1.10.* não é compatível com a exportação de usuários e conexões. Para transferir usuários e conexões, crie manualmente novas contas de usuário e conexões para o ambiente do Airflow 2.10.*.

gcloud

  1. Para ver uma lista de conexões no ambiente do Airflow 1.10.*, execute:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. Para criar uma nova conexão no ambiente do Airflow 2, execute o comando connections da CLI do Airflow usando a gcloud. Exemplo:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. Para ver uma lista de usuários no ambiente do Airflow 1.10.*:

    1. Abra a interface da Web do Airflow para o ambiente do Airflow 1.10.*.

    2. Acesse Administrador > Usuários.

  4. Para criar uma nova conta de usuário no ambiente do Airflow 2, execute o comando users create da CLI do Airflow usando a gcloud. Exemplo:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --password example_password \
        --role Admin
    

Etapa 7: verifique se os DAGs estão prontos para o Airflow 2

Antes de transferir DAGs para o ambiente do Airflow 2, verifique se:

  1. Scripts de verificações de upgrade para os DAGs são executados com êxito e não há problemas de compatibilidade restantes.

  2. Seus DAGs usam instruções de importação corretas.

    Por exemplo, a nova instrução de importação para BigQueryCreateDataTransferOperator pode ter esta aparência:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. Os DAGs são atualizados para o Airflow 2. Essa mudança é compatível com o Airflow 1.10.14 e versões posteriores.

Etapa 8: transferir DAGs para o ambiente do Airflow 2

Os seguintes problemas podem acontecer quando você transfere DAGs entre ambientes:

  • Se um DAG estiver ativado (não pausado) nos dois ambientes, cada ambiente executará uma cópia própria do DAG, conforme programado. Isso pode gerar execuções simultâneas de DAGs para os mesmos dados e ambiente de execução.

  • Devido à coleta do DAG, o Airflow programa execuções extras de DAGs, começando pela data de início especificada nos DAGs. Isso acontece porque a nova instância do Airflow não considera o histórico de execuções do DAG do ambiente 1.10.*. Isso pode levar a um grande número de execuções de DAGs programadas a partir da data de início especificada.

Impedir execuções simultâneas de DAGs

No ambiente do Airflow 2, modifique a opção de configuração dags-are-paused-at-creation do Airflow. Depois que você fizer essa alteração, todos os novos DAGs serão pausados por padrão.

Seção Chave Valor
core dags-are-paused-at-creation True

Evitar execuções extras ou ausentes de DAGs

Especifique uma nova data de início estática nos DAGs que você transferiu para o ambiente do Airflow 2.

Para evitar lacunas e sobreposições nas datas de execução, a primeira execução do DAG precisa acontecer no ambiente do Airflow 2 na próxima ocorrência do intervalo de programação. Para fazer isso, defina a nova data de início no DAG como anterior à data da última execução no ambiente do Airflow 1.10.*.

Por exemplo, se o DAG for executado 15:00, 17:00 e 21.00 todos os dias no ambiente do Airflow 1.10.*, a última execução do DAG aconteceu às 15:00 e você pretende transferir o DAG às 15h15, a data de início do ambiente do Airflow 2 pode ser hoje às 14h45. Depois de ativar o DAG no ambiente do Airflow 2, ele programará uma execução do DAG para 17h.

Outro exemplo: se o DAG for executado à 00h todos os dias no ambiente do Airflow 1.10.*, a última execução do DAG aconteceu às 00h de 26 de abril de 2021 e você planeja transferir o DAG às 13h de 26 de abril de 2021, a data de início do ambiente do Airflow 2 pode ser às 23h45 de 25 de abril de 2021. Depois de ativar o DAG no ambiente do Airflow 2, ele programará uma execução do DAG para a 00:00 do dia 27 de abril de 2021.

Transfira os DAGs um por um para o ambiente do Airflow 2

Para cada DAG, siga este procedimento para transferi-lo:

  1. Certifique-se de que a nova data de início no DAG esteja definida conforme descrito na seção anterior.

  2. Faça upload do DAG atualizado para o ambiente do Airflow 2. Esse DAG está pausado no ambiente do Airflow 2 devido à modificação da configuração. Portanto, nenhuma execução de DAG ainda está programada.

  3. Na interface da Web do Airflow, acesse DAGs e verifique se há erros de sintaxe de DAG relatados.

  4. No momento em que você planeja transferir o DAG:

    1. Pause o DAG no ambiente do Airflow 1.10.*.

    2. Cancele a pausa do DAG no ambiente do Airflow 2.

    3. Verifique se a nova execução do DAG está programada para o horário correto.

    4. Aguarde a execução do DAG acontecer no ambiente do Airflow 2 e verifique se foi bem-sucedida.

  5. Dependendo da conclusão da execução do DAG:

    • Se a execução do DAG for bem-sucedida, será possível continuar e usar o DAG a partir do ambiente do Airflow 2. No fim das contas, considere excluir a versão do Airflow 1.10.* do DAG.

    • Se a execução do DAG falhar, tente resolver problemas do DAG até que ele seja executado com sucesso no Airflow 2.

      Se necessário, sempre é possível recorrer à versão 1.10.* do Airflow do DAG:

      1. Pause o DAG no ambiente do Airflow 2.

      2. Pause o DAG no ambiente do Airflow 1.10.*. Isso programa uma nova execução do DAG para a mesma data e hora que a execução do DAG com falha.

      3. Quando você estiver pronto para continuar com a versão do Airflow 2 do DAG, ajuste a data de início, faça upload da nova versão do DAG para o ambiente do Airflow 2 e repita o procedimento.

Etapa 9: monitorar o ambiente do Airflow 2

Depois de transferir todos os DAGs e a configuração para o ambiente do Airflow 2, monitore-os em busca de possíveis problemas, execuções com falha do DAG e integridade geral do ambiente. Se o ambiente do Airflow 2 for executado sem problemas por um período suficiente, será possível remover o ambiente do Airflow 1.10.*.

A seguir