Memigrasikan lingkungan ke Airflow 2

Cloud Composer 1 | Cloud Composer 2

Halaman ini menjelaskan cara mentransfer DAG, data, dan konfigurasi dari lingkungan Airflow 1.10.* yang ada ke lingkungan dengan Airflow 2 dan versi Airflow yang lebih baru.

Panduan migrasi lainnya

Dari Kepada Metode Panduan
Cloud Composer 1, Airflow 2 Cloud Composer 2, Airflow 2 Berdampingan, menggunakan snapshot Panduan migrasi (snapshot)
Cloud Composer 1, Airflow 1 Cloud Composer 2, Airflow 2 Berdampingan, menggunakan snapshot Panduan migrasi (snapshot)
Cloud Composer 1, Airflow 2 Cloud Composer 2, Airflow 2 Transfer manual secara berdampingan Panduan migrasi manual
Cloud Composer 1, Airflow 1 Cloud Composer 2, Airflow 2 Transfer manual secara berdampingan Panduan migrasi manual
Aliran Udara 1 Aliran Udara 2 Transfer manual secara berdampingan Panduan ini (migrasi manual)

Upgrade berdampingan

Cloud Composer menyediakan skrip transfer database Cloud Composer untuk memigrasikan database metadata, DAG, data, dan plugin dari lingkungan Cloud Composer dengan Airflow 1.10.14 dan Airflow 1.10.15 ke lingkungan Cloud Composer yang ada dengan Airflow 2.0.1 dan versi Airflow yang lebih baru.

Ini adalah jalur alternatif ke jalur yang dijelaskan dalam panduan ini. Beberapa bagian dari panduan ini masih berlaku saat menggunakan skrip yang disediakan. Misalnya, sebaiknya Anda memeriksa kompatibilitas DAG dengan Airflow 2 sebelum memigrasikannya, atau memastikan bahwa proses DAG serentak tidak terjadi, dan tidak ada operasi DAG tambahan atau yang hilang.

Sebelum memulai

Sebelum Anda mulai menggunakan lingkungan Cloud Composer dengan Airflow 2, pertimbangkan perubahan yang dihadirkan Airflow 2 ke lingkungan Cloud Composer.

Penjadwal dengan ketersediaan tinggi (HA)

Anda dapat menggunakan lebih dari satu penjadwal Airflow di lingkungan Anda. Anda dapat menyetel jumlah penjadwal saat membuat lingkungan, atau dengan memperbarui lingkungan yang ada.

Eksekutor Celery+Kubernetes

Airflow 2 Celery+Kubernetes Executor tidak didukung dalam versi Cloud Composer saat ini.

Perubahan yang dapat menyebabkan gangguan

Airflow 2 memperkenalkan banyak perubahan besar yang beberapa di antaranya salah satunya:

Perbedaan antara lingkungan dengan Airflow 2 dan Airflow 1.10.*

Perbedaan utama antara lingkungan Cloud Composer dengan Airflow 1.10.* dan lingkungan dengan Airflow 2:

  • Lingkungan dengan Airflow 2 menggunakan Python 3.8. Ini adalah versi yang lebih baru daripada yang digunakan di lingkungan Airflow 1.10.*. Python 2, Python 3.6, Python 3.7 tidak didukung.
  • Airflow 2 menggunakan format CLI yang berbeda. Cloud Composer mendukung format baru di lingkungan dengan Airflow 2 melalui perintah gcloud composer environments run.
  • Paket PyPI bawaan berbeda di lingkungan Airflow 2. Untuk mengetahui daftar paket PyPI bawaan, lihat daftar versi Cloud Composer.
  • Serialisasi DAG selalu diaktifkan di Airflow 2. Akibatnya, pemuatan DAG asinkron tidak diperlukan lagi, dan tidak didukung di Airflow 2. Akibatnya, konfigurasi parameter [core]store_serialized_dags dan [core]store_dag_code tidak didukung untuk Airflow 2, dan upaya untuk menyetelnya akan dilaporkan sebagai error.
  • Plugin server web Airflow tidak didukung. Hal ini tidak memengaruhi penjadwal atau plugin pekerja, termasuk operator dan sensor Airflow.
  • Di lingkungan Airflow 2, peran pengguna Airflow default adalah Op. Untuk lingkungan dengan Airflow 1.10.*, peran defaultnya adalah Admin.

Langkah 1: Periksa kompatibilitas dengan Airflow 2

