Offline Deployment of a Dask Distributed Cluster and JupyterHub on K8s (Part 2)
Part 1: Running Dask-ML in Jupyter
First, install the client-side libraries in the notebook:
!pip install --upgrade dask-ml dask scikit-learn
1. Set up the Dask client:
Start by connecting to the Dask cluster; make sure you use the correct scheduler address (simple-scheduler.jhub.svc.cluster.local:8786 in this setup).
2. Generate sample data:
For demonstration purposes, let's generate some synthetic data. You can replace this with your actual dataset.
3. Split the data for training:
Dask-ML provides a drop-in train_test_split that works directly on Dask arrays, so the split stays lazy and distributed rather than pulling data back to the client.
4. Train a model:
Let's train a simple model with Dask-ML; here, a logistic regression classifier.
Run the distributed task:
from dask.distributed import Client
# Connect to the Dask cluster
client = Client("simple-scheduler.jhub.svc.cluster.local:8786")
# --- Generate sample data ---
from sklearn.datasets import make_classification
import dask.array as da
# Generate synthetic dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
# Convert data to Dask arrays
X_dask = da.from_array(X, chunks=100)
y_dask = da.from_array(y, chunks=100)
# --- Split, train, and evaluate ---
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X_dask, y_dask, random_state=42)
# Initialize the logistic regression model
model = LogisticRegression()
# Fit the model
model.fit(X_train, y_train)
# Predict on the test set
y_pred = model.predict(X_test)
# Calculate accuracy
accuracy = accuracy_score(y_test.compute(), y_pred.compute())
print(f"Accuracy: {accuracy}")
client.close()
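As an aside, dask_ml.metrics also provides an accuracy_score that accepts Dask arrays directly, so the explicit compute() calls above can be avoided; a minimal sketch reusing the variables from the listing:
from dask_ml.metrics import accuracy_score as dask_accuracy_score

# Accepts Dask arrays and triggers the computation on the cluster
print(dask_accuracy_score(y_test, y_pred))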
Part 2: Building a New Environment Image
Problem:
The Dask worker nodes lack the required dependencies. What is a good way to add the dask-ml and scikit-learn libraries on top of the official worker image ghcr.io/dask/dask:latest?
You can do this by building a custom Docker image based on the official one. The steps are as follows:
1. Write a Dockerfile
First, create a file named Dockerfile locally; it will be used to build the custom Dask worker image.
# Build on top of the official Dask image
FROM ghcr.io/dask/dask:latest

# Install the required Python packages into the image
RUN pip install \
    dask-ml \
    scikit-learn \
    xgboost \
    dask-xgboost

# Add any other custom configuration or dependency installation steps here

# Set the entrypoint (optional)
CMD ["dask-worker", "tcp://scheduler:8786"]
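Since this series targets offline deployment, note that the RUN pip install above needs network access at build time. A minimal offline variant, assuming you first run pip download dask-ml scikit-learn xgboost -d ./wheels on a machine with internet access and copy the wheels/ directory next to the Dockerfile:
# Build on top of the official Dask image, installing from local wheels only
FROM ghcr.io/dask/dask:latest
COPY wheels /tmp/wheels
RUN pip install --no-index --find-links=/tmp/wheels dask-ml scikit-learn xgboost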
2. Build the custom image
Run the following in the directory containing the Dockerfile to build the new image, then tag and push it to your registry:
# Build
docker build -t my-dask-worker:v1.0.0 -f Dockerfile .
# Tag for the private registry
docker tag my-dask-worker:v1.0.0 magic-harbor.magic.com/jupyterhub/my-dask-worker:v1.0.0
# Push
docker push magic-harbor.magic.com/jupyterhub/my-dask-worker:v1.0.0
Here my-dask-worker:v1.0.0 is the name and tag of the new image. Make sure you are logged in to Docker Hub or your private registry (docker login) before pushing, so the push succeeds.
3. Start the Dask cluster
Start the Dask cluster with the custom image, using docker-compose or whatever method fits your environment:
# Example docker-compose.yml
version: '3'
services:
  scheduler:
    image: ghcr.io/dask/dask:latest
    command: ["dask-scheduler"]
    ports:
      - "8786:8786"
      - "8787:8787"
  worker:
    image: my-dask-worker:v1.0.0  # the custom image built above
    command: ["dask-worker", "tcp://scheduler:8786"]
    depends_on:
      - scheduler
In this example the worker service uses the my-dask-worker image we just built, which includes dask-ml and scikit-learn.
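For quick tests you can even skip building a custom image: the official Dask image installs anything listed in the EXTRA_PIP_PACKAGES environment variable when the container starts, at the cost of startup time and a network dependency, so it is less suitable for truly offline environments. A sketch of the worker service only:
  worker:
    image: ghcr.io/dask/dask:latest
    command: ["dask-worker", "tcp://scheduler:8786"]
    environment:
      # Installed by the image entrypoint at container start
      - EXTRA_PIP_PACKAGES=dask-ml scikit-learn
    depends_on:
      - scheduler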
4. Verify the installed dependencies
Once the cluster is up, open the Dask web UI or use docker exec to enter a worker container and confirm that dask-ml and scikit-learn are installed and importable:
docker exec -it <worker_container_id> bash
# then, inside the container:
python -c "import dask_ml; print(dask_ml.__version__)"
python -c "import sklearn; print(sklearn.__version__)"
Summary
By building a custom Docker image on top of the official Dask image and installing the required Python packages in it, you add the dask-ml and scikit-learn dependencies to every Dask worker node. This approach lets you flexibly extend and customize the cluster for specific data-processing and machine-learning needs.
Dynamically Creating a Dask Cluster with KubeCluster
# Set the KUBECONFIG environment variable
import os
os.environ['KUBECONFIG'] = '/home/jovyan/101_kubeconfig'

# Accessing K8s resources requires a kubeconfig file
# from kubernetes import client, config
# config.load_incluster_config()
# print("Before loading kube config")
# config.load_kube_config(config_file="/home/jovyan/magic_k8s_config-deeplearning")
# print("After loading kube config")

from dask_kubernetes.operator import KubeCluster
print("KubeCluster loading success")

cluster = KubeCluster(
    name="kube-dask-cluster",
    image="magic-harbor.magic.com/jupyterhub/my-dask-worker:v1.0.0",
    namespace="jhub",
    resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "4Gi"}},
)
cluster.scale(3)

# Scale workers up or down as needed, e.g.
# cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Finally, shut down the client and delete the cluster
client.close()
cluster.close()
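Besides the manual scale() calls, KubeCluster also supports adaptive scaling, where the scheduler requests and releases workers based on load; a one-line sketch:
# Let the scheduler scale between 1 and 10 workers as load changes
cluster.adapt(minimum=1, maximum=10)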
Contents of the 101_kubeconfig file:
apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FUR...QVRFLS0tLS0K
    server: https://192.168.1.248:8443
  name: yc-k8s
contexts:
- context:
    cluster: yc-k8s
    user: admin
  name: context-yc-k8s
current-context: context-yc-k8s
kind: Config
preferences: {}
users:
- name: admin
  user:
    client-certificate-data: LS0tLS1CRUdJTiBDRVJU...UJoTUNRMDR4RVRBUEJnTlZCQWdUQ0VoaGJtZGFhRzkxTVFzd0NRLQo=
    client-key-data: LS0tLS1CRUdJTiBSU0EgUFJ...SBLRVktLS0tLQo=
Check the dynamically created Dask worker pods:
[root@192.168.1.101 ~]# kubectl get pods -A | grep dask
dask-operator dask-kubernetes-operator-686bfb958f-kscjf 1/1 Running 0 30d
jhub kube-dask-cluster-default-worker-5d569caaeb-7cfb7884f8-4gjxm 1/1 Running 0 19s
jhub kube-dask-cluster-default-worker-b6acb0fd1d-5b7fc7c9f4-l2skx 1/1 Running 0 19s
jhub kube-dask-cluster-default-worker-e3fbb21383-cc58cf5d-9fbhk 1/1 Running 0 19s
jhub kube-dask-cluster-scheduler-7fcff99b7d-7zxmh 1/1 Running 0 36s
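The operator also creates a Service for the scheduler. To reach the dashboard from outside the cluster you can port-forward it; the Service name below is inferred from the pod names above, so verify it with the first command:
kubectl get svc -n jhub | grep dask
kubectl port-forward -n jhub svc/kube-dask-cluster-scheduler 8787:8787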
Debugging in Jupyter:
Part 3: Hands-On
Distributed modeling examples
Example 1: Create a Scikit-Learn Estimator
https://examples.dask.org/machine-learning.html
# Set the KUBECONFIG environment variable
import os
os.environ['KUBECONFIG'] = '/home/jovyan/101_kubeconfig'

# Create the distributed cluster
from dask_kubernetes.operator import KubeCluster
print("KubeCluster loading success")

cluster = KubeCluster(
    name="kube-dask-cluster",
    image="magic-harbor.magic.com/jupyterhub/my-dask-worker:v1.0.0",
    namespace="jhub",
    resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "4Gi"}},
)
cluster.scale(3)
# Connect to the cluster
from dask.distributed import Client
# Connect Dask to the cluster
client = Client(cluster)
# Modeling
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
import pandas as pd
X, y = make_classification(n_samples=1000, random_state=0)
X[:5]
param_grid = {
    "C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
    "kernel": ['rbf', 'poly', 'sigmoid'],
    "shrinking": [True, False],
}
grid_search = GridSearchCV(
    SVC(gamma='auto', random_state=0, probability=True),
    param_grid=param_grid,
    return_train_score=False,
    cv=3,
    n_jobs=-1,
)
# To fit that normally, we would call
grid_search.fit(X, y)
# To fit it using the cluster, we just need to use a context manager provided by joblib.
import joblib
# joblib provides a context manager for selecting the parallelization backend;
# here we choose Dask, so the tasks run on the distributed cluster.
# With joblib.parallel_backend('dask'), the grid-search fits are distributed
# across the cluster's workers in parallel, which can substantially speed up
# training and hyper-parameter search for large or expensive workloads.
with joblib.parallel_backend('dask'):
    grid_search.fit(X, y)
pd.DataFrame(grid_search.cv_results_).head()
grid_search.predict(X)[:5]
grid_search.score(X, y)
# 0.983
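# Inspect the best hyper-parameters and the cross-validated score
# (standard scikit-learn attributes, available after fit)
print(grid_search.best_params_)
print(grid_search.best_score_)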
# Finally delete the cluster by running
client.close()
cluster.close()
Related posts:
Offline Deployment of a Dask Distributed Cluster and JupyterHub on K8s
Dask KubeCluster