Como resolver problemas de atualização e upgrade de ambientes

Cloud Composer 1 | Cloud Composer 2

Nesta página, você encontrará informações para solucionar problemas que podem ser encontrados ao atualizar ou atualizar ambientes do Cloud Composer.

Para informações sobre solução de problemas relacionadas à criação de ambientes, consulte Solução de problemas de criação de ambientes.

Quando os ambientes do Cloud Composer são atualizados, a maioria dos problemas acontece pelos seguintes motivos:

  • Problemas de permissão da conta de serviço
  • Problemas de dependência do PyPI
  • Tamanho do banco de dados do Airflow

Permissões insuficientes para atualizar ou fazer upgrade de um ambiente

Se o Cloud Composer não puder atualizar ou fazer upgrade de um ambiente devido a permissões insuficientes, ele gerará a seguinte mensagem de erro:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Solução: atribua papéis à conta e à conta de serviço do ambiente, conforme descrito em Controle de acesso.

A conta de serviço do ambiente não tem permissões suficientes

Ao criar um ambiente do Cloud Composer, você especifica uma conta de serviço que executa os nós do cluster do GKE do ambiente. Se essa conta de serviço não tiver permissões suficientes para a operação solicitada, o Cloud Composer gerará um erro:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Solução: atribua papéis à conta e à conta de serviço do ambiente, conforme descrito em Controle de acesso.

O tamanho do banco de dados do Airflow é muito grande para realizar a operação

Uma operação de upgrade do Cloud Composer pode não ser bem-sucedida porque o tamanho do banco de dados do Airflow é grande demais para que as operações de upgrade sejam bem-sucedidas.

Se o tamanho do banco de dados do Airflow for maior do que 16 GB, o Cloud Composer gerará o seguinte erro:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Solução: execute a limpeza do banco de dados do Airflow, conforme descrito em Manutenção do banco de dados do Airflow.

Falha no upgrade para uma nova versão do Cloud Composer devido a conflitos de pacote do PyPI

Ao fazer upgrade de um ambiente com pacotes PyPI personalizados instalados, você pode encontrar erros relacionados a conflitos de pacotes PyPI. Isso pode acontecer porque a nova imagem do Cloud Composer contém versões mais recentes de pacotes pré-instalados que causam conflitos de dependência com os pacotes PyPI instalados no ambiente.

Solução:

  • Para informações detalhadas sobre conflitos de pacote, execute uma verificação de upgrade.
  • Reduzir as restrições de versão para pacotes PyPI personalizados instalados. Por exemplo, em vez de especificar uma versão como ==1.0.1, especifique-a como >=1.0.1.
  • Para mais informações sobre como alterar os requisitos de versão para resolver dependências conflitantes, consulte a documentação do pip (em inglês).

A falta de conectividade com o DNS pode causar problemas durante upgrades ou atualizações

Esses problemas de conectividade podem resultar em entradas de registro como estas:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Isso geralmente significa que não há rota para o DNS. Portanto, verifique se o nome do DNS metadata.google.internal pode ser resolvido para o endereço IP de dentro do cluster, de pods e das redes de serviços. Verifique se o Acesso privado do Google está ativado na VPC (no projeto host ou de serviço) em que o ambiente foi criado.

Mais informações:

A CPU do engatilhador excede o limite de 1 vCPU

O Cloud Composer 2 nas versões 2.4.4 e mais recentes apresenta uma estratégia diferente de alocação de recursos do acionador para melhorar o escalonamento de desempenho. Se você encontrar um erro relacionado à CPU do engatilhador ao executar uma atualização de ambiente, isso significa que os gatilhos atuais estão configurados para usar mais de uma vCPU por acionador.

Solução:

Inspecionar avisos de migração com falha

Ao fazer upgrade do Airflow para uma versão posterior, às vezes novas restrições são aplicadas ao banco de dados do Airflow. Se essas restrições não puderem ser aplicadas, o Airflow criará novas tabelas para armazenar as linhas em que as restrições não puderam ser aplicadas. A interface do Airflow exibe uma mensagem de aviso até que as tabelas de dados movidas sejam renomeadas ou descartadas.

Solução:

É possível usar os dois DAGs a seguir para inspecionar os dados movidos e renomear as tabelas.

O DAG list_moved_tables_after_upgrade_dag lista as linhas que foram movidas de todas as tabelas em que não foi possível aplicar restrições. Inspecione os dados e decida se quer mantê-los. Para mantê-lo, você precisa corrigir manualmente os dados no banco de dados do Airflow. Por exemplo, adicionando as linhas de volta com os dados corretos.

Se você não precisa dos dados ou já os corrigiu, execute o DAG rename_moved_tables_after_upgrade_dag. Esse DAG renomeia as tabelas movidas. As tabelas e os dados delas não são excluídos. Assim, é possível revisar os dados posteriormente.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX

def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )

def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())

def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")

with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

A seguir