Daten mit Google-Übertragungsoperatoren aus anderen Diensten übertragen

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite wird gezeigt, wie Sie Daten aus anderen Diensten mit Google-Übertragungsoperatoren in Ihren DAGs übertragen.

Google-Übertragungsoperatoren

Google-Übertragungsoperatoren sind Airflow-Operatoren, mit denen Sie Daten aus anderen Diensten in Google Cloud abrufen können.

In diesem Leitfaden werden Operatoren für Azure FileShare Storage und Amazon S3 aufgeführt, die mit Cloud Storage. Es gibt noch viele weitere Übertragungsoperatoren, mit Diensten innerhalb von Google Cloud und mit anderen Diensten als Google Cloud

Hinweise

  • Dieses Handbuch bezieht sich auf Airflow 2. Wenn in Ihrer Umgebung Airflow 1 verwendet wird, verwenden Sie Backport-Anbieterpakete, um Operatoren zu importieren und erforderliche Verbindungstypen in Ihrer Umgebung verfügbar zu machen.

Amazon S3 für Cloud Storage

In diesem Abschnitt wird gezeigt, wie Sie Daten von Amazon S3 mit einem Cloud Storage-Bucket synchronisieren.

Amazon-Anbieterpaket installieren

Das apache-airflow-providers-amazon-Paket enthält die Verbindungstypen und Funktionen, die mit Amazon S3 interagieren. Installieren Sie dieses PyPI-Paket in Ihrem zu verbessern.

Verbindung zu Amazon S3 konfigurieren

Das Amazon-Anbieterpaket bietet einen Verbindungstyp für Amazon S3. Ich eine Verbindung dieses Typs erstellen. Die Verbindung für Cloud Storage, namens google_cloud_default ist in Ihrer Umgebung bereits eingerichtet.

So richten Sie eine Verbindung zu Amazon S3 ein:

  1. Klicken Sie in der Airflow-Benutzeroberfläche auf Verwaltung > Verbindungen.
  2. Erstellen Sie eine neue Verbindung.
  3. Wählen Sie als Verbindungstyp Amazon S3 aus.
  4. Im folgenden Beispiel wird eine Verbindung mit dem Namen aws_s3 verwendet. Sie können diesen oder einen anderen Namen für die Verbindung verwenden.
  5. Geben Sie die Verbindungsparameter wie in der Airflow-Dokumentation für die Amazon Web Services-Verbindung beschrieben an. Um beispielsweise eine Verbindung mit AWS-Zugriffsschlüsseln einzurichten, generieren Sie ein Zugriffsschlüssel für Ihr Konto in AWS und geben Sie dann die AWS-Zugriffsschlüssel-ID als den geheimen AWS-Zugriffsschlüssel als Passwort für die Verbindung anmelden.

Daten von Amazon S3 übertragen

Wenn Sie die synchronisierten Daten später in einem anderen DAG oder einer anderen Aufgabe bearbeiten möchten, ziehen Sie sie in den Ordner /data Ihres Umgebungs-Buckets. Dieser Ordner ist mit anderen Airflow-Workern synchronisiert, sodass Aufgaben in Ihrem DAG die Sie bearbeiten können.

Der folgende Beispiel-DAG führt Folgendes aus:

  • Synchronisiert den Inhalt des Verzeichnisses /data-for-gcs aus einem S3-Bucket in den Ordner /data/from-s3/data-for-gcs/ im Bucket Ihrer Umgebung.
  • Es wird zwei Minuten gewartet, bis die Daten mit allen Airflow-Workern in Ihrer Umgebung synchronisiert sind.
  • Gibt die Liste der Dateien in diesem Verzeichnis mit dem Befehl ls aus. Ersetzen Sie diese Aufgabe durch andere Airflow-Operatoren, die mit Ihren Daten funktionieren.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare für Cloud Storage

In diesem Abschnitt wird gezeigt, wie Sie Daten aus Azure FileShare mit einem Cloud Storage-Bucket.

Microsoft Azure-Anbieterpaket installieren

Das Paket apache-airflow-providers-microsoft-azure enthält die Verbindung die mit Microsoft Azure interagieren. Installieren Sie dieses PyPI-Paket in Ihrem zu verbessern.

Verbindung zu Azure FileShare konfigurieren

Das Microsoft Azure-Anbieterpaket bietet einen Verbindungstyp für die Azure-Datei Teilen. Sie erstellen eine Verbindung dieses Typs. Die Verbindung für Cloud Storage mit dem Namen google_cloud_default ist bereits eingerichtet in für Ihre Umgebung.

Richten Sie auf folgende Weise eine Verbindung zu Azure FileShare ein:

  1. Klicken Sie in der Airflow-Benutzeroberfläche auf Verwaltung > Verbindungen.
  2. Erstellen Sie eine neue Verbindung.
  3. Wählen Sie Azure FileShare als Verbindungstyp aus.
  4. Im folgenden Beispiel wird eine Verbindung mit dem Namen azure_fileshare verwendet. Sie können diesen oder einen anderen Namen für die Verbindung verwenden.
  5. Geben Sie die Verbindungsparameter wie in der Airflow-Dokumentation für die Microsoft Azure-Dateifreigabeverbindung beschrieben an. Sie können beispielsweise einen Verbindungsstring für Ihr Speicherkonto angeben. Zugriffsschlüssel.

Daten aus Azure FileShare übertragen

Wenn Sie die synchronisierten Daten später in einem anderen DAG oder einer anderen Aufgabe verarbeiten möchten, ziehen Sie sie in den Ordner /data des Buckets Ihrer Umgebung. Dieser Ordner ist mit anderen Airflow-Workern synchronisiert, sodass Aufgaben in Ihrem DAG die Sie bearbeiten können.

Der folgende DAG führt Folgendes aus:

Der folgende Beispiel-DAG führt Folgendes aus:

  • Synchronisiert den Inhalt des Verzeichnisses /data-for-gcs aus Azure File Share in den Ordner /data/from-azure im Bucket Ihrer Umgebung.
  • Es wird zwei Minuten gewartet, bis die Daten mit allen Airflow-Workern in für Ihre Umgebung.
  • Gibt die Liste der Dateien in diesem Verzeichnis mit dem Befehl ls aus. Ersetzen Sie diese Aufgabe durch andere Airflow-Operatoren, die mit Ihren Daten funktionieren.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

Nächste Schritte