Cette page a été traduite par l'API Cloud Translation.
Switch to English

Mise à niveau des environnements

Cette page explique comment mettre à niveau la version Airflow ou la version Cloud Composer exécutée par votre environnement.

Pendant la mise à niveau, Cloud Composer effectue les opérations suivantes :

  • Redéploiement des pods de planificateur et de nœud de calcul Airflow dans un nouvel espace de noms Kubernetes. Une fois la mise à niveau terminée, Airflow utilisera une nouvelle base de données MySQL. Le nom de la base de données correspond à l'espace de noms Kubernetes. L'historique d'exécution du DAG est conservé.

  • Il met à jour la connexion Airflow airflow_db de sorte qu'elle pointe vers la nouvelle base de données Cloud SQL.

Ces modifications affectent la manière dont vous accédez aux pods et dont vous vous connectez à la base de données Cloud SQL.

  • Pour accéder aux pods du cluster GKE après la mise à niveau, vous devez utiliser des commandes kubectl compatibles avec les espaces de noms. Par exemple, pour répertorier les pods du cluster, utilisez kubectl get pods -A. Pour exécuter une commande sur un pod, utilisez kubectl exec -n <NAMESPACE> ....
  • Si vous utilisez des charges de travail et des connexions Airflow faisant directement référence au proxy SQL, indiquez un nom d'hôte qui comporte l'espace de noms par défaut : airflow-sqlproxy-service.default, et non pas airflow-sqlproxy-service.

La mise à niveau de Cloud Composer ne modifie pas la manière dont vous vous connectez aux ressources de votre environnement, comme les adresses IP de VM de nœuds Google Kubernetes Engine, l'adresse IP de l'instance Cloud SQL, le bucket Cloud Storage ou le nom de domaine du serveur Web Airflow.

Avant de commencer

  • La mise à niveau des environnements est actuellement en aperçu. Utilisez cette fonctionnalité avec prudence dans les environnements de production.
  • Le rôle roles/editor ou roles/composer.admin est requis pour procéder à la mise à niveau.
  • Suspendez tous les DAG et attendez la fin des tâches en cours avant de procéder à la mise à niveau.
  • Vous pouvez mettre à niveau Cloud Composer, la version Airflow ou les deux en même temps.
  • La combinaison Cloud Composer-Airflow que vous mettez à niveau doit être une version publiée.
    • Pour connaître les mises à niveau disponibles, consultez la page Afficher les mises à niveau disponibles. Pour obtenir les dernières fonctionnalités et les derniers correctifs, envisagez de passer à la version la plus récente de Cloud Composer.
    • Pour obtenir la liste des packages PyPI et des personnalisations dans une version compatible, consultez la liste des versions de Cloud Composer.

      Avant de procéder à la mise à niveau, assurez-vous de connaître les différences entre les versions actuelles Airflow et Cloud Composer et les versions vers lesquelles vous effectuez la mise à niveau. Des modifications incompatibles peuvent entraîner le dysfonctionnement des DAG.

Maintenance de la base de données Airflow

La base de données Airflow collecte de plus en plus de données au fil du temps.

Vous pouvez utiliser ce DAG de maintenance pour nettoyer le contenu de votre base de données. Ce DAG supprimera les anciennes entrées des tables DagRun, TaskInstance, Log, XCom, Job DB et SlaMisss.

DAG de maintenance de la base de données

