Mengelola koneksi Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Halaman ini menjelaskan cara mengelola koneksi Airflow di lingkungan Anda dan mengaksesnya dari DAG.

Tentang koneksi Airflow

Koneksi Aiflow menyimpan kredensial dan informasi koneksi lainnya, seperti nama pengguna, string koneksi, dan sandi. DAG Anda menggunakan koneksi untuk berkomunikasi dan mengakses resource di Google Cloud dan layanan lainnya dari DAG Anda.

Operator Airflow di DAG Anda menggunakan koneksi default untuk operator, atau Anda menentukan nama koneksi kustom.

Tentang keamanan koneksi

Sebagian besar operator Airflow tidak menerima kredensial secara langsung. Sebagai gantinya, mereka menggunakan koneksi Airflow.

Saat Anda membuat lingkungan baru, Cloud Composer akan menghasilkan kunci fernet permanen yang unik untuk lingkungan tersebut dan mengamankan tambahan koneksi secara default. Anda dapat melihat fernet_key di halaman Konfigurasi di UI Airflow.

Untuk informasi selengkapnya tentang cara mengamankan koneksi dan sandi di Airflow, lihat Mengamankan Koneksi dan Menyamarkan data sensitif.

Tentang jenis koneksi

Airflow menggunakan koneksi dari berbagai jenis untuk terhubung ke layanan tertentu. Misalnya, jenis koneksi Google Cloud terhubung ke layanan lain di Google Cloud. Sebagai contoh lain, jenis koneksi S3 terhubung ke bucket Amazon S3.

Untuk menambahkan jenis koneksi ke Airflow, instal paket PyPI dengan jenis koneksi tersebut. Beberapa paket sudah diinstal sebelumnya di lingkungan Anda. Misalnya, Anda dapat menggunakan koneksi dari paket apache-airflow-providers-google tanpa menginstal paket PyPI kustom.

Koneksi yang telah dikonfigurasi sebelumnya

Cloud Composer mengonfigurasi koneksi default berikut di lingkungan Anda. Anda dapat menggunakan koneksi ini untuk mengakses resource dalam project tanpa mengonfigurasinya.

  • google_cloud_default
  • bigquery_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

Menambahkan koneksi di Secret Manager

Anda dapat menyimpan koneksi di Secret Manager, tanpa menambahkannya ke Airflow. Sebaiknya gunakan pendekatan ini saat menyimpan kredensial dan informasi sensitif lainnya.

Untuk menambahkan koneksi di Secret Manager:

  1. Mengonfigurasi Secret Manager untuk lingkungan Anda.

  2. Tambahkan secret dengan nama yang cocok dengan pola untuk koneksi.

    Misalnya: airflow-connections-example_connection. Di DAG, gunakan nama koneksi tanpa awalan: example_connection.

  3. Tambahkan parameter untuk koneksi:

    Format JSON

    Tambahkan representasi JSON koneksi Anda sebagai nilai secret. Contoh:

    {
      "conn_type": "mysql",
      "host": "example.com",
      "login": "login",
      "password": "password",
      "port": "9000"
    }
    

    Untuk informasi selengkapnya tentang format koneksi JSON, lihat dokumentasi Airflow.

    Format URI

    Tambahkan representasi URI koneksi Anda sebagai nilai secret:

    • Secret harus menyimpan representasi URI koneksi. Contoh, mysql://login:password@example.com:9000.

    • URI harus berenkode URL. Misalnya, sandi yang memiliki simbol spasi di dalamnya harus dienkode URL sebagai berikut: mysql://login:secret%20password@example.com:9000.

    Airflow memiliki metode praktis untuk membuat URI koneksi. Contoh cara mengenkode URL kompleks dengan tambahan JSON tersedia di dokumentasi Airflow.

  4. Pastikan semua parameter koneksi dibaca dengan benar dari Secret Manager.

Menambahkan koneksi di Airflow

Sebagai alternatif untuk menyimpan koneksi di Secret Manager, Anda dapat menyimpannya di Airflow.

Untuk menambahkan koneksi di Airflow:

Airflow CLI

Jalankan perintah Airflow CLI connections add melalui Google Cloud CLI. Contoh:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-type "mysql" \
    --conn-host "example.com" \
    --conn-port "9000" \
    --conn-login "login" \
    --conn-password "password" \
    example_connection

Anda juga dapat menggunakan argumen --conn-uri:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-uri "mysql://login:password@example.com:9000" \
    example_connection

Ganti kode berikut:

  • ENVIRONMENT_NAME: nama lingkungan Anda.
  • LOCATION: region tempat lingkungan berada.

UI Airflow

Ikuti dokumentasi Airflow tentang cara membuat koneksi.

Memastikan Airflow membaca koneksi dengan benar

Anda dapat menjalankan perintah Airflow CLI connections get melalui Google Cloud CLI untuk memeriksa apakah koneksi dibaca dengan benar. Misalnya, jika Anda menyimpan koneksi di Secret Manager, tindakan ini akan memberikan cara untuk memeriksa apakah semua parameter koneksi dibaca oleh Airflow dari secret.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    connections get \
    -- CONNECTION_NAME

Ganti:

  • ENVIRONMENT_NAME dengan nama lingkungan.
  • LOCATION dengan region tempat lingkungan tersebut berada.
  • CONNECTION_NAME dengan nama koneksi. Jika koneksi Anda disimpan di Secret Manager, gunakan nama koneksi tanpa awalan koneksi. Misalnya, tentukan example_connection, bukan airflow-connections-example_connection_json.

Contoh:

gcloud composer environments run example-environment \
    --location us-central1 \
    connections get \
    -- example_connection -o json

Menggunakan koneksi Airflow di DAG

Bagian ini menunjukkan cara mengakses koneksi Anda dari DAG.

Menggunakan koneksi Secret Manager

Gunakan nama koneksi tanpa awalan. Misalnya, jika secret Anda bernama airflow-connections-aws_s3, tentukan aws_s3.

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

Jika menyimpan koneksi default di Secret Manager, Anda dapat menghilangkan nama koneksi. Lihat dokumentasi Airflow untuk operator tertentu guna mendapatkan nama koneksi default yang digunakan oleh operator. Misalnya, operator Airflow S3ToGCSOperator menggunakan koneksi aws_default secara default. Anda dapat menyimpan koneksi default ini dalam secret bernama airflow-connections-aws_default.

Menggunakan koneksi yang disimpan di Airflow

Gunakan nama koneksi, seperti yang ditentukan di Airflow:

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

Untuk menggunakan koneksi default bagi operator, hapus nama koneksi. Lihat dokumentasi Airflow untuk operator tertentu guna mendapatkan nama koneksi default yang digunakan oleh operator. Misalnya, operator Airflow S3ToGCSOperator menggunakan koneksi aws_default secara default.

Pemecahan masalah

Jika lingkungan Anda tidak dapat mengakses secret yang disimpan di Secret Manager:

  1. Pastikan Secret Manager dikonfigurasi di lingkungan Anda.

  2. Pastikan nama koneksi di Secret Manager sesuai dengan koneksi yang digunakan oleh Airflow. Misalnya, untuk koneksi bernama example_connection, nama secret-nya adalah airflow-connections-example_connection.

  3. Pastikan Airflow membaca koneksi dengan benar.

Langkah selanjutnya