如何使用Mars Remote API执行Python函数

90次阅读
没有评论

共计 9414 个字符,预计需要花费 24 分钟才能阅读完成。

本篇内容介绍了“如何使用 Mars Remote API 执行 Python 函数”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机加速。这其中,并行和分布式 Python 函数主要利用 Mars Remote API。

启动 Mars 分布式环境可以参考:

命令行方式在集群中部署。

Kubernetes 中部署。

MaxCompute 开箱即用的环境,购买了 MaxCompute 服务的可以直接使用。

如何使用 Mars Remote API

使用 Mars Remote API 非常简单,只需要对原有的代码做少许改动,就可以分布式执行。

拿用蒙特卡洛方法计算 π 为例。代码如下,我们编写了两个函数,calc_chunk  用来计算每个分片内落在圆内的点的个数,calc_pi  用来把多个分片  calc_chunk  计算的结果汇总最后得出 π 值。

from typing import Listimport numpy as npdef calc_chunk(n: int, i: int):#  计算 n 个随机点(x 和 y 轴落在 - 1 到 1 之间)到原点距离小于 1 的点的个数 rs = np.random.RandomState(i)
 a = rs.uniform(-1, 1, size=(n, 2))
 d = np.linalg.norm(a, axis=1)return (d   1).sum()def calc_pi(fs: List[int], N: int):#  将若干次  calc_chunk  计算的结果汇总,计算  pi  的值 return sum(fs) * 4 / N
N = 200_000_000
n = 10_000_000
fs = [calc_chunk(n, i) for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)

%%time  下可以看到结果:

3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s

在单机需要 12.3 s。

要让这个计算使用 Mars Remote API 并行起来,我们不需要对函数做任何改动,需要变动的仅仅是最后部分。

import mars.remote as mr#  函数调用改成  mars.remote.spawnfs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)]#  把  spawn  的列表传入作为参数,再  spawn  新的函数 pi = mr.spawn(calc_pi, args=(fs, N))#  通过  execute()  触发执行,fetch()  获取结果 print(pi.execute().fetch())

%%time  下看到结果:

3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s

结果一模一样,但是却有数倍的性能提升。

可以看到,对已有的 Python 代码,Mars remote API 几乎不需要做多少改动,就能有效并行和分布式来加速执行过程。

一个例子

为了让读者理解 Mars Remote API 的作用,我们从另一个例子开始。现在我们有一个数据集,我们希望对它们做一个分类任务。要做分类,我们有很多算法和库可以选择,这里我们用 RandomForest、LogisticRegression,以及 XGBoost。

困难的地方是,除了有多个模型选择,这些模型也会包含多个超参,那哪个超参效果最好呢?对于调参不那么有经验的同学,跑过了才知道。所以,我们希望能生成一堆可选的超参,然后把他们都跑一遍,看看效果。

准备数据

这个例子里我们使用  otto 数据集。

首先,我们准备数据。读取数据后,我们按 2:1 的比例把数据分成训练集和测试集。

import pandas as pdfrom sklearn.preprocessing import LabelEncoderfrom sklearn.model_selection import train_test_splitdef gen_data():df = pd.read_csv( otto/train.csv)
 
 X = df.drop([target ,  id], axis=1)
 y = df[target]
 
 label_encoder = LabelEncoder()
 label_encoder.fit(y)
 y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123)
X_train, X_test, y_train, y_test = gen_data()

模型

接着,我们使用 scikit-learn 的 RandomForest 和 LogisticRegression 来处理分类。

RandomForest:

from sklearn.ensemble import RandomForestClassifierdef random_forest(X_train: pd.DataFrame, 
 y_train: pd.Series, 
 verbose: bool = False,
 **kw):model = RandomForestClassifier(verbose=verbose, **kw)
 model.fit(X_train, y_train)return model

接着,我们生成供 RandomForest 使用的超参,我们用 yield 的方式来迭代返回。

def gen_random_forest_parameters():for n_estimators in [50, 100, 600]:for max_depth in [None, 3, 15]:for criterion in [gini ,  entropy]:yield { n_estimators : n_estimators, max_depth : max_depth, criterion : criterion
 }

LogisticRegression 也是这个过程。我们先定义模型。