Untuk memeriksa potensi konflik dengan Airflow 2, gunakan skrip pemeriksaan upgrade yang disediakan oleh Airflow di lingkungan Airflow 1.10.* Anda yang sudah ada.

gcloud

  1. Jika lingkungan Anda menggunakan Airflow 1.10.14 dan versi yang lebih lama, upgrade lingkungan Anda ke versi Cloud Composer yang menggunakan Airflow 1.10.15 dan yang lebih baru. Cloud Composer mendukung perintah pemeriksaan upgrade mulai dari Airflow 1.10.15.

  2. Jalankan pemeriksaan upgrade melalui perintah gcloud composer environments run. Beberapa pemeriksaan upgrade yang relevan untuk Airflow 1.10.15 mandiri tidak relevan untuk Cloud Composer. Perintah berikut mengecualikan pemeriksaan ini.

    gcloud composer environments run \
        AIRFLOW_1_ENV  \
        --location=AIRFLOW_1_LOCATION \
        upgrade_check \
        -- --ignore VersionCheckRule --ignore LoggingConfigurationRule \
        --ignore PodTemplateFileRule --ignore SendGridEmailerMovedRule
    

    Ganti:

    • AIRFLOW_1_ENV dengan nama lingkungan Airflow 1.10.* Anda.
    • AIRFLOW_1_LOCATION dengan wilayah tempat lingkungan berada.
  3. Periksa output perintah. Update skrip pemeriksaan melaporkan potensi masalah kompatibilitas di lingkungan yang ada.

  4. Terapkan perubahan lain pada DAG, seperti yang dijelaskan dalam panduan Upgrade ke Airflow 2.0+, di bagian tentang mengupgrade DAG.

Langkah 2: Buat lingkungan Airflow 2, penggantian konfigurasi transfer, dan variabel lingkungan

Buat lingkungan Airflow 2 serta penggantian konfigurasi transfer dan variabel lingkungan:

  1. Ikuti langkah-langkah untuk membuat lingkungan. Sebelum membuat lingkungan, tentukan juga penggantian konfigurasi dan variabel lingkungan, seperti yang dijelaskan lebih lanjut.

  2. Saat memilih gambar, pilih gambar dengan Airflow 2.

  3. Transfer parameter konfigurasi secara manual dari lingkungan Airflow 1.10.* ke lingkungan Airflow 2 yang baru.

    Konsol

    1. Saat Anda membuat lingkungan, luaskan bagian Networking, penggantian konfigurasi Airflow, dan fitur tambahan.

    2. Di bagian Airflow configuration override, klik Add Airflow configuration override.

    3. Salin semua penggantian konfigurasi dari lingkungan Airflow 1.10.* Anda.

      Beberapa opsi konfigurasi menggunakan nama dan bagian yang berbeda di Airflow 2. Untuk mengetahui informasi selengkapnya, lihat Perubahan konfigurasi.

    4. Di bagian Variabel lingkungan, klik Tambahkan variabel lingkungan

    5. Salin semua variabel lingkungan dari lingkungan Airflow 1.10.* Anda.

    6. Klik Create untuk membuat lingkungan.

Langkah 3: Instal paket PyPI ke lingkungan Airflow 2

Setelah lingkungan Airflow 2 Anda dibuat, instal paket PyPI ke lingkungan tersebut:

Konsol

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pilih lingkungan Airflow 2 Anda.

  3. Buka tab Paket PyPI dan klik Edit.

  4. Salin persyaratan paket PyPI dari lingkungan Airflow 1.10.* Anda. Klik Save dan tunggu hingga lingkungan diperbarui.

    Karena lingkungan Airflow 2 menggunakan kumpulan paket bawaan yang berbeda dan versi Python yang berbeda, Anda mungkin mengalami konflik paket PyPI yang sulit diselesaikan. Salah satu cara untuk mendiagnosis masalah dependensi paket adalah dengan memeriksa error paket PyPI dengan menginstal paket di pod pekerja Airflow.

Langkah 4: Transfer variabel dan kumpulan ke Airflow 2

Airflow 1.10.* mendukung ekspor variabel dan kumpulan ke file JSON. Kemudian, Anda dapat mengimpor file ini ke lingkungan Airflow 2.

Anda hanya perlu mentransfer kumpulan jika memiliki kumpulan kustom selain default_pool. Jika tidak, lewati perintah yang mengekspor dan mengimpor kumpulan.

