怎样实现kubeproxy源码分析

96次阅读
没有评论

共计 13092 个字符,预计需要花费 33 分钟才能阅读完成。

本篇文章给大家分享的是有关怎样实现 kubeproxy 源码分析,丸趣 TV 小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着丸趣 TV 小编一起来看看吧。

kubernetes 离线安装包

kube-proxy 源码解析

ipvs 相对于 iptables 模式具备较高的性能与稳定性, 本文讲以此模式的源码解析为主,如果想去了解 iptables 模式的原理,可以去参考其实现,架构上无差别。

kube-proxy 主要功能是监听 service 和 endpoint 的事件,然后下放代理策略到机器上。底层调用 docker/libnetwork, 而 libnetwork 最终调用了 netlink 与 netns 来实现 ipvs 的创建等动作

!–more–

初始化配置

代码入口:cmd/kube-proxy/app/server.go Run() 函数

通过命令行参数去初始化 proxyServer 的配置

proxyServer, err := NewProxyServer(o)
type ProxyServer struct {
 // k8s client
 Client clientset.Interface
 EventClient v1core.EventsGetter
 // ipvs  相关接口
 IptInterface utiliptables.Interface
 IpvsInterface utilipvs.Interface
 IpsetInterface utilipset.Interface
 //  处理同步时的处理器
 Proxier proxy.ProxyProvider
 //  代理模式,ipvs iptables userspace kernelspace(windows)四种
 ProxyMode string
 //  配置同步周期
 ConfigSyncPeriod time.Duration
 // service  与  endpoint  事件处理器
 ServiceEventHandler config.ServiceHandler
 EndpointsEventHandler config.EndpointsHandler
}

Proxier 是主要入口,抽象了两个函数:

type ProxyProvider interface {
 // Sync immediately synchronizes the ProxyProvider s current state to iptables.
 Sync()
 //  定期执行
 SyncLoop()}

ipvs 的 interface 这个很重要:

type Interface interface {
 //  删除所有规则
 Flush() error
 //  增加一个 virtual server
 AddVirtualServer(*VirtualServer) error
 UpdateVirtualServer(*VirtualServer) error
 DeleteVirtualServer(*VirtualServer) error
 GetVirtualServer(*VirtualServer) (*VirtualServer, error)
 GetVirtualServers() ([]*VirtualServer, error)
 //  给 virtual server 加个 realserver,  如  VirtualServer 就是一个 clusterip realServer 就是 pod(或者自定义的 endpoint)
 AddRealServer(*VirtualServer, *RealServer) error
 GetRealServers(*VirtualServer) ([]*RealServer, error)
 DeleteRealServer(*VirtualServer, *RealServer) error
}

我们在下文再详细看 ipvs_linux 是如何实现上面接口的

virtual server 与 realserver, 最重要的是 ip:port,然后就是一些代理的模式如 sessionAffinity 等:

