Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird gezeigt, wie Sie Daten aus anderen Diensten mit Google Transfer Operators in Ihren DAGs übertragen.
Google-Transferbetreiber
Google Transfer Operators sind eine Reihe von Airflow-Operatoren, mit denen Sie Daten aus anderen Diensten inGoogle Cloudabrufen können.
In dieser Anleitung werden Operatoren für Azure FileShare Storage und Amazon S3 beschrieben, die mit Cloud Storage funktionieren. Es gibt viele weitere Übertragungsoperatoren, die mit Diensten innerhalb von Google Cloud und mit anderen Diensten alsGoogle Cloudfunktionieren.
Amazon S3 für Cloud Storage
In diesem Abschnitt wird gezeigt, wie Daten von Amazon S3 in einen Cloud Storage-Bucket synchronisiert werden.
Amazon-Anbieterpaket installieren
Das apache-airflow-providers-amazon
-Paket enthält die Verbindungstypen und Funktionen für die Interaktion mit Amazon S3.
Installieren Sie dieses PyPI-Paket in Ihrer Umgebung.
Verbindung zu Amazon S3 konfigurieren
Das Amazon-Anbieterpaket enthält einen Verbindungstyp für Amazon S3. Sie erstellen eine Verbindung dieses Typs. Die Verbindung für Cloud Storage mit dem Namen google_cloud_default
ist bereits in Ihrer Umgebung eingerichtet.
So richten Sie eine Verbindung zu Amazon S3 ein:
- Wechseln Sie in der Airflow-UI zu Admin > Verbindungen.
- Neue Verbindung erstellen
- Wählen Sie
Amazon S3
als Verbindungstyp aus. - Im folgenden Beispiel wird eine Verbindung mit dem Namen
aws_s3
verwendet. Sie können diesen oder einen beliebigen anderen Namen für die Verbindung verwenden. - Geben Sie die Verbindungsparameter an, wie in der Airflow-Dokumentation für Amazon Web Services Connection beschrieben. Wenn Sie beispielsweise eine Verbindung mit AWS-Zugriffsschlüsseln einrichten möchten, generieren Sie einen Zugriffsschlüssel für Ihr Konto in AWS und geben dann die AWS-Zugriffsschlüssel-ID als Anmeldenamen und den geheimen AWS-Zugriffsschlüssel als Passwort für die Verbindung an.
Daten von Amazon S3 übertragen
Wenn Sie die synchronisierten Daten später in einem anderen DAG oder Task verwenden möchten, übertragen Sie sie in den /data
-Ordner des Buckets Ihrer Umgebung. Dieser Ordner wird mit anderen Airflow-Workern synchronisiert, sodass Aufgaben in Ihrem DAG darauf zugreifen können.
Der folgende Beispiel-DAG führt Folgendes aus:
- Synchronisiert den Inhalt des Verzeichnisses
/data-for-gcs
aus einem S3-Bucket mit dem Ordner/data/from-s3/data-for-gcs/
im Bucket Ihrer Umgebung. - Wartet zwei Minuten, bis die Daten mit allen Airflow-Workern in Ihrer Umgebung synchronisiert sind.
- Gibt die Liste der Dateien in diesem Verzeichnis mit dem Befehl
ls
aus. Ersetzen Sie diese Aufgabe durch andere Airflow-Operatoren, die mit Ihren Daten funktionieren.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Azure FileShare zu Cloud Storage
In diesem Abschnitt wird gezeigt, wie Sie Daten aus Azure FileShare mit einem Cloud Storage-Bucket synchronisieren.
Microsoft Azure-Anbieterpaket installieren
Das apache-airflow-providers-microsoft-azure
-Paket enthält die Verbindungstypen und Funktionen, die mit Microsoft Azure interagieren.
Installieren Sie dieses PyPI-Paket in Ihrer Umgebung.
Verbindung zu Azure FileShare konfigurieren
Das Microsoft Azure-Anbieterpaket bietet einen Verbindungstyp für Azure File Share. Sie erstellen eine Verbindung dieses Typs. Die Verbindung für Cloud Storage mit dem Namen google_cloud_default
ist in Ihrer Umgebung bereits eingerichtet.
So richten Sie eine Verbindung zu Azure FileShare ein:
- Wechseln Sie in der Airflow-UI zu Admin > Verbindungen.
- Neue Verbindung erstellen
- Wählen Sie
Azure FileShare
als Verbindungstyp aus. - Im folgenden Beispiel wird eine Verbindung mit dem Namen
azure_fileshare
verwendet. Sie können diesen oder einen beliebigen anderen Namen für die Verbindung verwenden. - Geben Sie die Verbindungsparameter wie in der Airflow-Dokumentation für Microsoft Azure File Share Connection beschrieben an. Sie können beispielsweise eine Verbindungszeichenfolge für den Zugriffsschlüssel Ihres Speicherkontos angeben.
Daten aus Azure FileShare übertragen
Wenn Sie die synchronisierten Daten später in einem anderen DAG oder Task verwenden möchten, übertragen Sie sie in den /data
-Ordner des Buckets Ihrer Umgebung. Dieser Ordner wird mit anderen Airflow-Workern synchronisiert, sodass Aufgaben in Ihrem DAG darauf zugreifen können.
Der folgende DAG führt Folgendes aus:
Der folgende Beispiel-DAG führt Folgendes aus:
- Synchronisiert den Inhalt des Verzeichnisses
/data-for-gcs
aus der Azure-Dateifreigabe mit dem Ordner/data/from-azure
im Bucket Ihrer Umgebung. - Wartet zwei Minuten, bis die Daten mit allen Airflow-Workern in Ihrer Umgebung synchronisiert sind.
- Gibt die Liste der Dateien in diesem Verzeichnis mit dem Befehl
ls
aus. Ersetzen Sie diese Aufgabe durch andere Airflow-Operatoren, die mit Ihren Daten funktionieren.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files