gcloud

  1. Ekspor variabel dari lingkungan Airflow 1.10.* Anda:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    Ganti:

    • AIRFLOW_1_ENV dengan nama lingkungan Airflow 1.10.* Anda.
    • AIRFLOW_1_LOCATION dengan wilayah tempat lingkungan berada.
  2. Ekspor kumpulan dari lingkungan Airflow 1.10.* Anda:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. Dapatkan URI bucket lingkungan Airflow 2.

    1. Jalankan perintah berikut:

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      Ganti:

      • AIRFLOW_2_ENV dengan nama lingkungan Airflow 2 Anda.
      • AIRFLOW_2_LOCATION dengan wilayah tempat lingkungan berada.
    2. Di output, hapus folder /dags. Hasilnya adalah URI bucket lingkungan Airflow 2 Anda.

      Misalnya, ubah gs://us-central1-example-916807e1-bucket/dags menjadi gs://us-central1-example-916807e1-bucket.

  4. Transfer file JSON dengan variabel dan kumpulan ke lingkungan Airflow 2 Anda:

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    Ganti AIRFLOW_2_BUCKET dengan URI bucket lingkungan Airflow 2 Anda, yang diperoleh di langkah sebelumnya.

  5. Impor variabel dan kumpulan ke Airflow 2:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. Pastikan variabel dan kumpulan diimpor:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. Hapus file JSON dari bucket:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

Langkah 5: Transfer data lainnya dari bucket lingkungan Airflow 1.10.* Anda

gcloud

  1. Transfer plugin ke lingkungan Airflow 2 Anda. Untuk melakukannya, ekspor plugin dari bucket lingkungan Airflow 1.10.* ke folder /plugins di bucket lingkungan Airflow 2 Anda:

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. Pastikan folder /plugins berhasil diimpor:

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. Ekspor folder /data dari lingkungan Airflow 1.10.* Anda ke lingkungan Airflow 2:

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. Pastikan folder /data berhasil diimpor:

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

Langkah 6: Transfer koneksi dan pengguna

Airflow 1.10.* tidak mendukung ekspor pengguna dan koneksi. Untuk mentransfer pengguna dan koneksi, buat akun pengguna dan koneksi baru secara manual di lingkungan Airflow 2 Anda.

gcloud

  1. Untuk mendapatkan daftar koneksi di lingkungan Airflow 1.10.* Anda, jalankan:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. Untuk membuat koneksi baru di lingkungan Airflow 2 Anda, jalankan perintah CLI Airflow connections melalui gcloud. Contoh:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. Untuk melihat daftar pengguna di lingkungan Airflow 1.10.* Anda:

    1. Buka antarmuka web Airflow untuk lingkungan Airflow 1.10.* Anda.

    2. Buka Admin > Pengguna.

  4. Untuk membuat akun pengguna baru di lingkungan Airflow 2 Anda, jalankan perintah CLI Airflow users create melalui gcloud. Contoh:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Admin
    

Langkah 7: Pastikan DAG Anda siap untuk Airflow 2

Sebelum mentransfer DAG ke lingkungan Airflow 2 Anda, pastikan:

  1. Skrip pemeriksaan upgrade agar DAG berhasil dijalankan dan tidak ada masalah kompatibilitas yang tersisa.

  2. DAG Anda menggunakan pernyataan impor yang benar.

    Misalnya, pernyataan impor baru untuk BigQueryCreateDataTransferOperator dapat terlihat seperti ini:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. DAG Anda diupgrade untuk Airflow 2. Perubahan ini kompatibel dengan Airflow 1.10.14 dan versi yang lebih baru.

Langkah 8: Transfer DAG ke lingkungan Airflow 2

Potensi masalah berikut mungkin terjadi saat Anda mentransfer DAG antarlingkungan:

  • Jika DAG diaktifkan (tidak dijeda) di kedua lingkungan, setiap lingkungan akan menjalankan salinan DAG-nya sendiri, sesuai jadwal. Hal ini dapat menyebabkan DAG dijalankan secara serentak untuk data dan waktu eksekusi yang sama.

  • Karena pengejaran DAG, Airflow menjadwalkan operasi DAG tambahan, dimulai dari tanggal mulai yang ditentukan dalam DAG Anda. Hal ini terjadi karena instance Airflow baru tidak mempertimbangkan histori DAG yang berjalan dari lingkungan 1.10.*. Hal ini dapat menyebabkan sejumlah besar eksekusi DAG dijadwalkan mulai dari tanggal mulai yang ditentukan.

Mencegah operasi DAG serentak

