Migrar ambientes para o Airflow 2

Nesta página, explicamos como transferir DAGs, dados e configurações dos seus ambientes atuais do Cloud versão 1.10.* para ambientes com o Airflow 2 e versões posteriores do Airflow.

Antes de começar

Antes de começar a usar os ambientes do Cloud Composer com o Airflow 2, pense em alterações que o Airflow 2 traz para os ambientes do Cloud Composer.

Alta disponibilidade do programador

A alta disponibilidade do programador 2 do Airflow (alta disponibilidade) não é compatível com a versão atual do Cloud Composer.

Executor do Celery+Kubernetes

O Airflow 2 Celery+Kubernetes Executor não é compatível com a versão atual do Cloud Composer.

Alterações importantes

O Airflow 2 introduz muitas das principais mudanças que estão confirando:

Diferenças entre ambientes com o Airflow 2 e Airflow 1.1.10.*

Principais diferenças entre os ambientes do Cloud Composer com o Airflow 1.10.* e os ambientes com o Airflow 2:

  • Ambientes com o Airflow 2 usam o Python 3.8. Esta é uma versão mais recente do que a usada nos ambientes do Airflow 1.10.*. Python 2, Python 3.6 e Python 3.7 não são compatíveis.
  • O Airflow 2 usa um formato de CLI diferente. O Cloud Composer é compatível com o novo formato em ambientes com o Airflow 2 por meio do comando gcloud beta composer environments run.
  • Os pacotes PyPI pré-instalados são diferentes nos ambientes do Airflow 2. Para ver uma lista de pacotes PyPI pré-instalados, consulte Lista de versões do Cloud Composer.
  • A serialização de DAG está sempre ativada no Airflow 2. Como resultado, o carregamento assíncrono do DAG não é mais necessário e não é compatível com o Airflow 2. Como resultado, a configuração dos parâmetros [core]store_serialized_dags e [core]store_dag_code não é compatível com o Airflow 2, e as tentativas de configurá-los serão informadas como erros.
  • Os plug-ins do servidor da Web do Airflow não são compatíveis. Isso não afeta os plug-ins do programador ou do worker, incluindo operadores e sensores do Airflow.
  • Nos ambientes do Airflow 2, o papel de usuário padrão do RBAC é Op. Para ambientes com Airflow 1.10.*, o papel padrão é Admin.

Etapa 1: verificar a compatibilidade com o Airflow 2

Para verificar possíveis conflitos com o Airflow 2, use scripts de verificação de upgrade fornecidos pelo Airflow no ambiente atual do Airflow 1.10.*.

gcloud

  1. Se seu ambiente usa o Airflow 1.10.14 e versões anteriores, faça upgrade do ambiente para uma versão do Cloud Composer que use o Airflow 1.10.15 e posteriores. O Cloud Composer é compatível com os comandos de verificação de upgrade a partir do Airflow 1.10.15.

  2. Execute verificações de upgrade por meio do comando gcloud beta composer environments run. Algumas verificações de upgrade relevantes para o Airflow 1.11.10.15 não são relevantes para o Cloud Composer. O comando a seguir exclui essas verificações.

    gcloud beta composer environments run \
        AIRFLOW_1_ENV  \
        --location=AIRFLOW_1_LOCATION \
        upgrade_check \
        -- --ignore VersionCheckRule --ignore LoggingConfigurationRule \
        --ignore PodTemplateFileRule --ignore SendGridEmailerMovedRule
    

    Substitua:

    • AIRFLOW_1_ENV pelo nome do ambiente do Airflow 1.10.*.
    • AIRFLOW_1_LOCATION pela região do Compute Engine em que o ambiente está localizado.
  3. Verifique a saída do comando. Os scripts de verificação de atualização informam possíveis problemas de compatibilidade em ambientes existentes.

  4. Implemente outras alterações nos DAGs, conforme descrito no guia de upgrade para o Airflow 2.0+, na seção sobre como fazer upgrade dos DAGs.

Etapa 2: criar um ambiente do Airflow 2, modificações de configuração de transferência e variáveis de ambiente

Crie um ambiente do Airflow 2 e as substituições de configuração de transferência e variáveis de ambiente:

  1. Siga as etapas para criar um ambiente. Antes de criar um ambiente, especifique também as modificações de configuração e as variáveis de ambiente, conforme explicado mais adiante.

  2. Quando você selecionar uma imagem, escolha uma imagem com o Airflow 2.

  3. Transfira manualmente os parâmetros de configuração do seu ambiente do Airflow 1.10.* para o novo ambiente do Airflow 2.

    Console

    1. Ao criar um ambiente, expanda a seção Rede, modificações de configuração do Airflow e recursos adicionais.

    2. Em Modificações da configuração do Airflow, clique em Adicionar modificação da configuração do Airflow.

    3. Copie todas as modificações de configuração do ambiente do Airflow 1.10.*.

      Algumas opções de configuração usam um nome e uma seção diferentes no Airflow 2. Para mais informações, consulte Alterações na configuração.

    4. Em Variáveis de ambiente, clique em Adicionar variável de ambiente

    5. Copie todas as variáveis de ambiente do ambiente Airflow 1.10.*.

    6. Clique em Criar para criar um ambiente.

