Airflow-Verbindungen verwalten

Auf dieser Seite wird die Verwendung von Airflow-Verbindungen beschrieben.

Mit Airflow-Verbindungen können Sie über eine Cloud Composer-Umgebung auf Ressourcen in Google Cloud-Projekten zugreifen. Sie erstellen Airflow-Verbindungs-IDs zum Speichern von Informationen wie Anmeldedaten und Hostnamen, und Ihre Workflows verweisen auf die Verbindungs-IDs. Airflow-Verbindungen sind die empfohlene Methode zum Speichern von Secrets und Anmeldedaten, die in Workflows verwendet werden.

Mit Airflow-Verbindungen können Sie die Verbindungsinformationen speichern, die für eine Cloud Composer-Umgebung erforderlich sind, um mit anderen APIs zu kommunizieren, z. B. mit Google Cloud-Projekten, anderen Cloud-Anbietern oder Diensten von Drittanbietern.

Eine Airflow-Verbindung kann Details speichern, z. B. Anmeldedaten, Hostnamen oder zusätzliche API-Parameter. Jede Verbindung ist mit einer ID verknüpft, die Sie in Workflow-Aufgaben verwenden können, um auf die Voreinstellungen zu verweisen. Wir empfehlen zum Speichern von Secrets und Anmeldedaten für Workflowaufgaben Airflow-Verbindungen.

Der Google Cloud-Verbindungstyp aktiviert Google Cloud-Integrationen.

Farnet-Schlüssel und gesicherte Verbindungen

Beim Erstellen einer neuen Umgebung generiert Cloud Composer einen eindeutigen, dauerhaften Farnet-Schlüssel für die Umgebung und sichert die Verbindungszustände standardmäßig. Sie können fernet_key in der Airflow-Konfiguration aufrufen. Weitere Informationen zum Schutz von Verbindungen finden Sie unter Verbindungen sichern.

Standardverbindungen verwenden

Cloud Composer konfiguriert standardmäßig die folgenden Airflow-Verbindungen für die Google Cloud Platform:

  • bigquery_default
  • google_cloud_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

Sie können diese Verbindungen aus Ihren DAGs mithilfe der Standardverbindungs-ID verwenden. Im folgenden Beispiel wird der BigQueryOperator mit der Standardverbindung verwendet.

Airflow 2

task_default = bigquery.BigQueryInsertJobOperator(
    task_id='task_default_connection',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)

Airflow 1

task_default = bigquery_operator.BigQueryOperator(
    task_id='task_default_connection',
    sql='SELECT 1', use_legacy_sql=False)

Sie können die Verbindungs-ID auch explizit angeben, wenn Sie den Operator erstellen.

Airflow 2

# Composer creates a 'google_cloud_default' connection by default.
task_explicit = bigquery.BigQueryInsertJobOperator(
    task_id='task_explicit_connection',
    gcp_conn_id='google_cloud_default',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)

Airflow 1

task_explicit = bigquery_operator.BigQueryOperator(
    task_id='task_explicit_connection',
    sql='SELECT 1', use_legacy_sql=False,
    # Composer creates a 'google_cloud_default' connection by default.
    bigquery_conn_id='google_cloud_default')

Auf Ressourcen in einem anderen Projekt zugreifen

Für den Zugriff auf Ressourcen in einem Google Cloud-Projekt in der Cloud Composer-Umgebung wird empfohlen, die Standardverbindungen zu verwenden und dem mit Ihrer Umgebung verknüpften Dienstkonto die entsprechenden Identitäts- und Zugriffsverwaltungsberechtigungen zuzuweisen.

In den folgenden Abschnitten finden Sie Beispiele dafür, wie Sie Lese- und Schreibvorgänge in Cloud Storage-Buckets in your-storage-project für eine in der Projekt-ID bereitgestellte Cloud Composer-Umgebungyour-composer-project ermöglichen.

Das Ihrer Umgebung zugeordnete Dienstkonto ermitteln

Console

  1. Öffnen Sie in der Cloud Console die Seite Umgebungen.

    Zur Seite „Umgebungen“

  2. Klicken Sie in der Spalte Name auf den Namen der Umgebung, um deren Seite Umgebungsdetails zu öffnen.
  3. Beachten Sie das Dienstkonto. Dieser Wert ist eine E-Mail-Adresse wie service-account-name@your-composer-project.iam.gserviceaccount.com.

gcloud

Geben Sie den folgenden Befehl ein und ersetzen Sie VARIABLES durch die entsprechenden Werte:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.nodeConfig.serviceAccount)" 

Die Ausgabe zeigt eine Adresse, z. B. service-account-name@your-composer-project.iam.gserviceaccount.com.

Dem Dienstkonto die entsprechenden IAM-Berechtigungen gewähren

Wenn Sie Lese- und Schreibzugriffe auf Cloud Storage-Buckets in your-storage-project gewähren möchten, weisen Sie dem mit Ihrer Cloud Composer-Umgebung verknüpften Dienstkonto die Rolle roles/storage.objectAdmin zu.