"""
A maintenance workflow that you can deploy into Airflow to periodically clean
out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid
having too much data in your Airflow MetaStore.

## Authors

The DAG is a fork of [teamclairvoyant repository.](https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup)

## Usage

1. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME,
  ALERT_EMAIL_ADDRESSES and ENABLE_DELETE) in the DAG with the desired values

2. Modify the DATABASE_OBJECTS list to add/remove objects as needed. Each
   dictionary in the list features the following parameters:
    - airflow_db_model: Model imported from airflow.models corresponding to
      a table in the airflow metadata database
    - age_check_column: Column in the model/table to use for calculating max
      date of data deletion
    - keep_last: Boolean to specify whether to preserve last run instance
        - keep_last_filters: List of filters to preserve data from deleting
          during clean-up, such as DAG runs where the external trigger is set to 0.
        - keep_last_group_by: Option to specify column by which to group the
          database entries and perform aggregate functions.

3. Create and Set the following Variables in the Airflow Web Server
  (Admin -> Variables)
    - airflow_db_cleanup__max_db_entry_age_in_days - integer - Length to retain
      the log files if not already provided in the conf. If this is set to 30,
      the job will remove those files that are 30 days old or older.

4. Put the DAG in your gcs bucket.
"""
from datetime import datetime, timedelta
import logging
import os

import airflow
from airflow import settings
from airflow.configuration import conf
from airflow.jobs import BaseJob
from airflow.models import DAG, DagModel, DagRun, Log, SlaMiss, \
    TaskInstance, Variable, XCom
from airflow.operators.python_operator import PythonOperator
import dateutil.parser
from sqlalchemy import and_, func
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import load_only

try:
    # airflow.utils.timezone is available from v1.10 onwards
    from airflow.utils import timezone
    now = timezone.utcnow
except ImportError:
    now = datetime.utcnow

# airflow-db-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = airflow.utils.dates.days_ago(1)
# How often to Run. @daily - Once a day at Midnight (UTC)
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = "operations"
# List of email address to send email alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that arE 30 days old or older.

DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
    Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30))
# Prints the database entries which will be getting deleted; set to False
# to avoid printing large lists and slowdown process
PRINT_DELETES = False
# Whether the job should delete the db entries or not. Included if you want to
# temporarily avoid deleting the db entries.
ENABLE_DELETE = True
# List of all the objects that will be deleted. Comment out the DB objects you
# want to skip.
DATABASE_OBJECTS = [{
    "airflow_db_model": BaseJob,
    "age_check_column": BaseJob.latest_heartbeat,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": DagRun,
    "age_check_column": DagRun.execution_date,
    "keep_last": True,
    "keep_last_filters": [DagRun.external_trigger.is_(False)],
    "keep_last_group_by": DagRun.dag_id
}, {
    "airflow_db_model": TaskInstance,
    "age_check_column": TaskInstance.execution_date,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": Log,
    "age_check_column": Log.dttm,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": XCom,
    "age_check_column": XCom.execution_date,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": SlaMiss,
    "age_check_column": SlaMiss.execution_date,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": DagModel,
    "age_check_column": DagModel.last_scheduler_run,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}]

