安装 Python 依赖项

本页面介绍如何通过几个常见应用安装 Python 软件包并连接到 Cloud Composer 环境。

依赖项会随基础环境中的现有 Python 依赖项一起安装。

如果您的环境需要特定的软件包,建议您明确安装该软件包,以避免由于不同 Cloud Composer 映像版本之间的软件包更改而导致的问题。请勿依赖环境中运行的 Cloud Composer 版本中的预装软件包。

用于管理依赖项的选项

如果您的 Python 依赖项没有外部依赖项,且不与 Cloud Composer 的依赖项冲突,您可以从 Python Package Index 安装 Python 依赖项。您还可以从私有软件包代码库安装 Python 依赖项

如需了解其他要求,请参阅以下几个选项。

选项 符合以下条件时使用
本地 Python 库 在 Python Package Index 中找不到您的 Python 依赖项,并且该库没有任何外部依赖项,例如 dist-packages。
插件功能 您希望使用插件特有的功能,例如修改 Airflow 网页界面。
PythonVirtualenvOperator 您的 Python 依赖项可以在 Python 软件包索引中找到,并且没有外部依赖项。但是,您不希望所有工作器都安装您的 Python 依赖项,或者该依赖项与 Cloud Composer 所需的依赖项相冲突。
KubernetesPodOperator

您所需的外部依赖项无法通过 pip 安装(例如 dist-packages),或者这些外部依赖项位于内部 pip 服务器上。

这种方式需要的设置和维护较多,因此,除非其他方式都不适用,否则通常不建议使用。

准备工作

  • 如需在 Cloud Composer 环境中安装 Python 软件包,必须有以下权限:composer.environments.update。如需了解详情,请参阅 Cloud Composer 访问权限控制
  • 如果您的环境受 VPC Service Controls 边界的保护,则在安装 PyPI 依赖项之前,您必须授予其他用户身份对服务边界保护的服务的访问权限,并支持私有 PyPI 代码库。
  • 要求必须遵循 PEP-508 中指定的格式:每项要求均以小写形式指定,并且包含软件包名称(您还可以选择添加 extras 和版本说明符)。
  • 使用 API 安装自定义 Python 依赖项时,所有 Cloud Composer 进程都将使用新安装的 PyPI 依赖项运行。
  • 自定义 PyPI 依赖项可能会导致与 Airflow 所需的依赖项发生冲突,从而导致不稳定。
  • 在将您的 PyPI 软件包部署到生产环境之前,我们建议您先在 Airflow 工作器容器中对该软件包进行本地测试

查看已安装的 Python 软件包

如需查看您的环境中已安装的 Python 软件包,请执行以下操作:

  1. 连接到环境的 GKE 集群
  2. 连接到一个 Pod。如需访问 GKE 集群中的 pod,请使用支持命名空间的 kubectl 命令。 如需查看所有命名空间,请使用 kubectl get pods --all-namespaces
  3. 运行 pip freeze

例如:

gcloud container clusters get-credentials projects/composer-test-1/zones/us-central1-f/clusters/us-central1-1223232-gke --zone us-central1-f
Fetching cluster endpoint and auth data.
kubeconfig entry generated for us-central1-quickstart-f5da909c-gke.
~ (composer-test-1)$ kubectl exec -itn composer-1-7-2-airflow-1-9-0-0a9f265b airflow-worker-7858d4fb79-2dr9j -- /bin/bash
...
~ (composer-test-1)$ pip freeze

absl-py==0.7.1
adal==1.2.0
asn1crypto==0.24.0
astor==0.8.0
attrs==19.1.0
autopep8==1.4.4
...

从 PyPI 安装 Python 依赖项

您的 Python 依赖项不得有外部依赖,也不得与 Cloud Composer 依赖项存在冲突,才能从 Python Package Index 安装 Python 依赖项。

如需为您的环境添加、更新或删除 Python 依赖项,请按如下所述操作:

控制台

按如下方式指定软件包名称和版本说明符:

  • "pi-python-client", "==1.1.post1"
  • "go-api-python-client", "==1.0.0.dev187"

对于没有版本说明符的软件包,请使用空字符串作为值,例如 "glob2", " "

如需访问环境的 Python 依赖项,请执行以下步骤以导航到 PyPI 依赖项 (PyPI dependencies) 页面:

  1. 打开 Google Cloud Platform Console 中的环境页面。

    打开“环境”页面

  2. 点击要为其安装、更新或删除 Python 依赖项的环境的名称

  3. 选择 PyPI 依赖项 (PyPI dependencies) 标签页。

  4. 点击修改按钮。

  5. 如需添加新的依赖项,请执行以下操作:

    1. 点击添加依赖项按钮。

    2. 名称版本字段中输入您的库的名称和版本。

  6. 如需更新现有依赖项,请执行以下操作:

    1. 选择要更新的库的名称和/或版本字段。

    2. 输入新值。

  7. 如需删除依赖项,请执行以下操作:

    1. 将鼠标悬停在待删除依赖项的名称上。

    2. 点击出现的回收站图标。