from sklearn.linear_model import LogisticRegressiondef logistic_regression(X_train: pd.DataFrame,
 y_train: pd.Series,
 verbose: bool = False,
 **kw):model = LogisticRegression(verbose=verbose, **kw)
 model.fit(X_train, y_train)return model

接着生成供 LogisticRegression 使用的超参。

def gen_lr_parameters():for penalty in [ l2 ,  none]:for tol in [0.1, 0.01, 1e-4]:yield { penalty : penalty, tol : tol
 }

XGBoost 也是一样,我们用  XGBClassifier  来执行分类任务。

from xgboost import XGBClassifierdef xgb(X_train: pd.DataFrame,
 y_train: pd.Series,
 verbose: bool = False,
 **kw):model = XGBClassifier(verbosity=int(verbose), **kw)
 model.fit(X_train, y_train)return model

生成一系列超参。

def gen_xgb_parameters():for n_estimators in [100, 600]:for criterion in [gini ,  entropy]:for learning_rate in [0.001, 0.1, 0.5]:yield { n_estimators : n_estimators, criterion : criterion, learning_rate : learning_rate
 }

验证

接着我们编写验证逻辑,这里我们使用  log_loss  来作为评价函数。

from sklearn.metrics import log_lossdef metric_model(model, 
 X_test: pd.DataFrame,
 y_test: pd.Series) -  float:if isinstance(model, bytes):
 model = pickle.loads(model)
 y_pred = model.predict_proba(X_test)return log_loss(y_test, y_pred)def train_and_metric(train_func,
 train_params: dict,
 X_train: pd.DataFrame, 
 y_train: pd.Series, 
 X_test: pd.DataFrame, 
 y_test: pd.Series,
 verbose: bool = False
 ):#  把训练和验证封装到一起 model = train_func(X_train, y_train, verbose=verbose, **train_params)
 metric = metric_model(model, X_test, y_test)return model, metric

找出最好的模型

做好准备工作后,我们就开始来跑模型了。针对每个模型,我们把每次生成的超参们送进去训练,除了这些超参,我们还把  n_jobs  设成 -1,这样能更好利用单机的多核。

results = []# -------------# Random Forest# -------------for params in gen_random_forest_parameters():
 print(f calculating on {params} )# fixed random_stateparams[random_state] = 123# use all CPU coresparams[n_jobs] = -1model, metric = train_and_metric(random_forest, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({model : model,  metric : metric}) 
# -------------------# Logistic Regression# -------------------for params in gen_lr_parameters():
 print(f calculating on {params} )# fixed random_stateparams[random_state] = 123# use all CPU coresparams[n_jobs] = -1model, metric = train_and_metric(logistic_regression, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({model : model,  metric : metric}) 
# -------# XGBoost# -------for params in gen_xgb_parameters():
 print(f calculating on {params} )# fixed random_stateparams[random_state] = 123# use all CPU coresparams[n_jobs] = -1model, metric = train_and_metric(xgb, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({model : model,  metric : metric})

运行一下,需要相当长时间,我们省略掉一部分输出内容。

calculating on {n_estimators : 50,  max_depth : None,  criterion :  gini}
metric: 0.6964123781828575calculating on {n_estimators : 50,  max_depth : None,  criterion :  entropy}
metric: 0.6912312790832288#  省略其他模型的输出结果 CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s

从 CPU 时间和 Wall 时间,能看出来这些训练还是充分利用了多核的性能。但整个过程还是花费了 31 分钟。

使用 Remote API 分布式加速

现在我们尝试使用 Remote API 通过分布式方式加速整个过程。

集群方面,我们使用最开始说的第三种方式,直接在 MaxCompute 上拉起一个集群。大家可以选择其他方式,效果是一样的。

n_cores = 8mem = 2 * n_cores # 16G# o  是  MaxCompute  入口,这里创建  10  个  worker  的集群,每个  worker 8 核 16Gcluster = o.create_mars_cluster(10, n_cores, mem, image= extended)

为了方便在分布式读取数据,我们对数据处理稍作改动,把数据上传到 MaxCompute 资源。对于其他环境,用户可以考虑 HDFS、Aliyun OSS 或者 Amazon S3 等存储。