# Check for TaskReschedule model
try:
    from airflow.models import TaskReschedule
    DATABASE_OBJECTS.append({
        "airflow_db_model": TaskReschedule,
        "age_check_column": TaskReschedule.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for TaskFail model
try:
    from airflow.models import TaskFail
    DATABASE_OBJECTS.append({
        "airflow_db_model": TaskFail,
        "age_check_column": TaskFail.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for RenderedTaskInstanceFields model
try:
    from airflow.models import RenderedTaskInstanceFields
    DATABASE_OBJECTS.append({
        "airflow_db_model": RenderedTaskInstanceFields,
        "age_check_column": RenderedTaskInstanceFields.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for ImportError model
try:
    from airflow.models import ImportError
    DATABASE_OBJECTS.append({
        "airflow_db_model": ImportError,
        "age_check_column": ImportError.timestamp,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for celery executor
airflow_executor = str(conf.get("core", "executor"))
logging.info("Airflow Executor: " + str(airflow_executor))
if (airflow_executor == "CeleryExecutor"):
    logging.info("Including Celery Modules")
    try:
        from celery.backends.database.models import Task, TaskSet
        DATABASE_OBJECTS.extend(({
            "airflow_db_model": Task,
            "age_check_column": Task.date_done,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None
        }, {
            "airflow_db_model": TaskSet,
            "age_check_column": TaskSet.date_done,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None
        }))

    except Exception as e:
        logging.error(e)

session = settings.Session()

default_args = {
    "owner": DAG_OWNER_NAME,
    "depends_on_past": False,
    "email": ALERT_EMAIL_ADDRESSES,
    "email_on_failure": True,
    "email_on_retry": False,
    "start_date": START_DATE,
    "retries": 1,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE)
if hasattr(dag, "doc_md"):
    dag.doc_md = __doc__
if hasattr(dag, "catchup"):
    dag.catchup = False

def print_configuration_function(**context):
    logging.info("Loading Configurations...")
    dag_run_conf = context.get("dag_run").conf
    logging.info("dag_run.conf: " + str(dag_run_conf))
    max_db_entry_age_in_days = None
    if dag_run_conf:
        max_db_entry_age_in_days = dag_run_conf.get(
            "maxDBEntryAgeInDays", None)
    logging.info("maxDBEntryAgeInDays from dag_run.conf: " + str(dag_run_conf))
    if (max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1):
        logging.info(
            "maxDBEntryAgeInDays conf variable isn't included or Variable " +
            "value is less than 1. Using Default '" +
            str(DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS) + "'")
        max_db_entry_age_in_days = DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS
    max_date = now() + timedelta(-max_db_entry_age_in_days)
    logging.info("Finished Loading Configurations")
    logging.info("")

    logging.info("Configurations:")
    logging.info("max_db_entry_age_in_days: " + str(max_db_entry_age_in_days))
    logging.info("max_date:                 " + str(max_date))
    logging.info("enable_delete:            " + str(ENABLE_DELETE))
    logging.info("session:                  " + str(session))
    logging.info("")

    logging.info("Setting max_execution_date to XCom for Downstream Processes")
    context["ti"].xcom_push(key="max_date", value=max_date.isoformat())

print_configuration = PythonOperator(
    task_id="print_configuration",
    python_callable=print_configuration_function,
    provide_context=True,
    dag=dag)

def cleanup_function(**context):

    logging.info("Retrieving max_execution_date from XCom")
    max_date = context["ti"].xcom_pull(
        task_ids=print_configuration.task_id, key="max_date")
    max_date = dateutil.parser.parse(max_date)  # stored as iso8601 str in xcom

    airflow_db_model = context["params"].get("airflow_db_model")
    state = context["params"].get("state")
    age_check_column = context["params"].get("age_check_column")
    keep_last = context["params"].get("keep_last")
    keep_last_filters = context["params"].get("keep_last_filters")
    keep_last_group_by = context["params"].get("keep_last_group_by")

    logging.info("Configurations:")
    logging.info("max_date:                 " + str(max_date))
    logging.info("enable_delete:            " + str(ENABLE_DELETE))
    logging.info("session:                  " + str(session))
    logging.info("airflow_db_model:         " + str(airflow_db_model))
    logging.info("state:                    " + str(state))
    logging.info("age_check_column:         " + str(age_check_column))
    logging.info("keep_last:                " + str(keep_last))
    logging.info("keep_last_filters:        " + str(keep_last_filters))
    logging.info("keep_last_group_by:       " + str(keep_last_group_by))

    logging.info("")

    logging.info("Running Cleanup Process...")

    try:
        query = session.query(airflow_db_model).options(
            load_only(age_check_column))

        logging.info("INITIAL QUERY : " + str(query))

        if keep_last:

            subquery = session.query(func.max(DagRun.execution_date))
            # workaround for MySQL "table specified twice" issue
            # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
            if keep_last_filters is not None:
                for entry in keep_last_filters:
                    subquery = subquery.filter(entry)

                logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))

            if keep_last_group_by is not None:
                subquery = subquery.group_by(keep_last_group_by)
                logging.info(
                    "SUB QUERY [keep_last_group_by]: " +
                    str(subquery))

            subquery = subquery.from_self()

            query = query.filter(
                and_(age_check_column.notin_(subquery)),
                and_(age_check_column <= max_date))

        else:
            query = query.filter(age_check_column <= max_date,)

        if PRINT_DELETES:
            entries_to_delete = query.all()

            logging.info("Query: " + str(query))
            logging.info("Process will be Deleting the following " +
                         str(airflow_db_model.__name__) + "(s):")
            for entry in entries_to_delete:
                date = str(entry.__dict__[str(age_check_column).split(".")[1]])
                logging.info("\tEntry: " + str(entry) + ", Date: " + date)

            logging.info("Process will be Deleting "
                         + str(len(entries_to_delete)) + " "
                         + str(airflow_db_model.__name__) + "(s)")
        else:
            logging.warn(
                "You've opted to skip printing the db entries to be deleted. "
                "Set PRINT_DELETES to True to show entries!!!")

        if ENABLE_DELETE:
            logging.info("Performing Delete...")
            # using bulk delete
            query.delete(synchronize_session=False)
            session.commit()
            logging.info("Finished Performing Delete")
        else:
            logging.warn("You've opted to skip deleting the db entries. "
                         "Set ENABLE_DELETE to True to delete entries!!!")

        logging.info("Finished Running Cleanup Process")

    except ProgrammingError as e:
        logging.error(e)
        logging.error(
            str(airflow_db_model) + " is not present in the metadata."
            "Skipping...")

for db_object in DATABASE_OBJECTS:

    cleanup_op = PythonOperator(
        task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
        python_callable=cleanup_function,
        params=db_object,
        provide_context=True,
        dag=dag)

    print_configuration.set_downstream(cleanup_op)

Vous pouvez également supprimer les entrées liées aux DAG dont vous n'avez plus besoin, tel que décrit dans la section Supprimer un DAG.

Limites

  • Vous ne pouvez pas revenir à une version antérieure de Cloud Composer ou Airflow.
  • Vous ne pouvez effectuer une mise à niveau que vers la dernière version de Cloud Composer au sein de la même version majeure, par exemple de composer-1.12.4-airflow-1.10.10 vers composer-1.13.0-airflow-1.10.10. La mise à niveau de composer-1.4.0-airflow-1.10.0 vers composer-2.0.0-airflow-1.10.0 n'est pas autorisée, car la version majeure de Cloud Composer passe de 1 à 2.
  • La version d'image que vous mettez à niveau doit être compatible avec la version Python actuelle de votre environnement.
  • Les mises à niveau ne peuvent pas être effectuées si la base de données Airflow contient plus de 16 Go de données. Lors de la mise à niveau, un avertissement s'affiche si c'est le cas et que vous devez effectuer une maintenance de base de données telle que décrite dans la section Maintenance de bases de données Airflow.

Afficher les mises à niveau disponibles

Pour afficher les versions Cloud Composer que vous pouvez mettre à niveau, procédez comme suit :

Console

  1. Ouvrez la page Environnements dans Google Cloud.

    Ouvrir la page Environnements

  2. Cliquez sur le Nom de l'environnement.

  3. Dans l'onglet Configuration de l'environnement, cliquez sur Mettre à niveau la version d'image.

  4. Pour les versions disponibles, cliquez sur le menu déroulant Version de l'image de Cloud Composer.

gcloud

gcloud beta composer environments list-upgrades ENVIRONMENT_NAME \
    --location LOCATION 

Où :

  • ENVIRONMENT_NAME est le nom de l'environnement.
  • LOCATION est la région Compute Engine dans laquelle se trouve l'environnement.

Exemple :

gcloud beta composer environments list-upgrades test-environment \
    --location us-central1
┌─────────────────────────────────────────────────────────────────────────────┐
│                              AVAILABLE UPGRADES                             │
├──────────────────────────────┬──────────────────┬───────────────────────────┤
│        IMAGE VERSION         │ COMPOSER DEFAULT │ SUPPORTED PYTHON VERSIONS │
├──────────────────────────────┼──────────────────┼───────────────────────────┤
│ composer-1.4.0-airflow-1.9.0 │ True             │ 2,3                       │
└──────────────────────────────┴──────────────────┴───────────────────────────┘

API

Pour afficher les versions disponibles à l'aide de l'API REST Cloud Composer, effectuez une requête API imageVersions.list, et indiquez le projet et l'emplacement au format projects/{projectId}/locations/{locationId}.

Exemple :

GET https://composer.googleapis.com/v1/projects/test-project-id/locations/us-central1/imageVersions

{
  "imageVersions": [
    {
      "imageVersionId": "composer-1.4.2-airflow-1.10.0",
      "supportedPythonVersions": [
        "2",
        "3"
      ]
    },
    {
      "imageVersionId": "composer-1.4.2-airflow-1.9.0",
      "isDefault": true,
      "supportedPythonVersions": [
        "2",
        "3"
      ]
    }
  ]
} 

Mettre à niveau la version Cloud Composer

Pour mettre à niveau la version Cloud Composer que votre environnement exécute, procédez comme suit :

Console

  1. Ouvrez la page Environnements dans Google Cloud.

    Ouvrir la page Environnements

  2. Cliquez sur le Nom de l'environnement à modifier.

  3. Dans l'onglet Configuration de l'environnement, cliquez sur Mettre à niveau la version d'image.

  4. Cliquez sur le menu déroulant Version de l'image de Cloud Composer et sélectionnez une version.

  5. Cliquez sur Envoyer.

gcloud

gcloud beta composer environments update ENVIRONMENT_NAME \
    --location LOCATION --image-version VERSION

Où :

  • ENVIRONMENT_NAME est le nom de l'environnement.
  • LOCATION est la région Compute Engine dans laquelle se trouve l'environnement.
  • VERSION est la version de Cloud Composer et la version d'Airflow à utiliser pour votre environnement, au format composer-a.b.c-airflow-x.y.z ou composer-a.b.c-airflow-x.y. Si vous ne spécifiez pas le correctif Airflow, c'est la version de correctif la plus élevée disponible pour la version majeure et mineure donnée qui est utilisée.

Exemple :

gcloud beta composer environments update test-environment \
    --location us-central1 --image-version composer-latest-airflow-1.10.1 

API

Pour procéder à la mise à niveau à l'aide de l'API REST Cloud Composer, effectuez une requête API environments.patch. Indiquez la version au format composer-a.b.c-airflow-x.y.z.

Exemple :

PATCH https://composer.googleapis.com/v1beta1/projects/test-project/locations/us-central1/environments/test-environment?updateMask=config.software_config.image_version

Le corps de la requête inclut le paramètre imageVersion :

{
  "config": {
    "softwareConfig": {
      "imageVersion": "composer-1.6.0-airflow-1.10.1"
    }
  }
}

Mettre à niveau la version d'Airflow

Lorsque votre environnement utilise la dernière version de Cloud Composer, vous pouvez utiliser le SDK Cloud pour ne mettre à niveau que la version d'Airflow, par exemple pour passer de composer-1.6.1-airflow-1.9.0 à composer-1.6.1-airflow-1.10.0.

gcloud beta composer environments update ENVIRONMENT_NAME \
--location LOCATION --airflow-version VERSION

où :

  • ENVIRONMENT_NAME est le nom de l'environnement.
  • LOCATION est la région Compute Engine dans laquelle se trouve l'environnement.
  • VERSION est la version d'Airflow à utiliser pour votre environnement, au format x.y.z ou x.y.

Exemple :

gcloud beta composer environments update test-environment \
--location us-central1 --airflow-version=1.10.1