type VirtualServer struct {
 Address net.IP
 Protocol string
 Port uint16
 Scheduler string
 Flags ServiceFlags
 Timeout uint32
type RealServer struct {
 Address net.IP
 Port uint16
 Weight int
}

创建 apiserver client

client, eventClient, err := createClients(config.ClientConnection, master)

创建 Proxier 这是仅仅关注 ipvs 模式的 proxier

else if proxyMode == proxyModeIPVS {glog.V(0).Info(Using ipvs Proxier.)
 proxierIPVS, err := ipvs.NewProxier(
 iptInterface,
 ipvsInterface,
 ipsetInterface,
 utilsysctl.New(),
 execer,
 config.IPVS.SyncPeriod.Duration,
 config.IPVS.MinSyncPeriod.Duration,
 config.IPTables.MasqueradeAll,
 int(*config.IPTables.MasqueradeBit),
 config.ClusterCIDR,
 hostname,
 getNodeIP(client, hostname),
 recorder,
 healthzServer,
 config.IPVS.Scheduler,
 proxier = proxierIPVS
serviceEventHandler = proxierIPVS
 endpointsEventHandler = proxierIPVS

这个 Proxier 具备以下方法:

 +OnEndpointsAdd(endpoints *api.Endpoints)
 +OnEndpointsDelete(endpoints *api.Endpoints)
 +OnEndpointsSynced()
 +OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
 +OnServiceAdd(service *api.Service)
 +OnServiceDelete(service *api.Service)
 +OnServiceSynced()
 +OnServiceUpdate(oldService, service *api.Service)
 +Sync()
 +SyncLoop()

所以 ipvs 的这个 Proxier 实现了我们需要的绝大部分接口

小结一下:

 +-----------  endpointHandler
 |
 +-----------  serviceHandler
 | ^
 | | +-------------  sync  定期同步等
 | | |
ProxyServer---------  Proxier --------  service  事件回调  
 | | 
 | +-------------  endpoint 事件回调  
 | |  触发
 +-----  ipvs interface ipvs handler  -----+

启动 proxyServer

检查是不是带了 clean up 参数,如果带了那么清除所有规则退出

OOM adjuster 貌似没实现,忽略

resouceContainer 也没实现,忽略

启动 metrics 服务器,这个挺重要,比如我们想监控时可以传入这个参数, 包含 promethus 的 metrics. metrics-bind-address 参数

启动 informer, 开始监听事件,分别启动协程处理。

1 2 3 4 我们都不用太关注,细看 5 即可:

informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)
serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
//  注册  service handler 并启动
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
//  这里面仅仅是把 ServiceEventHandler 赋值给 informer 回调  
go serviceConfig.Run(wait.NeverStop)
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
//  注册 endpoint 
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)
go informerFactory.Start(wait.NeverStop)

serviceConfig.Run 与 endpointConfig.Run 仅仅是给回调函数赋值, 所以注册的 handler 就给了 informer, informer 监听到事件时就会回调:

for i := range c.eventHandlers {glog.V(3).Infof(Calling handler.OnServiceSynced() )
 c.eventHandlers[i].OnServiceSynced()}

那么问题来了,注册进去的这个 handler 是啥?回顾一下上文的

serviceEventHandler = proxierIPVS
 endpointsEventHandler = proxierIPVS

所以都是这个 proxierIPVS

handler 的回调函数, informer 会回调这几个函数,所以我们在自己开发时实现这个 interface 注册进去即可:

type ServiceHandler interface {
 // OnServiceAdd is called whenever creation of new service object
 // is observed.
 OnServiceAdd(service *api.Service)
 // OnServiceUpdate is called whenever modification of an existing
 // service object is observed.
 OnServiceUpdate(oldService, service *api.Service)
 // OnServiceDelete is called whenever deletion of an existing service
 // object is observed.
 OnServiceDelete(service *api.Service)
 // OnServiceSynced is called once all the initial even handlers were
 // called and the state is fully propagated to local cache.
 OnServiceSynced()}

开始监听

go informerFactory.Start(wait.NeverStop)

这里执行后,我们创建删除 service endpoint 等动作都会被监听到,然后回调, 回顾一下上面的图,最终都是由 Proxier 去实现,所以后面我们重点关注 Proxier 即可

s.Proxier.SyncLoop()

然后开始 SyncLoop, 下文开讲

Proxier 实现

我们创建一个 service 时 OnServiceAdd 方法会被调用, 这里记录一下之前的状态与当前状态两个东西,然后发个信号给 syncRunner 让它去处理:

func (proxier *Proxier) OnServiceAdd(service *api.Service) {namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 if proxier.serviceChanges.update(namespacedName, nil, service)   proxier.isInitialized() {proxier.syncRunner.Run()
}

记录 service 信息, 可以看到没做什么事,就是把 service 存在 map 里, 如果没变直接删掉 map 信息不做任何处理:

change, exists := scm.items[*namespacedName]
if !exists {change =  serviceChange{}
 //  老的 service 信息
 change.previous = serviceToServiceMap(previous)
 scm.items[*namespacedName] = change
//  当前监听到的 service 信息
change.current = serviceToServiceMap(current)
如果一样,直接删除
if reflect.DeepEqual(change.previous, change.current) {delete(scm.items, *namespacedName)
}

proxier.syncRunner.Run() 里面就发送了一个信号

select {case bfr.run  - struct{}{}:
default:
}

这里面处理了这个信号

s.Proxier.SyncLoop()
func (proxier *Proxier) SyncLoop() {// Update healthz timestamp at beginning in case Sync() never succeeds.
 if proxier.healthzServer != nil {proxier.healthzServer.UpdateTimestamp()
 proxier.syncRunner.Loop(wait.NeverStop)
}

runner 里收到信号执行,没收到信号会定期执行:

func (bfr *BoundedFrequencyRunner) Loop(stop  -chan struct{}) {glog.V(3).Infof(%s Loop running , bfr.name)
 bfr.timer.Reset(bfr.maxInterval)
 for {
 select {
 case  -stop:
 bfr.stop()
 glog.V(3).Infof(%s Loop stopping , bfr.name)
 return
 case  -bfr.timer.C(): //  定期执行
 bfr.tryRun()
 case  -bfr.run:
 bfr.tryRun() //  收到事件信号执行}

这个 bfr runner 里我们最需要主意的是一个回调函数,tryRun 里检查这个回调是否满足被调度的条件:

type BoundedFrequencyRunner struct {
 name string // the name of this instance
 minInterval time.Duration // the min time between runs, modulo bursts
 maxInterval time.Duration // the max time between runs
 run chan struct{} // try an async run
 mu sync.Mutex // guards runs of fn and all mutations
 fn func() // function to run,  这个回调
 lastRun time.Time // time of last run
 timer timer // timer for deferred runs
 limiter rateLimiter // rate limiter for on-demand runs
//  传入的 proxier.syncProxyRules 这个函数
proxier.syncRunner = async.NewBoundedFrequencyRunner(sync-runner , proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

这是个 600 行左右的搓逼函数,也是处理主要逻辑的地方。

syncProxyRules

设置一些 iptables 规则,如 mark 与 comment

确定机器上有网卡,ipvs 需要绑定地址到上面

确定有 ipset,ipset 是 iptables 的扩展,可以给一批地址设置 iptables 规则 …(又臭又长,重复代码多,看不下去了,细节问题自己去看吧)

我们最关注的,如何去处理 VirtualServer 的

serv :=  utilipvs.VirtualServer{Address: net.ParseIP(ingress.IP),
 Port: uint16(svcInfo.port),
 Protocol: string(svcInfo.protocol),
 Scheduler: proxier.ipvsScheduler,
if err := proxier.syncService(svcNameString, serv, false); err == nil {if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {}

看下实现, 如果没有就创建,如果已存在就更新, 给网卡绑定 service 的 cluster ip:

func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
 if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
 if appliedVirtualServer == nil {if err := proxier.ipvs.AddVirtualServer(vs); err != nil {return err} else {if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
 return err
 // bind service address to dummy interface even if service not changed,
 // in case that service IP was removed by other processes
 if bindAddr {_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
 if err != nil {
 return err
 return nil
}

创建 service 实现

现在可以去看 ipvs 的 AddVirtualServer 的实现了,主要是利用 socket 与内核进程通信做到的。pkg/util/ipvs/ipvs_linux.go 里 runner 结构体实现了这些方法, 这里用到了 docker/libnetwork/ipvs 库:

// runner implements Interface.
type runner struct {
 exec utilexec.Interface
 ipvsHandle *ipvs.Handle
// New returns a new Interface which will call ipvs APIs.
func New(exec utilexec.Interface) Interface {ihandle, err := ipvs.New() // github.com/docker/libnetwork/ipvs
 if err != nil {glog.Errorf( IPVS interface can t be initialized, error: %v , err)
 return nil
 return  runner{
 exec: exec,
 ipvsHandle: ihandle,
}

New 的时候创建了一个特殊的 socket, 这里与我们普通的 socket 编程无差别,关键是 syscall.AF_NETLINK 这个参数,代表与内核进程通信:

sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)
func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
 if err != nil {
 return nil, err
 s :=  NetlinkSocket{fd: int32(fd),
 s.lsa.Family = syscall.AF_NETLINK
 if err := syscall.Bind(fd,  s.lsa); err != nil {syscall.Close(fd)
 return nil, err
 return s, nil
}

创建一个 service, 转换成 docker service 格式,直接调用:

// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {eSvc, err := toBackendService(vs)
 if err != nil {
 return err
 return runner.ipvsHandle.NewService(eSvc)
}

然后就是把 service 信息打包,往 socket 里面写即可:

func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {req := newIPVSRequest(cmd)
 req.Seq = atomic.AddUint32(i.seq, 1)
 if s == nil {
 req.Flags |= syscall.NLM_F_DUMP //Flag to dump all messages
 req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
 } else {req.AddData(fillService(s))
 } //  把 service 塞到请求中
 if d == nil {
 if cmd == ipvsCmdGetDest {req.Flags |= syscall.NLM_F_DUMP} else {req.AddData(fillDestinaton(d))
 //  给内核进程发送 service 信息
 res, err := execute(i.sock, req, 0)
 if err != nil {return [][]byte{}, err
 return res, nil
}

构造请求

func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {return newGenlRequest(ipvsFamily, cmd)
}

在构造请求时传入的是 ipvs 协议簇

然后构造一个与内核通信的消息头

func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
 return  NetlinkRequest{
 NlMsghdr: syscall.NlMsghdr{Len: uint32(syscall.SizeofNlMsghdr),
 Type: uint16(proto),
 Flags: syscall.NLM_F_REQUEST | uint16(flags),
 Seq: atomic.AddUint32(nextSeqNr, 1),
}

给消息加 Data, 这个 Data 是个数组,需要实现两个方法:

type NetlinkRequestData interface {Len() int //  长度
 Serialize() []byte //  序列化,  内核通信也需要一定的数据格式,service 信息也需要实现
}

比如 header 是这样序列化的, 一看愣住了,思考好久才看懂:拆下看:([unsafe.Sizeof(hdr)]byte) 一个 []byte 类型,长度就是结构体大小 (unsafe.Pointer(hdr)) 把结构体转成 byte 指针类型 加个取它的值 用 [:] 转成 byte 返回

func (hdr *genlMsgHdr) Serialize() []byte {return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}

发送 service 信息给内核

一个很普通的 socket 发送接收数据

func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
 var (
 err error
 if err := s.Send(req); err != nil {
 return nil, err
 pid, err := s.GetPid()
 if err != nil {
 return nil, err
 var res [][]byte
done:
 for {msgs, err := s.Receive()
 if err != nil {
 return nil, err
 for _, m := range msgs {
 if m.Header.Seq != req.Seq {
 continue
 if m.Header.Pid != pid {return nil, fmt.Errorf( Wrong pid %d, expected %d , m.Header.Pid, pid)
 if m.Header.Type == syscall.NLMSG_DONE {
 break done
 if m.Header.Type == syscall.NLMSG_ERROR {error := int32(native.Uint32(m.Data[0:4]))
 if error == 0 {
 break done
 return nil, syscall.Errno(-error)
 if resType != 0   m.Header.Type != resType {
 continue
 res = append(res, m.Data)
 if m.Header.Flags syscall.NLM_F_MULTI == 0 {
 break done
 return res, nil
}

Service 数据打包 这里比较细,核心思想就是内核只认一定格式的标准数据,我们把 service 信息按其标准打包发送给内核即可。至于怎么打包的就不详细讲了。

func fillService(s *Service) nl.NetlinkRequestData {cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
 nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
 if s.FWMark != 0 {nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
 } else {nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
 nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))
 // Port needs to be in network byte order.
 portBuf := new(bytes.Buffer)
 binary.Write(portBuf, binary.BigEndian, s.Port)
 nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
 nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
 if s.PEName !=   {nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
 f :=  ipvsFlags{
 flags: s.Flags,
 mask: 0xFFFFFFFF,
 nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
 nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
 nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
 return cmdAttr
}

Service 总体来讲代码比较简单,但是觉得有些地方实现的有点绕,不够简单直接。总体来说就是监听 apiserver 事件,然后比对 处理,定期也会去执行同步策略。

以上就是怎样实现 kubeproxy 源码分析,丸趣 TV 小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计13092字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)