Memecahkan masalah terkait update dan upgrade lingkungan

Cloud Composer 1 | Cloud Composer 2

Halaman ini memberikan informasi pemecahan masalah untuk masalah yang mungkin Anda alami saat memperbarui atau mengupgrade lingkungan Cloud Composer.

Untuk mengetahui informasi pemecahan masalah terkait pembuatan lingkungan, lihat Memecahkan masalah pembuatan lingkungan.

Saat lingkungan Cloud Composer diperbarui, sebagian besar masalah terjadi karena alasan berikut:

  • Masalah izin akun layanan
  • Masalah dependensi PyPI
  • Ukuran database Airflow

Izin tidak memadai untuk memperbarui atau mengupgrade lingkungan

Jika Cloud Composer tidak dapat memperbarui atau mengupgrade lingkungan karena izin yang tidak memadai, pesan error berikut akan ditampilkan:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Solusi: Tetapkan peran ke akun Anda dan akun layanan lingkungan Anda seperti yang dijelaskan di Kontrol akses.

Akun layanan lingkungan tidak memiliki izin yang memadai

Saat membuat lingkungan Cloud Composer, Anda menentukan akun layanan yang menjalankan node cluster GKE lingkungan tersebut. Jika akun layanan ini tidak memiliki izin yang memadai untuk operasi yang diminta, Cloud Composer akan menampilkan error:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Solusi: Tetapkan peran ke akun Anda dan akun layanan lingkungan Anda seperti yang dijelaskan di Kontrol akses.

Ukuran database Airflow terlalu besar untuk menjalankan operasi

Operasi upgrade Cloud Composer mungkin tidak berhasil karena ukuran database Airflow terlalu besar untuk keberhasilan operasi upgrade.

Jika ukuran database Airflow lebih dari 16 GB, Cloud Composer akan menampilkan error berikut:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Solusi: Lakukan pembersihan database Airflow, seperti yang dijelaskan dalam pemeliharaan database Airflow.

Upgrade ke versi Cloud Composer baru gagal karena konflik paket PyPI

Saat mengupgrade lingkungan dengan paket PyPI kustom yang terinstal, Anda mungkin mengalami error yang terkait dengan konflik paket PyPI. Hal ini mungkin terjadi karena image Cloud Composer yang baru berisi versi paket bawaan yang lebih baru, yang menyebabkan konflik dependensi dengan paket PyPI yang diinstal di lingkungan Anda.

Solusi:

  • Untuk mendapatkan informasi mendetail tentang konflik paket, jalankan pemeriksaan upgrade.
  • Melonggarkan batasan versi untuk paket PyPI kustom yang diinstal. Misalnya, bukan menetapkan versi sebagai ==1.0.1, tetapkan versi sebagai >=1.0.1.
  • Untuk mengetahui informasi selengkapnya tentang mengubah persyaratan versi guna menyelesaikan dependensi yang berkonflik, lihat dokumentasi pip.

Kurangnya konektivitas ke DNS dapat menyebabkan masalah saat melakukan peningkatan atau pembaruan

Masalah konektivitas tersebut dapat mengakibatkan entri log seperti ini:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Hal ini biasanya berarti tidak ada rute ke DNS. Jadi, pastikan nama DNS metadata.google.internal dapat di-resolve ke alamat IP dari dalam jaringan Cluster, Pod, dan Layanan. Periksa apakah Anda telah mengaktifkan Akses Google Pribadi dalam VPC (di project host atau layanan) tempat lingkungan Anda dibuat.

Informasi selengkapnya:

CPU pemicu melebihi batas 1 vCPU

Cloud Composer 2 di versi 2.4.4 dan yang lebih baru memperkenalkan strategi alokasi resource pemicu yang berbeda untuk meningkatkan penskalaan performa. Jika Anda mengalami error terkait CPU pemicu saat melakukan update lingkungan, artinya pemicu saat ini dikonfigurasi untuk menggunakan lebih dari 1 vCPU per pemicu.

Solusi:

Memeriksa peringatan migrasi yang gagal

Saat mengupgrade Airflow ke versi yang lebih baru, terkadang batasan baru diterapkan ke database Airflow. Jika batasan ini tidak dapat diterapkan, Airflow akan membuat tabel baru untuk menyimpan baris dengan batasan yang tidak dapat diterapkan. UI Airflow menampilkan pesan peringatan hingga tabel data yang dipindahkan diganti namanya atau dihapus.

Solusi:

Anda dapat menggunakan dua DAG berikut untuk memeriksa data yang dipindahkan dan mengganti nama tabel.

DAG list_moved_tables_after_upgrade_dag mencantumkan baris yang dipindahkan dari setiap tabel tempat batasan tidak dapat diterapkan. Periksa data dan putuskan apakah Anda ingin menyimpannya. Untuk menyimpannya, Anda harus memperbaiki data di database Airflow secara manual. Misalnya, dengan menambahkan baris kembali dengan data yang benar.

Jika tidak memerlukan data atau telah memperbaikinya, Anda dapat menjalankan DAG rename_moved_tables_after_upgrade_dag. DAG ini mengganti nama tabel yang dipindahkan. Tabel dan datanya tidak dihapus, jadi Anda dapat meninjau data tersebut di lain waktu.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX

def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )

def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())

def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")

with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Langkah selanjutnya