gcloud

将一个 requirements.txt 文件传递到 gcloud 命令行工具。 设置该文件的格式,使每个要求说明符单占一行。

示例 requirements.txt 文件

scipy>=0.13.3
scikit-learn
nltk[machine_learning]

requirements.txt 文件传递到 environments.set-python-dependencies 命令以设置安装依赖项。

gcloud composer environments update ENVIRONMENT-NAME \\
--update-pypi-packages-from-file requirements.txt \\
--location LOCATION

操作完成后,该命令即终止。为了避免等待,可使用 --async 标志。

如果更新因某个依赖项冲突而失败,那么您的环境将继续使用其现有依赖项运行。如果操作成功,您可以开始在 DAG 中使用新安装的 Python 依赖项。

rest

使用 projects.locations.environments.patch 方法,将 config.softwareConfig.pypiPackages 指定为 updateMask 查询参数的前缀。

从私有代码库安装 Python 依赖项

您可以安装公共互联网上可用的私有软件包代码库中托管的软件包。软件包必须是默认 pip 工具可以安装的配置正确的软件包。

要使用公共地址从私有软件包代码库进行安装,请执行以下操作:

  1. 创建一个 pip.conf 文件,如果适用,在文件中包含以下信息:

    • 代码库的访问凭据
    • 非默认 pip 安装选项

      示例:

      [global]
      extra-index-url=https://my-example-private-repo.com/
      

  2. 将 pip.conf 文件上传到环境的 Cloud Storage 存储分区并将其放在 /config/pip/ 文件夹中,例如:gs://us-central1-b1-6efannnn-bucket/config/pip/pip.conf

在专用 IP 环境中安装 Python 依赖项

专用 IP 环境限制对公共互联网的访问,因此安装 Python 依赖项可能需要执行其他步骤。

从公共 PyPI 代码库安装依赖项时,无需特殊配置。您可以按照上述常规流程执行操作。您还可以从具有公共地址的私有代码库请求软件包。

或者,您也可以在 VPC 网络中托管私有 PyPI 代码库。安装依赖项时,Cloud Composer 将在托管您环境的专用 IP GKE 集群中运行操作,而无需通过 Cloud Build 访问任何公共 IP 地址。

要从您的 VPC 网络中托管的私有代码库安装软件包,请执行以下操作:

  1. 如果您的 Cloud Composer 环境的服务帐号没有 project.editor 角色,请为其授予 iam.serviceAccountUser 角色。

  2. 在上传到 Cloud Storage 存储分区中的 /config/pip/ 文件夹的 pip.conf 文件中指定代码库的专用 IP 地址。

在 VPC Service Controls 边界中将 Python 依赖项安装到专用 IP 环境

使用 VPC Service Controls 边界保护项目会导致进一步的安全限制。具体来说,不能将 Cloud Build 用于安装软件包,从而阻止对公共互联网上代码库的访问。

要在边界内为专用 IP Composer 环境安装 Python 依赖项,您有以下一些选项:

  1. 使用托管在您的 VPC 网络中的私有 PyPI 代码库(如上一节所述)。
  2. 使用 VPC 网络中的代理服务器连接到公共互联网上的 PyPI 代码库。在 Cloud Storage 存储分区的 /config/pip/pip.conf 文件中指定代理地址。
  3. 如果您的安全政策允许从外部 IP 地址访问 VPC 网络,则可以通过配置 Cloud NAT 来启用此功能。
  4. 将 Python 依赖项提供给 Cloud Storage 存储分区中的 dags 文件夹,以将其安装为本地库。如果依赖项树很大,则这是一个不错的选项。

安装本地 Python 库

如需安装内部或本地 Python 库,请执行以下操作:

  1. 将依赖项放在 dags/ 文件夹的子目录中。如需从子目录导入某个模块,该模块的路径中的每个子目录都必须包含一个 __init__.py 软件包标记文件。

    在此示例中,依赖项为 coin_module.py

    dags/
      use_local_deps.py  # A DAG file.
      dependencies/
        __init__.py
        coin_module.py
    
  2. 从 DAG 定义文件导入该依赖项。

    例如:

    from dependencies import coin_module

使用依赖于共享对象库的 Python 软件包

某些 PyPI 软件包取决于系统级库。 虽然 Cloud Composer 不支持系统库,但您可以使用以下方法:

  1. 使用 KubernetesPodOperator。 将操作节点映像设置为自定义构建映像。如果在安装过程中由于未满足系统依赖关系而遇到软件包安装失败,请使用此选项。

  2. 将共享对象库上传到环境的 Cloud Storage 存储分区。

    1. 手动查找此 PyPI 依赖项的共享对象库(.so 文件)。
    2. 将该共享对象库上传到 /home/airflow/gcs/plugins
    3. 设置以下 Cloud Composer 环境变量:LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/airflow/gcs/plugins

    如果您的 PyPI 包已成功安装,但在运行时失败,则可以选择此方法。