Etapa 3: instalar pacotes PyPI no ambiente do Airflow 2

Após a criação do ambiente do Airflow 2, instale os pacotes PyPI nele:

Console

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar ambientes

  2. Selecione seu ambiente do Airflow 2.

  3. Acesse a guia pacotes PyPI e clique em Editar.

  4. Copie os requisitos do pacote PyPI do seu ambiente do Airflow 1.10.*. Clique em Save e aguarde até que o ambiente seja atualizado.

    Como os ambientes do Airflow 2 usam um conjunto diferente de pacotes pré-instalados e uma versão diferente do Python, talvez você encontre conflitos de pacote PyPI que são difíceis de resolver. Uma maneira de diagnosticar problemas de dependências de pacotes é verificar erros de pacote PyPI instalando pacotes em um pod de worker do Airflow.

Etapa 4: transferir variáveis e pools para o Airflow 2

O Airflow 1.10.* é compatível com a exportação de variáveis e pools para arquivos JSON. Em seguida, você pode importar esses arquivos para o ambiente do Airflow 2.

Só é necessário transferir pools se você tiver pools personalizados diferentes de default_pool. Caso contrário, pule os comandos que exportam e importam pools.

gcloud

  1. Exporte variáveis do ambiente do Airflow 1.10.*:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    Substitua:

    • AIRFLOW_1_ENV pelo nome do ambiente do Airflow 1.10.*.
    • AIRFLOW_1_LOCATION pela região do Compute Engine em que o ambiente está localizado.
  2. Exporte pools do seu ambiente do Airflow 1.10.*:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. Consiga o URI do bucket do ambiente do Airflow 2.

    1. Execute este comando:

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      Substitua:

      • AIRFLOW_2_ENV pelo nome do ambiente do Airflow 2.
      • AIRFLOW_2_LOCATION pela região do Compute Engine em que o ambiente está localizado.
    2. Na saída, remova a pasta /dags. O resultado é o URI do bucket de ambiente do Airflow 1.10.*.

      Por exemplo, altere gs://us-central1-example-916807e1-bucket/dags para gs://us-central1-example-916807e1-bucket.

  4. Transfira arquivos JSON com variáveis e pools para o ambiente do Airflow 2:

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    Substitua AIRFLOW_2_BUCKET pelo URI do bucket de ambiente do Airflow 2, que você recebeu na etapa anterior.

  5. Importe variáveis e pools para o Airflow 2:

    gcloud beta composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud beta composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. Verifique se as variáveis e os pools são importados:

    gcloud beta composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud beta composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. Remova os arquivos JSON dos buckets:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

Etapa 5: transferir outros dados do bucket de ambiente do Airflow 1.10.*

gcloud

  1. Transfira plug-ins para o ambiente do Airflow 2. Para fazer isso, exporte os plug-ins do intervalo de ambiente do Airflow 1.10.* Para a pasta /plugins no intervalo de ambiente do Airflow 2:

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. Verifique se a pasta /plugins foi importada com êxito:

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. Exporte a pasta /data do ambiente do Airflow 1.10.* para o ambiente do Airflow 2:

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. Verifique se a pasta /data foi importada com sucesso:

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

Etapa 6: transferir conexões e usuários

O Airflow 1.10.* não é compatível com a exportação de usuários e conexões. Para transferir usuários e conexões, crie manualmente novas contas e conexões de usuário para seu ambiente do Airflow 2 no ambiente do Airflow 1.10.*.

gcloud

  1. Para ver uma lista de conexões no ambiente 1.10.* do Airflow, execute:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. Para criar uma nova conexão no ambiente do Airflow 2, execute o comando connections da CLI do Airflow por meio da gcloud. Exemplo:

    gcloud beta composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. Para ver uma lista de usuários no ambiente 1.10.* do Airflow:

    1. Abra a interface da Web do Airflow para seu ambiente Airflow 1.10.*.

    2. Acesse Administrador > Usuários.

  4. Para criar uma nova conta de usuário no ambiente do Airflow 2, execute o comando users create da CLI do Airflow por meio do gcloud. Exemplo:

    gcloud beta composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --password example_password \
        --role Admin
    

Etapa 7: verificar se os DAGs estão prontos para o Airflow 2