Console

  1. Auf der Seite IAM & Verwaltung für Ihr Speicherprojekt.

    Zur Seite „IAM & Verwaltung“

  2. Klicken Sie auf Mitglieder hinzufügen.

  3. Geben Sie im Dialogfeld Mitglieder hinzufügen die vollständige E-Mail-Adresse des mit Ihrer Cloud Composer-Umgebung verknüpften Dienstkontos an.

  4. Wählen Sie im Drop-down-Menü Rolle auswählen die entsprechenden Berechtigungen aus. Wählen Sie für dieses Beispiel die Rolle Storage > Objektadministrator aus.

  5. Klicken Sie auf Hinzufügen.

gcloud

Verwenden Sie den Befehl gcloud projects add-iam-policy-binding, um IAM-Berechtigungen auf Projektebene hinzuzufügen. Ersetzen Sie VARIABLES dabei durch die entsprechenden Werte:

gcloud projects add-iam-policy-binding YOUR_STORAGE_PROJECT \
    --member=serviceAccount:SERVICE_ACCOUNT_EMAIL \
    --role=roles/storage.objectAdmin 

Nachdem die entsprechenden Berechtigungen gewährt wurden, können Sie auf Ressourcen im Projekt your-storage-project mit denselben Airflow-Standardverbindungen zugreifen, mit denen Sie auf Ressourcen im Projekt your-composer-project zugreifen.

Neue Airflow-Verbindungen erstellen

Hinweis

Weisen Sie dem Dienstkonto, das Ihrer Cloud Composer-Umgebung zugeordnet ist, die entsprechenden IAM-Berechtigungen zu und verwenden Sie die Standardverbindungen in Ihren DAG-Definitionen. Führen Sie die Schritte in diesem Abschnitt aus, falls das nicht möglich ist.

Verbindung zu einem anderen Projekt erstellen

