安装 Python 依赖项

本页面介绍如何通过几个常见应用安装 Python 软件包并连接到 Cloud Composer 环境。

安装方式

依赖项会随基础环境中的现有 Python 依赖项一起安装。

如果您的 Python 依赖项没有外部依赖项,并且不与 Cloud Composer 的依赖项冲突,那么您可以使用 GCP Console、Cloud SDK 或 Cloud Composer API 通过 Python 软件包索引安装 Python 依赖项

如果您还有其他要求,以下几种方式可供选择。

方式 使用条件…
本地 Python 库 在 Python 软件包索引中找不到您的 Python 依赖项,并且该库没有任何外部依赖项,例如 dist-packages。
插件功能 您希望使用插件特有的功能,例如修改 Airflow 网页界面。
PythonVirtualenvOperator 您的 Python 依赖项可以在 Python 软件包索引中找到,并且没有外部依赖项。但是,您不希望所有工作器都安装您的 Python 依赖项,或者该依赖项与 Cloud Composer 所需的依赖项相冲突。
KubernetesPodOperator

您所需的外部依赖项无法通过 pip 安装(例如 dist-packages),或者这些外部依赖项位于内部 pip 服务器上。

这种方式需要的设置和维护较多,因此,除非其他方式都不适用,否则通常不建议使用。

准备工作

  • 如需在 Cloud Composer 环境中安装 Python 软件包,您需要具备 composer.environments.update 权限。如需了解详情,请参阅 Cloud Composer 访问权限控制
  • 要求必须遵循 PEP-508 中指定的格式:每项要求均以小写形式指定,并且包含软件包名称(您还可以选择添加 extras 和版本说明符)。
  • 使用 API 安装自定义 Python 依赖项时,所有 Cloud Composer 进程都将使用新安装的 PyPI 依赖项运行。
  • 自定义 PyPI 依赖项可能会导致与 Airflow 所需的依赖项发生冲突,从而导致不稳定。
  • 在将您的 PyPI 软件包部署到生产环境之前,我们建议您先在 Airflow 工作器容器中对该软件包进行本地测试

通过 PyPI 安装 Python 依赖项

如需为您的环境添加、更新或删除 Python 依赖项,请按如下所述操作:

Console

按如下方式指定软件包名称和版本说明符:

  • "pi-python-client", "==1.1.post1"
  • "go-api-python-client", "==1.0.0.dev187"

对于不带版本说明符的软件包,请将该值指定为一个空字符串,例如 "glob2", " "

如需访问环境的 Python 依赖项,请执行以下步骤以导航到 PyPI 依赖项 (PyPI dependencies) 页面:

  1. 打开 Google Cloud Platform Console 中的环境页面。

    打开“环境”页面

  2. 点击要为其安装、更新或删除 Python 依赖项的环境的名称

  3. 选择 PyPI 依赖项 (PyPI dependencies) 标签页。

  4. 点击修改按钮。

  5. 如需添加新的依赖项,请执行以下操作:

    1. 点击添加依赖项按钮。

    2. 名称版本字段中输入您的库的名称和版本。

  6. 如需更新现有依赖项,请执行以下操作:

    1. 选择要更新的库的名称和/或版本字段。

    2. 输入新值。

  7. 如需删除依赖项,请执行以下操作:

    1. 将鼠标悬停在待删除依赖项的名称上。

    2. 点击出现的回收站图标。

gcloud

requirements.txt 文件传递给 gcloud 命令行工具。设置该文件的格式,使每个要求说明符单独列为一行。

示例 requirements.txt 文件

scipy>=0.13.3
scikit-learn
nltk[machine_learning]

requirements.txt 文件传递给 environments.set-python-dependencies 命令,以设置安装依赖项。

gcloud composer environments update ENVIRONMENT-NAME \\
--update-pypi-packages-from-file requirements.txt \\
--location LOCATION

操作完成后,该命令即终止。使用 --async 标志可避免等待。

如果更新因某个依赖项冲突而失败,那么您的环境将继续使用其现有依赖项运行。如果操作成功,您可以开始在 DAG 中使用新安装的 Python 依赖项。

REST

使用 projects.locations.environments.patch 方法,并将 updateMask 查询参数的前缀指定为 config.softwareConfig.pypiPackages

安装本地 Python 库

如需安装内部或本地 Python 库,请执行以下操作:

  1. 将依赖项放入 dags/ 文件夹下的一个子目录中。如需从子目录导入某个模块,该模块的路径中的每个子目录都必须包含一个 __init__.py 软件包标记文件。

    在此示例中,依赖项为 coin_module.py

    dags/
      use_local_deps.py  # A DAG file.
      dependencies/
        __init__.py
        coin_module.py
    
  2. 从 DAG 定义文件导入该依赖项。

    例如:

    from dependencies import coin_module

连接到 Flower 网页界面

Flower 是一款适用于 Celery 集群的网页工具。Flower 已预安装在您的环境中。您可以使用其网页界面来监控您的环境的 Apache Airflow 工作器。