if not o.exist_resource(otto_train.csv):with open(otto/train.csv) as f:#  上传资源 o.create_resource(otto_train.csv ,  file , fileobj=f) 
def gen_data():#  改成从资源读取 df = pd.read_csv(o.open_resource( otto_train.csv))
 
 X = df.drop([target ,  id], axis=1)
 y = df[target]
 
 label_encoder = LabelEncoder()
 label_encoder.fit(y)
 y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123)

稍作改动之后,我们使用  mars.remote.spawn  方法来让  gen_data  调度到集群上运行。

import mars.remote as mr# n_output  说明是  4  输出 # execute()  执行后,数据会读取到  Mars  集群内部 data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()# remote_  开头的都是  Mars  对象,这时候数据在集群内,这些对象只是引用 remote_X_train, remote_X_test, remote_y_train, remote_y_test = data

目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,还不能序列化模型,所以,我们要对  train_and_metric  稍作改动,把模型 pickle 了之后再返回。

def distributed_train_and_metric(train_func,
 train_params: dict,
 X_train: pd.DataFrame, 
 y_train: pd.Series, 
 X_test: pd.DataFrame, 
 y_test: pd.Series,
 verbose: bool = False
 ):model, metric = train_and_metric(train_func, train_params,
 X_train, y_train, 
 X_test, y_test, verbose=verbose)return pickle.dumps(model), metric

后续 Mars 支持了序列化模型后可以直接 spawn 原本的函数。

接着我们就对前面的执行过程稍作改动,把函数调用全部都用  mars.remote.spawn  来改写。

import numpy as np
tasks = []
models = []
metrics = []# -------------# Random Forest# -------------for params in gen_random_forest_parameters():# fixed random_stateparams[random_state] = 123task = mr.spawn(distributed_train_and_metric,
 args=(random_forest, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2)
 tasks.extend(task)#  把模型和评价分别存储 models.append(task[0])
 metrics.append(task[1]) 
 
# -------------------# Logistic Regression# -------------------for params in gen_lr_parameters():# fixed random_stateparams[ random_state] = 123task = mr.spawn(distributed_train_and_metric,
 args=(logistic_regression, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2)
 tasks.extend(task)#  把模型和评价分别存储 models.append(task[0])
 metrics.append(task[1])# -------# XGBoost# -------for params in gen_xgb_parameters():# fixed random_stateparams[ random_state] = 123#  再指定并发为核的个数 params[n_jobs] = n_cores
 task = mr.spawn(distributed_train_and_metric,
 args=(xgb, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2)
 tasks.extend(task)#  把模型和评价分别存储 models.append(task[0])
 metrics.append(task[1])#  把顺序打乱,目的是能分散到  worker  上平均一点 shuffled_tasks = np.random.permutation(tasks)
_ = mr.ExecutableTuple(shuffled_tasks).execute()

可以看到代码几乎一致。

运行查看结果:

CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s

时间一下子从 31 分钟多来到了 2 分钟,提升 15x+。但代码修改的代价可以忽略不计。

细心的读者可能注意到了,分布式运行的代码中,我们把模型的 verbose 给打开了,在分布式环境下,因为这些函数远程执行,打印的内容只会输出到 worker 的标准输出流,我们在客户端不会看到打印的结果,但 Mars 提供了一个非常有用的接口来让我们查看每个模型运行时的输出。

以第 0 个模型为例,我们可以在 Mars 对象上直接调用  fetch_log  方法。

print(models[0].fetch_log())

输出我们简略一部分。

building tree 1 of 50building tree 2 of 50building tree 3 of 50building tree 4 of 50building tree 5 of 50building tree 6 of 50#  中间省略 building tree 49 of 50building tree 50 of 50

要看哪个模型都可以通过这种方式。试想下,如果没有  fetch_log API,你确想看中间过程的输出有多麻烦。首先这个函数在哪个 worker 上执行,不得而知;然后,即便知道是哪个 worker,因为每个 worker 上可能有多个函数执行,这些输出就可能混杂在一起,甚至被庞大日志淹没了。fetch_log  接口让用户不需要关心在哪个 worker 上执行,也不用担心日志混合在一起。

想要了解  fetch_log  接口,可以查看   文档。

“如何使用 Mars Remote API 执行 Python 函数”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-25发表,共计9414字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)