Die folgenden Schritte enthalten Beispiele, um Lese- und Schreibvorgänge in Cloud Storage-Buckets in your-storage-project für eine in der Projekt-ID your-composer-project bereitgestellte Cloud Composer-Umgebung zu ermöglichen.

  1. Erstellen Sie ein Dienstkonto in your-storage-project und laden Sie einen JSON-Schlüssel herunter:

    1. Rufen Sie in der Cloud Console die Seite Dienstkonten auf.

      Seite „Dienstkonten“ öffnen

    2. Klicken Sie auf Projekt auswählen.

    3. Wählen Sie Ihr Projekt aus und klicken Sie auf Öffnen.

    4. Klicken Sie auf Dienstkonto erstellen.

    5. Geben Sie einen Namen für das Dienstkonto ein, wählen Sie eine Rolle aus, die Sie dem Dienstkonto zuweisen möchten, z. B. Speicher > Objektadministrator.

    6. Klicken Sie auf Neuen privaten Schlüssel bereitstellen und dann auf Speichern.

    7. Öffnen Sie die JSON-Datei in einem Texteditor. Der Inhalt sollte in etwa so aussehen:

      { "type": "service_account", "project_id": "your-storage-project", ... }

  2. Neue Verbindung herstellen:

    Airflow-UI

    1. Rufen Sie die Airflow-Weboberfläche für Ihre Cloud Composer-Umgebung auf.

    2. Öffnen Sie in der Airflow-Weboberfläche die Seite Admin > Verbindungen.

      Airflow-Screenshot. Öffnen Sie das Menü „Admin-Verbindungen“.

    3. Klicken Sie zum Öffnen des neuen Verbindungsformulars auf den Tab .

      Airflow-Screenshot. Klicken Sie auf den Tab Erstellen.

    4. Neue Verbindung herstellen:

      1. Füllen Sie das Feld Conn Id aus, z. B. my_gcp_connection, um eine Verbindungs-ID auszuwählen. Verwenden Sie diese ID in den DAG-Definitionsdateien.
      2. Wählen Sie im Feld Verbindungstyp die Option Google Cloud Platform aus.
      3. Geben Sie unter Projekt-ID einen Wert für das Projekt ein, zu dem Ihr Dienstkonto gehört.
      4. Führen Sie einen der folgenden Schritte aus:

        1. Kopieren Sie die JSON-Schlüsseldatei des Dienstkontos, die Sie im Verzeichnis data/ des Cloud Storage-Buckets Ihrer Umgebung heruntergeladen haben. Geben Sie dann unter Schlüsseldateipfad den lokalen Dateipfad des Airflow-Workers für den Speicherort der JSON-Schlüsseldatei wie /home/airflow/gcs/data/keyfile.json ein.
        2. Kopieren Sie in Schlüsseldatei (JSON) den Inhalt der JSON-Schlüsseldatei des Dienstkontos, die Sie heruntergeladen haben.

        Nutzer mit Zugriff auf Airflow-Verbindungen über die Befehlszeile oder die Web-UI können Anmeldedaten lesen, die in keyfile_dict gespeichert sind. Zum Schutz dieser Anmeldedaten empfehlen wir die Verwendung des Schlüsseldateipfads und einer Cloud Storage-ACL, um den Zugriff auf die Schlüsseldatei zu beschränken.

      5. Geben Sie einen Wert in das Feld Umfang ein. Es wird empfohlen, https://www.googleapis.com/auth/cloud-platform als Bereich zu verwenden und IAM-Berechtigungen für das Dienstkonto zu verwenden, um den Zugriff auf Google Cloud-Ressourcen einzuschränken.

      6. Klicken Sie auf Speichern, um die Verbindung herzustellen.

        Airflow-Screenshot. Klicken Sie auf den Tab Erstellen.

    gcloud

    Geben Sie den folgenden Befehl ein:

    Airflow 1.10-Befehlszeile

    gcloud composer environments run \
      ENVIRONMENT_NAME \
      --location LOCATION \
      connections -- --add \
      --conn_id=CONNECTION_ID \
      --conn_type=google_cloud_platform \
      --conn_extra '{"extra__google_cloud_platform__CMD_ARGS": "...",
      "extra__google_cloud_platform__CMD_ARGS": "...", ...}'
    

    Airflow 2.0-Befehlszeile

    gcloud beta composer environments run \
    ENVIRONMENT_NAME \
      --location LOCATION \
      connections add -- \
      CONNECTION_ID \
      --conn-type=google_cloud_platform \
      --conn-extra '{"extra__google_cloud_platform__CMD_ARGS": "...",
      "extra__google_cloud_platform__CMD_ARGS": "...", ...}'
    

    wobei

    • ENVIRONMENT_NAME ist der Name der Umgebung.
    • LOCATION ist die Compute Engine-Region, in der sich die Umgebung befindet.
    • CONNECTION_ID ist die Kennung für die Verbindung. Verwenden Sie Kleinbuchstaben und Wörter mit Unterstrichen.
    • CMD_ARGS sind die Folgenden:
      • project ist eine Projekt-ID. Nur extra__google_cloud_platform__project ist erforderlich.
      • key_path ist ein lokaler Dateipfad auf dem Airflow-Worker, der eine JSON-Schlüsseldatei enthält, z. B. /home/airflow/gcs/data/keyfile.json. Wenn angegeben, wird auch scope benötigt. Verwenden Sie key_path oder keyfile_dict, aber nicht beides.
      • keyfile_dict ist ein JSON-Objekt, das den Inhalt der JSON-Schlüsseldatei angibt, die Sie heruntergeladen haben. Wenn angegeben, wird auch scope benötigt. Verwenden Sie keyfile_dict oder key_path, aber nicht beides. Nutzer mit Zugriff auf Airflow-Verbindungen über die Befehlszeile oder die Web-UI können Anmeldedaten lesen, die in keyfile_dict gespeichert sind. Zum Schutz dieser Anmeldedaten empfehlen wir die Verwendung von key_path. Mit einer Cloud Storage-ACL können Sie den Zugriff auf die Schlüsseldatei beschränken.
      • scope ist eine durch Kommas getrennte Liste von OAuth-Bereichen.

    Beispiel:

    Airflow 1.10-Befehlszeile

    gcloud composer environments run test-environment \
       --location us-central1 connections -- --add \
       --conn_id=my_gcp_connection --conn_type=google_cloud_platform \
       --conn_extra '{"extra__google_cloud_platform__project": "your-storage-project",
       "extra__google_cloud_platform__key_path": "/home/airflow/gcs/data/keyfile.json",
       "extra__google_cloud_platform__scope": "https://www.googleapis.com/auth/cloud-platform"}'
    

    Airflow 2.0-Befehlszeile

    gcloud beta composer environments run test-environment \
       --location us-central1 connections add -- \
       --conn-id=my_gcp_connection --conn-type=google_cloud_platform \
       --conn-extra '{"extra__google_cloud_platform__project": "your-storage-project",
       "extra__google_cloud_platform__key_path": "/home/airflow/gcs/data/keyfile.json",
       "extra__google_cloud_platform__scope": "https://www.googleapis.com/auth/cloud-platform"}'
    

Neue Airflow-Verbindung verwenden

Legen Sie das entsprechende ID-Argument für die Verbindung fest, wenn Sie einen Google Cloud Airflow-Operator erstellen, um die von Ihnen erstellte Verbindung zu verwenden.

Airflow 2

# Set a gcp_conn_id to use a connection that you have created.
task_custom = bigquery.BigQueryInsertJobOperator(
    task_id='task_custom_connection',
    gcp_conn_id='my_gcp_connection',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)

Airflow 1

task_custom = bigquery_operator.BigQueryOperator(
    task_id='task_custom_connection',
    sql='SELECT 1', use_legacy_sql=False,
    # Set a connection ID to use a connection that you have created.
    bigquery_conn_id='my_gcp_connection')