Daten aus anderen Diensten mit Google-Übertragungsanbietern übertragen

Cloud Composer 1 Cloud Composer 2

Auf dieser Seite wird gezeigt, wie Sie Daten aus anderen Diensten mit Google Transfer Operators in Ihren DAGs übertragen.

Informationen zu Google Transfer Operators

Google Transfer Operators bestehen aus einer Reihe von Airflow-Operatoren, mit denen Sie Daten aus anderen Diensten in Google Cloud abrufen können.

In diesem Leitfaden werden Operatoren für Azure FileShare Storage und Amazon S3 beschrieben, die mit Cloud Storage funktionieren. Es gibt viele weitere Übertragungsoperatoren, die mit Diensten in Google Cloud und mit anderen Diensten als Google Cloud funktionieren.

Amazon S3 für Cloud Storage

In diesem Abschnitt wird gezeigt, wie Sie Daten aus Amazon S3 mit einem Cloud Storage-Bucket synchronisieren.

Amazon-Anbieterpaket installieren

Das Paket apache-airflow-providers-amazon enthält die Verbindungstypen und Funktionen für die Interaktion mit Amazon S3. Installieren Sie dieses PyPI-Paket in Ihrer Umgebung.

Verbindung zu Amazon S3 konfigurieren

Das Amazon-Anbieterpaket bietet einen Verbindungstyp für Amazon S3. Sie erstellen eine Verbindung dieses Typs. Die Verbindung für Cloud Storage mit dem Namen google_cloud_default ist bereits in Ihrer Umgebung eingerichtet.

So richten Sie eine Verbindung zu Amazon S3 ein:

  1. Wechseln Sie in der Airflow-UI zu Admin > Connections (Verwaltung > Verbindungen).
  2. Erstellen Sie eine neue Verbindung.
  3. Wählen Sie als Verbindungstyp Amazon S3 aus.
  4. Im folgenden Beispiel wird eine Verbindung mit dem Namen aws_s3 verwendet. Sie können diesen oder einen anderen Namen für die Verbindung verwenden.
  5. Geben Sie Verbindungsparameter an, wie in der Airflow-Dokumentation für die Amazon Web Services-Verbindung beschrieben. Wenn Sie beispielsweise eine Verbindung mit AWS-Zugriffsschlüsseln einrichten möchten, generieren Sie einen Zugriffsschlüssel für Ihr AWS-Konto und geben dann die AWS-Zugriffsschlüssel-ID als Log-in an. Den geheimen AWS-Zugriffsschlüssel als Passwort für die Verbindung verwenden.

Daten von Amazon S3 übertragen

Wenn Sie die synchronisierten Daten später in einem anderen DAG oder einer anderen Aufgabe bearbeiten möchten, verschieben Sie sie in den Ordner /data des Buckets Ihrer Umgebung. Dieser Ordner wird mit anderen Airflow-Workern synchronisiert, sodass Aufgaben in Ihrem DAG darauf ausgeführt werden können.

Der folgende Beispiel-DAG führt Folgendes aus:

  • Synchronisiert den Inhalt des Verzeichnisses /data-for-gcs aus einem S3-Bucket mit dem Ordner /data/from-s3/data-for-gcs/ im Bucket Ihrer Umgebung.
  • Es wird zwei Minuten gewartet, bis die Daten mit allen Airflow-Workern in Ihrer Umgebung synchronisiert wurden.
  • Gibt die Liste der Dateien in diesem Verzeichnis mit dem Befehl ls aus. Ersetzen Sie diese Aufgabe durch andere Airflow-Operatoren, die mit Ihren Daten arbeiten.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')

    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure-Dateifreigabe für Cloud Storage

In diesem Abschnitt wird gezeigt, wie Sie Daten aus Azure FileShare mit einem Cloud Storage-Bucket synchronisieren.

Microsoft Azure-Anbieterpaket installieren

Das Paket apache-airflow-providers-microsoft-azure enthält die Verbindungstypen und Funktionen für die Interaktion mit Microsoft Azure. Installieren Sie dieses PyPI-Paket in Ihrer Umgebung.

Verbindung zu Azure FileShare konfigurieren

Das Microsoft Azure-Anbieterpaket enthält einen Verbindungstyp für die Azure-Dateifreigabe. Sie erstellen eine Verbindung dieses Typs. Die Verbindung für Cloud Storage mit dem Namen google_cloud_default ist bereits in Ihrer Umgebung eingerichtet.

So richten Sie eine Verbindung zu Azure FileShare ein:

  1. Wechseln Sie in der Airflow-UI zu Admin > Connections (Verwaltung > Verbindungen).
  2. Erstellen Sie eine neue Verbindung.
  3. Wählen Sie als Verbindungstyp Azure FileShare aus.
  4. Im folgenden Beispiel wird eine Verbindung mit dem Namen azure_fileshare verwendet. Sie können diesen oder einen anderen Namen für die Verbindung verwenden.
  5. Geben Sie Verbindungsparameter wie in der Airflow-Dokumentation für Microsoft Azure File Share Connection beschrieben an. Sie können beispielsweise einen Verbindungsstring für den Zugriffsschlüssel Ihres Speicherkontos angeben.

Daten aus Azure FileShare übertragen

Wenn Sie die synchronisierten Daten später in einem anderen DAG oder einer anderen Aufgabe bearbeiten möchten, verschieben Sie sie in den Ordner /data des Buckets Ihrer Umgebung. Dieser Ordner wird mit anderen Airflow-Workern synchronisiert, sodass Aufgaben in Ihrem DAG darauf ausgeführt werden können.

Der folgende DAG führt Folgendes aus:

Der folgende Beispiel-DAG führt Folgendes aus:

  • Synchronisiert den Inhalt des Verzeichnisses /data-for-gcs aus der Azure-Dateifreigabe mit dem Ordner /data/from-azure im Bucket Ihrer Umgebung.
  • Es wird zwei Minuten gewartet, bis die Daten mit allen Airflow-Workern in Ihrer Umgebung synchronisiert wurden.
  • Gibt die Liste der Dateien in diesem Verzeichnis mit dem Befehl ls aus. Ersetzen Sie diese Aufgabe durch andere Airflow-Operatoren, die mit Ihren Daten arbeiten.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')

    transfer_dir_from_azure >> sleep_2min >> print_dir_files

Nächste Schritte