如需访问 Flower,请执行以下操作:

  1. 如需确定 Kubernetes Engine 集群,请查看您的环境:

    gcloud composer environments describe ENVIRONMENT-NAME /
        --location LOCATION

    该群集被列为 gkeCluster。部署了该集群的区域被列为 location

    例如:

          gcloud composer environments describe environment-name --location us-central1
          config:
            airflowUri: https://uNNNNe0aNNbcd3fff-tp.appspot.com
            dagGcsPrefix: gs://us-central1-may18-test-00a47695-bucket/dags
            gkeCluster: projects/example-project/zones/us-central1-a/clusters/us-central1-environment-name-00a47695-gke
            nodeConfig:
              diskSizeGb: 100
              location: projects/example-project/zones/us-central1-a

    在该示例中,集群为 us-central1-environment-name-00a47695-gke,地区为 us-central1-a。此信息也可通过 GCP Console 的环境详情页面获取。

  2. 连接到该 Kubernetes Engine 集群:

    gcloud container clusters get-credentials CLUSTER_NAME /
    --zone CLUSTER_ZONE

    例如:

    gcloud container clusters get-credentials us-central1-environment-name-00a47695-gke --zone us-central1-a
    Fetching cluster endpoint and auth data.
    kubeconfig entry generated for us-central1-environment-name-00a47695-gke.
  3. 查看工作器 pod,并选择用于运行 Flower 的 pod:

    kubectl get pods

    例如:

    kubectl get pods
    NAME                                 READY     STATUS    RESTARTS   AGE
    airflow-redis-67f555bdb8-n6m9k       1/1       Running   0          13d
    airflow-scheduler-6cdf4f4ff7-dm4dm   2/2       Running   0          1h
    airflow-sqlproxy-54497bd557-nlqtg    1/1       Running   0          13d
    airflow-worker-c5c4b58c7-bl5bf       2/2       Running   0          1h
    airflow-worker-c5c4b58c7-szqhm       2/2       Running   0          1h
    airflow-worker-c5c4b58c7-zhmkv       2/2       Running   0          1h

    pod 名称与正则表达式 "airflow-(worker|scheduler)-[-a-f0-9]+") 匹配。

  4. 在该工作器 pod 上运行 Flower:

    kubectl exec -it POD_NAME -c airflow-worker -- celery flower /
        --broker=redis://airflow-redis-service:6379/0 --port=5555

    例如:

    kubectl exec -it airflow-worker-c5c4b58c7-zhmkv -c airflow-worker -- celery flower
    --broker=redis://airflow-redis-service:6379/0 --port=5555
    [I 180601 20:35:55 command:139] Visit me at http://localhost:5555
    [I 180601 20:35:55 command:144] Broker: redis://airflow-redis-service:6379/0
  5. 在一个单独的终端会话中,将本地端口转发到 Flower:

    kubectl port-forward POD_NAME 5555

    例如:

    kubectl port-forward airflow-worker-c5c4b58c7-zhmkv 5555
    Forwarding from 127.0.0.1:5555 -> 5555
  6. 如需访问该网页界面,请通过本地浏览器访问 http://localhost:5555

安装 SQLAlchemy 以访问 Airflow 数据库

SQLAlchemy 即可用作 Python SQL 工具包,又可用作对象关系映射器 (ORM)。您可以安装 SQLAlchemy,然后使用它来访问 Cloud Composer 的 Cloud SQL 实例。在安装过程中,Cloud Composer 会配置 Airflow 环境变量 AIRFLOW__CORE__SQL_ALCHEMY_CONN

如需安装 SQLAlchemy,请执行以下操作:

  1. 在您的环境中安装 sqlalchemy

    gcloud composer environments update ENVIRONMENT-NAME /
        --location LOCATION /
        --update-pypi-package "sqlalchemy"
    
  2. 如需确定 Kubernetes Engine 集群,请查看您的环境:

    gcloud composer environments describe ENVIRONMENT-NAME /
        --location LOCATION
  3. 连接到该 Kubernetes Engine 集群:

    gcloud container clusters get-credentials CLUSTER_NAME /
        --zone CLUSTER_LOCATION
  4. 查看工作器 pod,并选择要连接到的 pod:

    kubectl get pods
  5. 使用 SSH 连接到该工作器 pod:

    kubectl exec -it POD_NAME `-- /bin/bash`

    例如:

    kubectl exec -it airflow-worker-54c6b57789-66pnr -- /bin/bash
    Defaulting container name to airflow-worker.
    Use 'kubectl describe pod/airflow-worker-54c6b57789-66pnr' to see all of the containers in this pod.
    airflow@airflow-worker-54c6b57789-66pnr:/$

  6. 使用 sqlalchemy 库与 Airflow 数据库互动:

    python
    import airflow.configuration as config
    config.conf.get('core', 'sql_alchemy_conn')
此页内容是否有用?请给出您的反馈和评价:

发送以下问题的反馈:

此网页
Cloud Composer