Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird beschrieben, wie Daten aus anderen Diensten mit Google übertragen werden. Übertragungsoperatoren in Ihren DAGs.
Google-Übertragungsoperatoren
Google-Übertragungsoperatoren sind ein Airflow-Operatoren, mit denen Sie Daten aus anderen Diensten abrufen können, Google Cloud
In diesem Leitfaden werden Operatoren für Azure FileShare Storage und Amazon S3 gezeigt, die mit Cloud Storage funktionieren. Es gibt viele weitere Übertragungsoperatoren, die mit Diensten innerhalb von Google Cloud und mit anderen Diensten als Google Cloud arbeiten.
Amazon S3 für Cloud Storage
In diesem Abschnitt wird gezeigt, wie Daten von Amazon S3 mit einem Cloud Storage-Bucket.
Amazon-Anbieterpaket installieren
Das apache-airflow-providers-amazon
-Paket enthält die Verbindungstypen und Funktionen, die mit Amazon S3 interagieren.
Installieren Sie dieses PyPI-Paket in Ihrem
zu verbessern.
Verbindung zu Amazon S3 konfigurieren
Das Amazon-Anbieterpaket bietet einen Verbindungstyp für Amazon S3. Ich
eine Verbindung dieses Typs erstellen. Die Verbindung für Cloud Storage mit dem Namen google_cloud_default
ist bereits in Ihrer Umgebung eingerichtet.
So richten Sie eine Verbindung zu Amazon S3 ein:
- Klicken Sie in der Airflow-Benutzeroberfläche auf Verwaltung > Verbindungen.
- Erstellen Sie eine neue Verbindung.
- Wählen Sie als Verbindungstyp
Amazon S3
aus. - Im folgenden Beispiel wird eine Verbindung mit dem Namen
aws_s3
verwendet. Sie können diese Name oder einen anderen Namen für die Verbindung. - Geben Sie die Verbindungsparameter wie in der Airflow-Dokumentation für die Amazon Web Services-Verbindung beschrieben an. Wenn Sie beispielsweise eine Verbindung mit AWS-Zugriffsschlüsseln einrichten möchten, generieren Sie einen Zugriffsschlüssel für Ihr Konto bei AWS und geben dann die AWS-Zugriffsschlüssel-ID als Anmeldedaten und den geheimen AWS-Zugriffsschlüssel als Passwort für die Verbindung an.
Daten von Amazon S3 übertragen
Wenn Sie die synchronisierten Daten später in einem anderen DAG oder einer anderen Aufgabe bearbeiten möchten,
ziehen Sie sie in den Ordner /data
Ihres Umgebungs-Buckets. Dieser Ordner wird mit anderen Airflow-Workern synchronisiert, damit Aufgaben in Ihrem DAG darauf ausgeführt werden können.
Im folgenden Beispiel-DAG wird Folgendes ausgeführt:
- Synchronisiert den Inhalt des Verzeichnisses
/data-for-gcs
aus einem S3-Bucket in den Ordner/data/from-s3/data-for-gcs/
im Bucket Ihrer Umgebung. - Es wird zwei Minuten gewartet, bis die Daten mit allen Airflow-Workern in für Ihre Umgebung.
- Mit dem Befehl
ls
wird die Liste der Dateien in diesem Verzeichnis ausgegeben. Ersetzen Sie diese Aufgabe durch andere Airflow-Operatoren, die mit Ihren Daten funktionieren.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Azure FileShare zu Cloud Storage
In diesem Abschnitt wird gezeigt, wie Sie Daten aus Azure FileShare mit einem Cloud Storage-Bucket synchronisieren.
Microsoft Azure-Anbieterpaket installieren
Das Paket apache-airflow-providers-microsoft-azure
enthält die Verbindung
die mit Microsoft Azure interagieren.
Installieren Sie dieses PyPI-Paket in Ihrer Umgebung.
Verbindung zu Azure FileShare konfigurieren
Das Microsoft Azure-Anbieterpaket bietet einen Verbindungstyp für die Azure-Datei
Teilen. Sie erstellen eine Verbindung dieses Typs. Die Verbindung für
Cloud Storage mit dem Namen google_cloud_default
ist bereits eingerichtet in
für Ihre Umgebung.
Richten Sie auf folgende Weise eine Verbindung zu Azure FileShare ein:
- Klicken Sie in der Airflow-Benutzeroberfläche auf Verwaltung > Verbindungen.
- Erstellen Sie eine neue Verbindung.
- Wählen Sie
Azure FileShare
als Verbindungstyp aus. - Im folgenden Beispiel wird eine Verbindung mit dem Namen
azure_fileshare
verwendet. Sie können diesen oder einen anderen Namen für die Verbindung verwenden. - Geben Sie Verbindungsparameter wie in der Airflow-Dokumentation für Microsoft Azure-Verbindung zur Dateifreigabe Sie können beispielsweise einen Verbindungsstring für den Zugriffsschlüssel Ihres Speicherkontos angeben.
Daten aus Azure FileShare übertragen
Wenn Sie die synchronisierten Daten später in einem anderen DAG oder einer anderen Aufgabe verarbeiten möchten, ziehen Sie sie in den Ordner /data
des Buckets Ihrer Umgebung. Dieser Ordner wird mit anderen Airflow-Workern synchronisiert, damit Aufgaben in Ihrem DAG darauf ausgeführt werden können.
Der folgende DAG führt Folgendes aus:
Der folgende Beispiel-DAG führt Folgendes aus:
- Synchronisiert den Inhalt des Verzeichnisses
/data-for-gcs
aus Azure File Share in den Ordner/data/from-azure
im Bucket Ihrer Umgebung. - Es wird zwei Minuten gewartet, bis die Daten mit allen Airflow-Workern in für Ihre Umgebung.
- Mit dem Befehl
ls
wird die Liste der Dateien in diesem Verzeichnis ausgegeben. Ersetzen mit anderen Airflow-Operatoren, die mit Ihren Daten arbeiten.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files