共计 7982 个字符,预计需要花费 20 分钟才能阅读完成。
这篇文章给大家分享的是有关 kubernetes 中 rolling update 机制的示例分析的内容。丸趣 TV 小编觉得挺实用的,因此分享给大家做个参考,一起跟随丸趣 TV 小编过来看看吧。
##0. 命令行和依赖的基础知识
Synopsis
Perform a rolling update of the given ReplicationController.
Replaces the specified controller with new controller, updating one pod at a time to use the
new PodTemplate. The new-controller.json must specify the same namespace as the
existing controller and overwrite at least one (common) label in its replicaSelector.
kubectl rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC
Examples
// Update pods of frontend-v1 using new controller data in frontend-v2.json.
$ kubectl rolling-update frontend-v1 -f frontend-v2.json
// Update pods of frontend-v1 using JSON data passed into stdin.
$ cat frontend-v2.json | kubectl rolling-update frontend-v1 -f -
ReplicationController,简称 rc,是 kubernet 体系中某一种类型 pod 的集合,rc 有一个关键参数叫做 replicas,也是就是 pod 的数量。
那么 rc 有什么用呢?这是为了解决在集群上一堆 pod 中有些如果挂了,那么就在别的宿主机上把容器启动起来,并让业务流量导入到正确启动的 pod 上。也就是说,rc 保证了集群服务的可用性,当你有很多个服务启动在一个集群中,你需要用程序去监控这些服务的运行状况,并动态保证服务可用。
rc 和 pod 的对应关系是怎么样的?rc 通过 selector 来选择一些 pod 作为他的控制范围。只要 pod 的标签(label)符合 seletor,则属于这个 rc,下面是 pod 和 rc 的示例。
xx-controller.json
spec :{
replicas :1,
selector :{
name : redis ,
role : master
},
xx-pod.json
labels : {
name : redis
},
kubernetes 被我们简称为 k8s,如果对其中的基础概念有兴趣可以看这篇
##1.kubctl 入口
/cmd/kubectl/kubctl.go
func main() {runtime.GOMAXPROCS(runtime.NumCPU())
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr)
if err := cmd.Execute(); err != nil {os.Exit(1)
}
##2. 实际调用
源代码在 pkg 包内,/pkg/kubectl/cmd/cmd.go,每个子命令都实现统一的接口,rollingupdate 这行是:
cmds.AddCommand(NewCmdRollingUpdate(f, out))
这个函数的实现在:/pkg/kubectl/cmd/rollingupdate.go
func NewCmdRollingUpdate(f *cmdutil.Factory, out io.Writer) *cobra.Command {
cmd := cobra.Command{
Use: rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC ,
// rollingupdate is deprecated.
Aliases: []string{ rollingupdate},
Short: Perform a rolling update of the given ReplicationController. ,
Long: rollingUpdate_long,
Example: rollingUpdate_example,
Run: func(cmd *cobra.Command, args []string) {err := RunRollingUpdate(f, out, cmd, args)
cmdutil.CheckErr(err)
}
可以看到实际调用时的执行函数是 RunRollingUpdate,算是进入正题了
func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string) error {mapper, typer := f.Object()
// TODO: use resource.Builder instead
obj, err := resource.NewBuilder(mapper, typer, f.ClientMapperForCommand()).
NamespaceParam(cmdNamespace).RequireNamespace().
FilenameParam(filename).
Do().
Object()
if err != nil {
return err
newRc, ok := obj.(*api.ReplicationController)
if !ok {return cmdutil.UsageError(cmd, %s does not specify a valid ReplicationController , filename)
}
这是建立一个新的 rc 的代码,其中 resource 是 kubneter 所有资源 (pod,service,rc) 的基类。可以看到新的 rc 从 json 参数文件中获取所有信息,然后转义为 ReplicationController 这个类。
if oldName == newName {
return cmdutil.UsageError(cmd, %s cannot have the same name as the existing ReplicationController %s ,
filename, oldName)
var hasLabel bool
for key, oldValue := range oldRc.Spec.Selector {if newValue, ok := newRc.Spec.Selector[key]; ok newValue != oldValue {
hasLabel = true
break
if !hasLabel {
return cmdutil.UsageError(cmd, %s must specify a matching key with non-equal value in Selector for %s ,
filename, oldName)
}
这里可以看到,对于新的 rc 和旧的 rc,有 2 项限制,一个是新旧名字需要不同,另一个是 rc 的 selector 中需要至少有一项的值不一样。
updater := kubectl.NewRollingUpdater(newRc.Namespace, client)
// fetch rc
oldRc, err := client.ReplicationControllers(newRc.Namespace).Get(oldName)
if err != nil {
return err
err = updater.Update(out, oldRc, newRc, period, interval, timeout)
if err != nil {return err}
在做 rolling update 的时候,有两个条件限制,一个是新的 rc 的名字需要和旧的不一样,第二是至少有个一个标签的值不一样。其中 namespace 是 k8s 用来做多租户资源隔离的,可以先忽略不计。
##3. 数据结构和实现
这段代码出现了 NewRollingUpdater,是在上一层的 /pkg/kubectl/rollingupdate.go 这个文件中,更加接近主体了
// RollingUpdater provides methods for updating replicated pods in a predictable,
// fault-tolerant way.
type RollingUpdater struct {
// Client interface for creating and updating controllers
c client.Interface
// Namespace for resources
ns string
}
可以看到这里的 RollingUpdater 里面是一个 k8s 的 client 的结构来向 api server 发送命令
func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error {
oldName := oldRc.ObjectMeta.Name
newName := newRc.ObjectMeta.Name
retry := RetryParams{interval, timeout}
waitForReplicas := RetryParams{interval, timeout}
if newRc.Spec.Replicas = 0 {return fmt.Errorf( Invalid controller spec for %s; required: 0 replicas, actual: %s\n , newName, newRc.Spec)
desired := newRc.Spec.Replicas
sourceId := fmt.Sprintf(%s:%s , oldName, oldRc.ObjectMeta.UID)
// look for existing newRc, incase this update was previously started but interrupted
rc, existing, err := r.getExistingNewRc(sourceId, newName)
if existing {fmt.Fprintf(out, Continuing update with existing controller %s.\n , newName)
if err != nil {
return err
replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation]
desired, err = strconv.Atoi(replicas)
if err != nil {
return fmt.Errorf( Unable to parse annotation for %s: %s=%s ,
newName, desiredReplicasAnnotation, replicas)
newRc = rc
} else {fmt.Fprintf(out, Creating %s\n , newName)
if newRc.ObjectMeta.Annotations == nil {newRc.ObjectMeta.Annotations = map[string]string{}
newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf(%d , desired)
newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId
newRc.Spec.Replicas = 0
newRc, err = r.c.ReplicationControllers(r.ns).Create(newRc)
if err != nil {
return err
// +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas
for newRc.Spec.Replicas desired oldRc.Spec.Replicas != 0 {
newRc.Spec.Replicas += 1
oldRc.Spec.Replicas -= 1
fmt.Printf( At beginning of loop: %s replicas: %d, %s replicas: %d\n ,
oldName, oldRc.Spec.Replicas,
newName, newRc.Spec.Replicas)
fmt.Fprintf(out, Updating %s replicas: %d, %s replicas: %d\n ,
oldName, oldRc.Spec.Replicas,
newName, newRc.Spec.Replicas)
newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas)
if err != nil {
return err
time.Sleep(updatePeriod)
oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas)
if err != nil {
return err
fmt.Printf( At end of loop: %s replicas: %d, %s replicas: %d\n ,
oldName, oldRc.Spec.Replicas,
newName, newRc.Spec.Replicas)
// delete remaining replicas on oldRc
if oldRc.Spec.Replicas != 0 {
fmt.Fprintf(out, Stopping %s replicas: %d - %d\n ,
oldName, oldRc.Spec.Replicas, 0)
oldRc.Spec.Replicas = 0
oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas)
// oldRc, err = r.resizeAndWait(oldRc, interval, timeout)
if err != nil {
return err
// add remaining replicas on newRc
if newRc.Spec.Replicas != desired {
fmt.Fprintf(out, Resizing %s replicas: %d - %d\n ,
newName, newRc.Spec.Replicas, desired)
newRc.Spec.Replicas = desired
newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas)
if err != nil {
return err
// Clean up annotations
if newRc, err = r.c.ReplicationControllers(r.ns).Get(newName); err != nil {
return err
delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation)
delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation)
newRc, err = r.updateAndWait(newRc, interval, timeout)
if err != nil {
return err
// delete old rc
fmt.Fprintf(out, Update succeeded. Deleting %s\n , oldName)
return r.c.ReplicationControllers(r.ns).Delete(oldName)
}
这段代码很长,但做的事情很简单:
如果新的 rc 没有被创建,就先创一下,如果已经创建了(在上次的 rolling_update 中创建了但超时了)
用几个循环,把新的 rc 的 replicas 增加上去,旧的 rc 的 replicas 降低下来,主要调用的函数是 resizeAndWait 和 updateAndWait
##4. 底层调用
接上一节的 resizeAndWait,代码在 /pkg/kubectl/resize.go,这里的具体代码就不贴了 其余的所有调用都发生 /pkg/client 这个目录下,这是一个 http/json 的 client,主要功能就是向 api-server 发送请求 整体来说,上面的 wait 的实现都是比较土的,就是发一个 update 请求过去,后面轮询的调用 get 来检测状态是否符合最终需要的状态。
##5. 总结
先说一下这三个时间参数的作用:
update-period:新 rc 增加一个 pod 后,等待这个 period,然后从旧 rc 缩减一个 pod poll-interval:这个函数名来源于 linux 上的 poll 调用,就是每过一个 poll-interval,向服务端发起请求,直到这个请求成功或者报失败 timeout:总操作的超时时间
rolling update 主要是客户端这边实现的,分析完了,但还是有一些未知的问题,例如:
api-server, cadvisor, kubelet, proxy, etcd 这些服务端组件是怎么交互的?怎么保证在服务一直可用的情况下增减 pod?
是否有可能在 pod 增减的时候插入自己的一些代码或者过程?因为我们目前的架构中没有使用 k8s 的 proxy,需要自己去调用负载均衡的系统给这些 pod 导流量
对于具体的 pod,我们怎么去做内部程序的健康检查?在业务不可用的情况下向 k8s 系统发送消息,干掉这个 pod,在别的机器上创建新的来替代。
感谢各位的阅读!关于“kubernetes 中 rolling update 机制的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!