Offline Deployment of a Dask Distributed Cluster and JupyterHub on K8s (Part 2)

1. Running Dask-ML in Jupyter

!pip install --upgrade dask-ml dask scikit-learn

1. Set Up the Dask Client:

Start by connecting to the Dask cluster. Ensure you have the correct address (simple-scheduler.jhub.svc.cluster.local:8786 in your case).

2. Generate Sample Data:

For demonstration purposes, let's generate some synthetic data. You can replace this with your actual dataset.

3. Split Data for Training:

Dask-ML provides its own train_test_split in dask_ml.model_selection that works directly on Dask arrays, so you can split the data into training and test sets without pulling everything back to a single machine.

4. Train a Model:

Let's train a simple model using Dask-ML. Here, we'll use a logistic regression classifier.

Run the distributed task:

from dask.distributed import Client

# Connect to the Dask cluster
client = Client("simple-scheduler.jhub.svc.cluster.local:8786")
# ----

from sklearn.datasets import make_classification
import dask.array as da

# Generate synthetic dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

# Convert data to Dask arrays
X_dask = da.from_array(X, chunks=100)
y_dask = da.from_array(y, chunks=100)

# --------------------

#  pip install --upgrade dask-ml dask scikit-learn
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X_dask, y_dask, random_state=42)

# Initialize the logistic regression model
model = LogisticRegression()

# Fit the model
model.fit(X_train, y_train)

# Predict on the test set
y_pred = model.predict(X_test)

# Calculate accuracy
accuracy = accuracy_score(y_test.compute(), y_pred.compute())
print(f"Accuracy: {accuracy}")

client.close()
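
A note on the example above: the synthetic arrays here are tiny, but with real data you would usually persist the training splits in distributed memory before fitting, so the workers do not rebuild them for every task. A minimal sketch (it would go right before model.fit above):

# Keep the split Dask arrays on the cluster so they are computed only once
X_train, X_test, y_train, y_test = client.persist([X_train, X_test, y_train, y_test])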

2. Building a New Environment Image

Problem:

The Dask worker nodes are missing the required dependencies. How can the dask-ml and scikit-learn libraries be added to the official worker image ghcr.io/dask/dask:latest? Please suggest a good approach.

To add the dask-ml and scikit-learn libraries to Dask worker nodes based on the official ghcr.io/dask/dask:latest image, you can build a custom Docker image. The concrete steps are as follows:

1. Write a Dockerfile
First, create a file named Dockerfile locally; it will be used to build the custom Dask worker image.

# Build on top of the official Dask image
FROM ghcr.io/dask/dask:latest

# Install the required Python packages into the image
RUN pip install \
    dask-ml \
    scikit-learn \
    xgboost \
    dask_xgboost

# Add any other custom configuration or dependency installation steps here

# Set the entrypoint (optional)
CMD ["dask-worker", "tcp://scheduler:8786"]

2. Build the Custom Image

Run the following commands in the directory containing the Dockerfile to build the new image:

# Build the image
docker build -t my-dask-worker:v1.0.0 -f Dockerfile .

# Tag it for the private registry
docker tag my-dask-worker:v1.0.0 magic-harbor.magic.com/jupyterhub/my-dask-worker:v1.0.0

# Push it to the registry
docker push magic-harbor.magic.com/jupyterhub/my-dask-worker:v1.0.0

Here, my-dask-worker:v1.0.0 is the name and tag you chose for the new image; the registry hostname should match the one used later when referencing the image (magic-harbor.magic.com in this article). Make sure you are logged in to your container registry (Harbor here, or Docker Hub) before pushing, so the push can succeed.
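
If the registry requires authentication, log in before pushing; for the Harbor registry used in this article that would look roughly like the following (the username is a placeholder):

# Log in to the private Harbor registry before pushing (credentials are placeholders)
docker login magic-harbor.magic.com -u <your-username>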

3. Start the Dask Cluster
Start the Dask cluster using your custom image. You can launch it with docker-compose or whatever method fits your environment:

# Example docker-compose.yml
version: '3'
services:
  scheduler:
    image: ghcr.io/dask/dask:latest
    command: ["dask-scheduler"]
    ports:
      - "8786:8786"
      - "8787:8787"
  worker:
    image: my-dask-worker:v1.0.0  # the custom image you just built
    command: ["dask-worker", "tcp://scheduler:8786"]
    depends_on:
      - scheduler

In this example, the worker service uses the my-dask-worker image we just built, which includes dask-ml and scikit-learn.
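
With the docker-compose.yml above, bringing the cluster up and adjusting the number of workers is straightforward (a sketch; the service names match the compose file above):

# Start the scheduler and one worker in the background
docker-compose up -d

# Scale out to three workers
docker-compose up -d --scale worker=3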

4. Verify the Dependencies
Once the cluster is up, you can open the Dask web UI or use docker exec to enter a worker container and verify that dask-ml and scikit-learn are installed and usable.

docker exec -it <worker_container_id> bash

# Then verify inside the container:
python -c "import dask_ml; print(dask_ml.__version__)"
python -c "import sklearn; print(sklearn.__version__)"

Summary
By building a custom Docker image on top of the official Dask image and installing the required Python packages in it, you can make dask-ml and scikit-learn available on the Dask worker nodes. This approach lets you flexibly extend and customize the Dask cluster to meet specific data-processing and machine-learning needs.

Creating a Dask Cluster Dynamically with KubeCluster

# Set the KUBECONFIG environment variable
import os
os.environ['KUBECONFIG'] = '/home/jovyan/101_kubeconfig'

# Accessing K8s resources requires a kubeconfig file
# from kubernetes import client, config
# config.load_incluster_config()
# print("Before loading kube config")
# config.load_kube_config(config_file="/home/jovyan/magic_k8s_config-deeplearning")
# print("After loading kube config")

from dask_kubernetes.operator import KubeCluster
print("KubeCluster loading success")

# cluster.scale(10)
cluster = KubeCluster(
   name="kube-dask-cluster",
   image="magic-harbor.magic.com/jupyterhub/my-dask-worker:v1.0.0",
   namespace="jhub",
   resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "4Gi"}},
)                            
cluster.scale(3)

# Scale the workers up or down
# Scale down the cluster
# cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client

# Connect Dask to the cluster
client = Client(cluster)

# Finally delete the cluster by running
cluster.close()
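
Before tearing the cluster down, it is worth a quick sanity check that the workers can actually run tasks. A minimal sketch (run it between Client(cluster) and cluster.close() above):

import dask.array as da

# Small random array chunked across the workers; computing the mean exercises the cluster
x = da.random.random((10_000, 1_000), chunks=(1_000, 1_000))
print(x.mean().compute())

# The scheduler dashboard URL is useful for watching the task stream
print(client.dashboard_link)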

Contents of the 101_kubeconfig file:

apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FUR...QVRFLS0tLS0K
    server: https://192.168.1.248:8443
  name: yc-k8s
contexts:
- context:
    cluster: yc-k8s
    user: admin
  name: context-yc-k8s
current-context: context-yc-k8s
kind: Config
preferences: {}
users:
- name: admin
  user:
    client-certificate-data: LS0tLS1CRUdJTiBDRVJU...UJoTUNRMDR4RVRBUEJnTlZCQWdUQ0VoaGJtZGFhRzkxTVFzd0NRLQo=
    client-key-data: LS0tLS1CRUdJTiBSU0EgUFJ...SBLRVktLS0tLQo=
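
To confirm from the notebook environment that this kubeconfig actually grants access to the target namespace, a quick check along these lines can help (the path and namespace come from the examples above):

kubectl --kubeconfig /home/jovyan/101_kubeconfig get pods -n jhub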

Check the dynamically created Dask worker pods:

[root@192.168.1.101 ~]#kubectl get pods -A | grep dask
dask-operator            dask-kubernetes-operator-686bfb958f-kscjf                         1/1     Running                 0          30d
jhub                     kube-dask-cluster-default-worker-5d569caaeb-7cfb7884f8-4gjxm      1/1     Running                 0          19s
jhub                     kube-dask-cluster-default-worker-b6acb0fd1d-5b7fc7c9f4-l2skx      1/1     Running                 0          19s
jhub                     kube-dask-cluster-default-worker-e3fbb21383-cc58cf5d-9fbhk        1/1     Running                 0          19s
jhub                     kube-dask-cluster-scheduler-7fcff99b7d-7zxmh                      1/1     Running                 0          36s
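
The dask-kubernetes operator manages the cluster through a DaskCluster custom resource, so you can also inspect that resource directly (assuming the operator CRDs are installed, as the dask-operator pod above suggests):

kubectl get daskclusters -n jhub
kubectl describe daskcluster kube-dask-cluster -n jhub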

Debugging in Jupyter (screenshots omitted).

3. Hands-On Example

Distributed modeling example

Example 1: Create a Scikit-Learn Estimator

https://examples.dask.org/machine-learning.html

# Set the KUBECONFIG environment variable
import os
os.environ['KUBECONFIG'] = '/home/jovyan/101_kubeconfig'

# Create the distributed cluster
from dask_kubernetes.operator import KubeCluster
print("KubeCluster loading success")

# cluster.scale(10)
cluster = KubeCluster(
   name="kube-dask-cluster",
   image="magic-harbor.magic.com/jupyterhub/my-dask-worker:v1.0.0",
   namespace="jhub",
   resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "4Gi"}},
)                               
cluster.scale(3)

# Connect to the cluster
from dask.distributed import Client

# Connect Dask to the cluster
client = Client(cluster)

# Build the model
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
import pandas as pd

X, y = make_classification(n_samples=1000, random_state=0)
X[:5]

param_grid = {"C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}

grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           cv=3,
                           n_jobs=-1)

# To fit this normally (on the local machine), we would call:
# grid_search.fit(X, y)

# To fit it using the cluster, we just need to use a context manager provided by joblib.
import joblib
# Use the context manager provided by joblib and select Dask as the parallel backend. Dask is an open-source parallel computing framework that can run distributed tasks on a cluster.
# With joblib.parallel_backend('dask'), the grid-search tasks are distributed across the nodes of the Dask cluster and run in parallel, which speeds up model training and hyperparameter search. This is especially useful for large datasets and time-consuming training jobs, where it can significantly improve efficiency.
with joblib.parallel_backend('dask'):
    grid_search.fit(X, y)

pd.DataFrame(grid_search.cv_results_).head()        

grid_search.predict(X)[:5]

grid_search.score(X, y)
# 0.983
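
# You can also inspect the best hyperparameters and cross-validated score found by the search
print(grid_search.best_params_)
print(grid_search.best_score_)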

# Finally delete the cluster by running
client.close()
cluster.close()

Related articles:
Offline Deployment of a Dask Distributed Cluster and JupyterHub on K8s
Dask KubeCluster