Antes de transferir DAGs para seu ambiente do Airflow 2, verifique se:

  1. Os scripts de verificação de upgrade para seus DAGs são executados com sucesso, e não há problemas de compatibilidade restantes.

  2. Os DAGs usam declarações corretas de importação.

    Por exemplo, a nova instrução de importação para BigQueryCreateDataTransferOperator pode ser assim:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. Os DAGs são atualizados para o Airflow 2. Essa alteração é compatível com o Airflow 1.10.14 e versões posteriores.

Etapa 8: transferir DAGs para o ambiente Airflow 2

Os seguintes problemas podem acontecer quando você transfere DAGs entre ambientes:

  • Se um DAG estiver ativado (não pausado) em ambos os ambientes, cada ambiente executará sua própria cópia do DAG, conforme programado. Isso pode gerar execuções de DAG simultâneas para os mesmos dados e tempo de execução.

  • Devido ao registro de DAG, o Airflow programa execuções extras de DAG, começando pela data de início especificada nos DAGs. Isso ocorre porque a nova instância do Airflow não leva em consideração o histórico de execuções do DAG do ambiente 1.10.*. Isso pode levar a um grande número de execuções de DAG programadas a partir da data de início especificada.

Evite execuções simultâneas de DAG

No ambiente do Airflow 2, substitua a opção de configuração dags-are-paused-at-creation. Depois que você fizer essa alteração, todos os novos DAGs serão pausados por padrão.

Seção Chave Valor
core dags-are-paused-at-creation True

Evitar execuções extras ou ausentes do DAG

Especifique uma nova data de início estática nos DAGs que você transfere para o ambiente do Airflow 2.

Para evitar lacunas e sobreposições nas datas de execução, a primeira execução do DAG precisa ocorrer no ambiente do Airflow 2 na próxima ocorrência do intervalo da programação. Para isso, defina a nova data de início no DAG como antes da data da última execução no ambiente Airflow 1.10.*.

Por exemplo, se o DAG for executado em 15h, 17h e 21h todos os dias no ambiente Airflow.1.10.*, a última execução do DAG aconteceu em 15h, e você planeja transferir o DAG às 15:15, então a data de início do ambiente do Airflow 2 pode ser hoje às 14:45. Depois que você ativa o DAG no ambiente do Airflow 2, o Airflow programa uma execução do DAG para 17h00.

Como outro exemplo, se o DAG é executado à meia-noite todos os dias no ambiente Airflow 1.10.*, a última execução do DAG aconteceu às 00:00 em 26 de abril de 2021, e você planeja transferir o DAG às 13h do dia 26 de abril de 2021, a data de início do ambiente do Airflow 2 pode ser 23:45 em 25 de abril de 2021. Depois de ativar o DAG no ambiente do Airflow 2, o Airflow programa uma execução do DAG para a meia-noite do dia 27 de abril de 2021.

Transferir os DAGs um por um para o ambiente do Airflow 2

Siga estas etapas para transferir cada DAG:

  1. Verifique se a nova data de início no DAG está definida conforme descrito na seção anterior.

  2. Faça o upload do DAG atualizado para o ambiente do Airflow 2. Esse DAG está pausado no ambiente do Airflow 2 devido à modificação da configuração. Portanto, nenhuma execução de DAG é programada ainda.

  3. Na interface da Web do Airflow, acesse DAGs e verifique se há erros de sintaxe do DAG informados.

  4. No momento em que você planeja transferir o DAG:

    1. Pause o DAG no ambiente do Airflow 1.10.*.

    2. Retome o DAG no ambiente do Airflow 2.

    3. Verifique se a nova execução do DAG está programada no horário correto.

    4. Aguarde até que a execução do DAG aconteça no ambiente do Airflow 2 e verifique se a execução foi bem-sucedida.

  5. Dependendo se a execução do DAG for bem-sucedida:

    • Se a execução do DAG for bem-sucedida, será possível prosseguir e usá-lo a partir do ambiente do Airflow 2. Por fim, pense em excluir a versão 1.0.* do Airflow do DAG.

    • Se a execução do DAG falhar, tente resolver problemas com o DAG até que ele seja executado no Airflow 2.

      Se necessário, é possível voltar para a versão 1.10.* do Airflow:

      1. Pause o DAG no ambiente do Airflow 2.

      2. Cancele a pausa do DAG no ambiente do Airflow 1.10.*. Isso programa uma nova execução do DAG para a mesma data e hora que a execução do DAG com falha.

      3. Quando estiver pronto para continuar com a versão 2 do Airflow do ajuste do DAG, ajuste a data de início, faça upload da nova versão do DAG para o ambiente do Airflow 2 e repita o procedimento.

Etapa 9: monitorar o ambiente do Airflow 2

Depois de transferir todos os DAGs e a configuração para o ambiente do Airflow 2, monitore-o em busca de possíveis problemas, execuções de DAG com falha e integridade geral do ambiente. Se o ambiente do Airflow 2 for executado sem problemas por um período suficiente, remova o ambiente do Airflow 1.10.*.

A seguir