Di lingkungan Airflow 2 Anda, ganti opsi konfigurasi Airflow dags_are_paused_at_creation. Setelah Anda melakukan perubahan ini, semua DAG baru akan dijeda secara default.

Bagian Kunci Nilai
core dags_are_paused_at_creation True

Mencegah operasi DAG tambahan atau hilang

Tentukan tanggal mulai statis baru di DAG yang Anda transfer ke lingkungan Airflow 2 Anda.

Untuk menghindari kesenjangan dan tumpang-tindih dalam tanggal eksekusi, operasi DAG pertama harus dilakukan di lingkungan Airflow 2 pada kemunculan interval jadwal berikutnya. Untuk melakukannya, tetapkan tanggal mulai baru di DAG Anda menjadi sebelum tanggal operasi terakhir di lingkungan Airflow 1.10.*.

Misalnya, jika DAG berjalan pada pukul 15.00, 17.00, dan 21.00 setiap hari di lingkungan Airflow 1.10.*, proses DAG terakhir terjadi pada pukul 15.00, dan Anda berencana untuk mentransfer DAG pada pukul 15.15, maka tanggal mulai untuk lingkungan Airflow 2 adalah hari ini pada pukul 1.10.4 Setelah Anda mengaktifkan DAG di lingkungan Airflow 2, Airflow akan menjadwalkan proses DAG selama pukul 17.00.

Sebagai contoh lain, jika DAG berjalan pada pukul 00.00 setiap hari di lingkungan Airflow 1.10.*, operasi DAG terakhir terjadi pukul 00.00 pada 26 April 2021, dan Anda berencana mentransfer DAG pukul 13.00 pada 26 April 2021, maka tanggal mulai 20. Setelah Anda mengaktifkan DAG di lingkungan Airflow 2, Airflow akan menjadwalkan operasi DAG untuk pukul 00.00 pada 27 April 2021.

Transfer DAG Anda satu per satu ke lingkungan Airflow 2

Untuk setiap DAG, ikuti prosedur transfer berikut:

  1. Pastikan tanggal mulai yang baru di DAG ditetapkan seperti yang dijelaskan di bagian sebelumnya.

  2. Upload DAG yang telah diupdate ke lingkungan Airflow 2. DAG ini dijeda di lingkungan Airflow 2 karena penggantian konfigurasi, sehingga belum ada operasi DAG yang dijadwalkan.

  3. Di antarmuka web Airflow, buka DAG dan periksa error sintaksis DAG yang dilaporkan.

  4. Jika Anda berencana mentransfer DAG:

    1. Jeda DAG di lingkungan Airflow 1.10.* Anda.

    2. Lanjutkan DAG di lingkungan Airflow 2 Anda.

    3. Periksa apakah proses DAG baru dijadwalkan pada waktu yang tepat.

    4. Tunggu hingga proses DAG terjadi di lingkungan Airflow 2 dan periksa apakah operasi tersebut berhasil.

  5. Bergantung pada apakah operasi DAG berhasil:

    • Jika operasi DAG berhasil, Anda dapat melanjutkan dan menggunakan DAG dari lingkungan Airflow 2 Anda. Akhirnya, pertimbangkan untuk menghapus DAG versi 1.10.* Airflow.

    • Jika DAG gagal dijalankan, coba pecahkan masalah DAG hingga berhasil dijalankan di Airflow 2.

      Jika diperlukan, Anda selalu dapat kembali ke DAG versi Airflow 1.10.*:

      1. Jeda DAG di lingkungan Airflow 2 Anda.

      2. Lanjutkan DAG di lingkungan Airflow 1.10.* Anda. Tindakan ini akan menjadwalkan proses DAG baru untuk tanggal dan waktu yang sama dengan operasi DAG yang gagal.

      3. Saat Anda siap untuk melanjutkan dengan versi DAG Airflow 2, sesuaikan tanggal mulai, upload DAG versi baru ke lingkungan Airflow 2 Anda, dan ulangi prosedurnya.

Langkah 9: Pantau lingkungan Airflow 2 Anda

Setelah mentransfer semua DAG dan konfigurasi ke lingkungan Airflow 2, pantau potensi masalah, kegagalan pengoperasian DAG, dan kesehatan lingkungan secara keseluruhan. Jika lingkungan Airflow 2 berjalan tanpa masalah selama jangka waktu yang cukup, Anda dapat menghapus lingkungan Airflow 1.10.*.

Langkah selanjutnya