连接到 Flower 网页界面

Flower 是一款适用于 Celery 集群的网页工具。Flower 已预安装在您的环境中。您可以使用其网页界面来监控您的环境的 Apache Airflow 工作器。

如需访问 Flower,请执行以下操作:

  1. 如需确定 Kubernetes Engine 集群,请查看您的环境:

    gcloud composer environments describe ENVIRONMENT-NAME /
        --location LOCATION

    该集群列为 gkeCluster。部署了该集群的地区列为 location

    例如:

          gcloud composer environments describe environment-name --location us-central1
          config:
            airflowUri: https://uNNNNe0aNNbcd3fff-tp.appspot.com
            dagGcsPrefix: gs://us-central1-may18-test-00a47695-bucket/dags
            gkeCluster: projects/example-project/zones/us-central1-a/clusters/us-central1-environment-name-00a47695-gke
            nodeConfig:
              diskSizeGb: 100
              location: projects/example-project/zones/us-central1-a

    在该示例中,集群为 us-central1-environment-name-00a47695-gke,地区为 us-central1-a。此信息还可以在 Cloud Console 的环境详情页上找到。

  2. 连接到该 Kubernetes Engine 集群:

    gcloud container clusters get-credentials CLUSTER_NAME /
        --zone CLUSTER_ZONE

    例如:

    gcloud container clusters get-credentials us-central1-environment-name-00a47695-gke --zone us-central1-a
    
    Fetching cluster endpoint and auth data.
    kubeconfig entry generated for us-central1-environment-name-00a47695-gke.
  3. 查看工作器 pod,并选择用于运行 Flower 的 pod:

    kubectl get pods --all-namespaces | grep worker

    例如:

    kubectl get pods --all-namespaces | grep worker
    
    airflow-worker-89696c45f-49rkb      2/2       Running   1          29d
    airflow-worker-89696c45f-gccmm      2/2       Running   1          29d
    airflow-worker-89696c45f-llnnx      2/2       Running   0          29d

    Pod 名称与正则表达式 "airflow-(worker|scheduler)-[-a-f0-9]+") 匹配。

  4. 在该工作器 Pod 上运行 Flower:

    kubectl exec -n NAMESPACE -it POD_NAME -c airflow-worker -- airflow flower

    例如:

    kubectl exec -n composer-1-6-0-airflow-1-10-1-9670c487 -it airflow-worker-89696c45f-llnnx /
        -c airflow-worker -- airflow flower
    
    [I 180601 20:35:55 command:139] Visit me at http://0.0.0.0:5555
    [I 180601 20:35:55 command:144] Broker: redis://airflow-redis-service.default.svc.cluster.local:6379/0
  5. 在单独的终端会话中,使用 kubectl 将本地计算机上的端口转发到运行 Flower 界面的 Pod:

    kubectl -n NAMESPACE port-forward POD_NAME 5555

    例如:

    kubectl -n composer-1-6-0-airflow-1-10-1-9670c487 port-forward airflow-worker-c5c4b58c7-zhmkv 5555
    
    Forwarding from 127.0.0.1:5555 -> 5555
  6. 若要访问网页界面,请在本地浏览器中转到 http://localhost:5555

安装 SQLAlchemy 以访问 Airflow 数据库

SQLAlchemy 即可用作 Python SQL 工具包,又可用作对象关系映射器 (ORM)。您可以安装 SQLAlchemy,然后使用它来访问 Cloud Composer 的 Cloud SQL 实例。在安装过程中,Cloud Composer 会配置 Airflow 环境变量 AIRFLOW__CORE__SQL_ALCHEMY_CONN

如需安装 SQLAlchemy,请执行以下操作:

  1. 在您的环境中安装 sqlalchemy

    gcloud composer environments update ENVIRONMENT-NAME /
        --location LOCATION /
        --update-pypi-package "sqlalchemy"
    
  2. 如需确定 Kubernetes Engine 集群,请查看您的环境:

    gcloud composer environments describe ENVIRONMENT-NAME /
        --location LOCATION
  3. 连接到该 Kubernetes Engine 集群:

    gcloud container clusters get-credentials CLUSTER_NAME /
        --zone CLUSTER_LOCATION
  4. 查看工作器 pod,并选择要连接到的 pod:

    kubectl get pods --all-namespaces | grep worker
  5. 使用 SSH 连接到该工作器 pod:

    kubectl -n NAMESPACE exec -it POD_NAME -c airflow-worker -- /bin/bash

    例如:

    kubectl -n composer-1-6-0-airflow-1-10-1-9670c487 /
        exec -it airflow-worker-54c6b57789-66pnr -c airflow-worker -- /bin/bash
    airflow@airflow-worker-54c6b57789-66pnr:~$

  6. 使用 sqlalchemy 库与 Airflow 数据库交互:

    python
    import airflow.configuration as config
    config.conf.get('core', 'sql_alchemy_conn')