Device Manager在什么时候创建

87次阅读
没有评论

共计 24386 个字符,预计需要花费 61 分钟才能阅读完成。

这篇文章主要介绍“Device Manager 在什么时候创建”,在日常操作中,相信很多人在 Device Manager 在什么时候创建问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Device Manager 在什么时候创建”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

Create Device Manager InstanceDevice Manager 在何时创建

Device Manager 和 Volume Manager、QoS Container Manager 等一样,都属于 kubelet 管理的众多 Manager 之一。Device Manager 在 kubelet 启动时的 NewContainerManager 中创建。

pkg/kubelet/cm/container_manager_linux.go:197
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {glog.Infof( Creating device plugin manager: %t , devicePluginEnabled)
 if devicePluginEnabled {cm.deviceManager, err = devicemanager.NewManagerImpl()
 } else {cm.deviceManager, err = devicemanager.NewManagerStub()
 if err != nil {return nil, err}

ManagerImpl 结构体

我们有必要先了解 Device Manager 的结构体:

// ManagerImpl is the structure in charge of managing Device Plugins.
type ManagerImpl struct {
 socketname string
 socketdir string
 endpoints map[string]endpoint // Key is ResourceName
 mutex sync.Mutex
 server *grpc.Server
 // activePods is a method for listing active pods on the node
 // so the amount of pluginResources requested by existing pods
 // could be counted when updating allocated devices
 activePods ActivePodsFunc
 // sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
 // We use it to determine when we can purge inactive pods from checkpointed state.
 sourcesReady config.SourcesReady
 // callback is used for updating devices  states in one time call.
 // e.g. a new device is advertised, two old devices are deleted and a running device fails.
 callback monitorCallback
 // healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
 healthyDevices map[string]sets.String
 // unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
 unhealthyDevices map[string]sets.String
 // allocatedDevices contains allocated deviceIds, keyed by resourceName.
 allocatedDevices map[string]sets.String
 // podDevices contains pod to allocated device mapping.
 podDevices podDevices
 store utilstore.Store
 pluginOpts map[string]*pluginapi.DevicePluginOptions
}

下面是核心 field 的说明:

socketname: 就是 kubelet 对外暴露的 socket 名,即 kubelet.sock。

socketdir: device plugins socket 的存放的目录,/var/lib/kubelet/device-plugins/。

endpoints: map 对象,key 为 Resource Name,value 为 endpoint 接口 (包括 run,stop,allocate,preStartContainer,getDevices,callback,isStoped,StopGracePeriodExpired),每个 endpoint 接口对应一个已注册的 device plugin,负责与 device plugin 的 gRPC 通信及缓存 device plugin 反馈的 device states。

server: Register 服务暴露的 gRPC Server。

activePods: 用来获取该节点上所有 active pods,即 non-Terminated 状态的 Pods。在 kubelet 的 initializeRuntimeDependentModules 时会注册 activePods Func 为如下函数:

// GetActivePods returns non-terminal pods
 func (kl *Kubelet) GetActivePods() []*v1.Pod {allPods := kl.podManager.GetPods()
 activePods := kl.filterOutTerminatedPods(allPods)
 return activePods
 }

callback: 是 kubelet 收到 device plugin 的 ListAndWatch gRCP stream 中有 devices state 变更时的回调函数,包括有新设备增加、旧设备删除、设备状态变化,所以通过 ListAndWatch 接口的回调方式,可以实现设备的自动发现和热插拔。

type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)

healthyDevices: map 对象,key 为 Resource Name,value 为对应的健康的 device IDs。

unhealthyDevices: map 对象,key 为 Resource Name,value 为对应的不健康的 device IDs。

allocatedDevices: map 对象,key 为 Resource Name,value 为已经分配出去的 device IDs。

podDevices: 记录每个 pod 中每个容器的 device 分配情况。

// ContainerAllocateResponse 为容器内某个 device 对应的分配信息,包括注入的环境变量、挂载信息、Annotations。type ContainerAllocateResponse struct {Envs map[string]string 
 Mounts []*Mount 
 Devices []*DeviceSpec 
 Annotations map[string]string 
 // deviceAllocateInfo
 type deviceAllocateInfo struct {
 deviceIds sets.String
 allocResp *pluginapi.ContainerAllocateResponse
 type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
 type containerDevices map[string]resourceAllocateInfo // Keyed by containerName.
 type podDevices map[string]containerDevices // Keyed by podUID.

store: 是对 checkpointData 的文件存储 (/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint),具体存储了每个 Pod 分配的 Devices 信息 PodDeviceEntries, 以及已经注册的 Resource Name 及对应的 Devices IDs。

type checkpointData struct {PodDeviceEntries []podDevicesCheckpointEntry
 RegisteredDevices map[string][]string // key 为 Resource Name,value 为 DeviceIDs
 type podDevicesCheckpointEntry struct {
 PodUID string
 ContainerName string
 ResourceName string
 DeviceIDs []string
 AllocResp []byte}

pluginOpts: map 对象,key 为 Resource Name,value 为 DevicePluginOptions,目前只有一项内容,就是 PreStartRequired bool,表示是否在容器启动前要调用 device plugin 的 PreStartContiner 接口。在 nvidia-k8s-plugin 中,PreStartContainer 为空实现。

NewManagerImpl

我们再来看看 Device Manager 的具体创建实现 NewManagerImpl。

pkg/kubelet/cm/devicemanager/manager.go:97
// NewManagerImpl creates a new manager.
func NewManagerImpl() (*ManagerImpl, error) {
 //  通过 /var/lib/kubelet/device-plugins/kubelet.sock 与 device plugin 交互
 return newManagerImpl(pluginapi.KubeletSocket)
func newManagerImpl(socketPath string) (*ManagerImpl, error) {glog.V(2).Infof(Creating Device Plugin manager at %s , socketPath)
 if socketPath ==   || !filepath.IsAbs(socketPath) {return nil, fmt.Errorf(errBadSocket+  %v , socketPath)
 dir, file := filepath.Split(socketPath)
 manager :=  ManagerImpl{endpoints: make(map[string]endpoint),
 socketname: file,
 socketdir: dir,
 healthyDevices: make(map[string]sets.String),
 unhealthyDevices: make(map[string]sets.String),
 allocatedDevices: make(map[string]sets.String),
 pluginOpts: make(map[string]*pluginapi.DevicePluginOptions),
 podDevices: make(podDevices),
 manager.callback = manager.genericDeviceUpdateCallback
 // The following structs are populated with real implementations in manager.Start()
 // Before that, initializes them to perform no-op operations.
 manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
 manager.sourcesReady =  sourcesReadyStub{}
 var err error
 //  在 /var/lib/kubelet/device-plugins/ 目录下创建 file store 类型的 key-value 存储文件 kubelet_internal_checkpoint,用来作为 kubelet 的 device plugin 的 checkpoint。manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{})
 if err != nil {return nil, fmt.Errorf( failed to initialize device plugin checkpointing store: %+v , err)
 return manager, nil
}

kubelet Device Manager 通过 /var/lib/kubelet/device-plugins/kubelet.sock 与 device plugin 交互。

注册 callback 为 genericDeviceUpdateCallback,用来处理对应 devices 的 add,delete,update 事件。

在 /var/lib/kubelet/device-plugins/ 目录下创建 file store 类型的 key-value 存储文件 kubelet_internal_checkpoint,用来作为 kubelet 的 device plugin 的 checkpoint。

当监听到 devices add/delete/update 事件发生时,会更新到 kubelet_internal_checkpoint 文件中。

当 device plugin 的 stop time 超过 grace period time(代码写死为 5min,不可配置), 会从 checkpoint 中删除对应的 devices。在这个时间范围内,Device Manager 会继续缓存该 endpoint 及对应的 devices。

为 Container Allocate Devices 后,也会将 PodDevices 更新到 checkpoint 中。

我们来看看 callback 的实现 genericDeviceUpdateCallback 的实现,了解 Device Manager 是如何处理 devices 的 add/delete/update 消息的。

pkg/kubelet/cm/devicemanager/manager.go:134
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {kept := append(updated, added...)
 m.mutex.Lock()
 if _, ok := m.healthyDevices[resourceName]; !ok {m.healthyDevices[resourceName] = sets.NewString()
 if _, ok := m.unhealthyDevices[resourceName]; !ok {m.unhealthyDevices[resourceName] = sets.NewString()
 for _, dev := range kept {
 if dev.Health == pluginapi.Healthy {m.healthyDevices[resourceName].Insert(dev.ID)
 m.unhealthyDevices[resourceName].Delete(dev.ID)
 } else {m.unhealthyDevices[resourceName].Insert(dev.ID)
 m.healthyDevices[resourceName].Delete(dev.ID)
 for _, dev := range deleted {m.healthyDevices[resourceName].Delete(dev.ID)
 m.unhealthyDevices[resourceName].Delete(dev.ID)
 m.mutex.Unlock()
 m.writeCheckpoint()}

将 callback 中收到的 devices 状态是 Healthy,那么将 device ID 插入到 ManagerImpl 中 healthDevices 中,并从 unhealthyDevices 中删除。

将 callback 中收到的 devices 状态是 Unhealthy,那么将 device ID 插入到 ManagerImpl 中 unhealthDevices 中,并从 healthyDevices 中删除。

将 device plugin 反馈的需要 delete 的 devices 从 healthDevices 和 unhealthDevices 中一并删除。

将 ManagerImpl 中的数据更新到 checkpoint 文件中。

Device Manager 的启动

前面把 Device Manager 的创建流程分析了一下,还涉及到 checkpoint 和 callback 的分析。接下来,我们继续对 Device Manager 的 Start 流程进行分析。

Start Device Manager

Device Manager 是在 containerManagerImpl 的 Start 时启动的。

pkg/kubelet/cm/container_manager_linux.go:527
func (cm *containerManagerImpl) Start(node *v1.Node,
 activePods ActivePodsFunc,
 sourcesReady config.SourcesReady,
 podStatusProvider status.PodStatusProvider,
 runtimeService internalapi.RuntimeService) error {
 // Starts device manager.
 if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
 return err
 return nil
}

deviceManager.Start 的第一个参数是获取该节点的 active(non-terminated)Pods 的函数。

SourcesReady 是用来跟踪 kubelet 配置的 Pod Sources,这些 Sources 包括:

file: 通过 static file 创建静态 Pods。

http: 通过 http 接口来获取 Pods 信息。

api: 从 Kubernetes API Server 获取 Pods 信息,是 Kubernetes 默认的内部机制。

*: 表示包含以上全部的 Sources 类型。

ManagerIml Start

ManagerIml.Start 负责启动 Device Manager,对外提供 gRPC 服务。

pkg/kubelet/cm/devicemanager/manager.go:204
// Start starts the Device Plugin Manager amd start initialization of
// podDevices and allocatedDevices information from checkpoint-ed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
 m.activePods = activePods
 m.sourcesReady = sourcesReady
 // Loads in allocatedDevices information from disk.
 err := m.readCheckpoint()
 socketPath := filepath.Join(m.socketdir, m.socketname)
 os.MkdirAll(m.socketdir, 0755)
 // Removes all stale sockets in m.socketdir. Device plugins can monitor
 // this and use it as a signal to re-register with the new Kubelet.
 if err := m.removeContents(m.socketdir); err != nil {glog.Errorf( Fail to clean up stale contents under %s: %+v , m.socketdir, err)
 s, err := net.Listen(unix , socketPath)
 if err != nil {glog.Errorf(errListenSocket+  %+v , err)
 return err
 m.server = grpc.NewServer([]grpc.ServerOption{}...)
 pluginapi.RegisterRegistrationServer(m.server, m)
 go m.server.Serve(s)
 glog.V(2).Infof(Serving device plugin registration server on %q , socketPath)
 return nil
}

首先读取 checkpoint file 中数据,恢复 ManagerImpl 的相关数据,包括:

podDevices;

allocatedDevices;

healthyDevices;

unhealthyDevices;

endpoints,注意这里会将 endpoint 的 stop time 设置为当前时间,意味着 kubelet restart 后,需要等待 device plugin 进行 re-register 后,才认为这些 resource 是可用的。

然后将 /var/lib/kubelet/device-plugins/ 下面的所有文件清空,当然 checkpiont 文件除外,也就是清空所有的 socket 文件,包括自己的 kubelet.sock, 以及其他所有之前的 device plugin 的 socket 文件。device plugin 会监控 kubelet.sock 文件是否被删除,如果删除,则会触发自己的向 kubelet 重新注册自己。

创建 kubelet.sock 并启动 gRPC Server 对外提供 gRPC 服务,目前只注册了 Register 服务,用于 Device plugin 调用进行插件注册。

Register 服务

我们就来看看 kubelet Device Manager 对外提供的唯一 gRPC 接口 Register。

Register

pkg/kubelet/cm/devicemanager/manager.go:289
// Register registers a device plugin.
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {glog.Infof( Got registration request from device plugin with resource name %q , r.ResourceName)
 metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
 var versionCompatible bool
 for _, v := range pluginapi.SupportedVersions {
 if r.Version == v {
 versionCompatible = true
 break
 if !versionCompatible {errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
 glog.Infof(Bad registration request from device plugin with resource name %q: %v , r.ResourceName, errorString)
 return  pluginapi.Empty{}, fmt.Errorf(errorString)
 if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
 glog.Infof(Bad registration request from device plugin: %v , errorString)
 return  pluginapi.Empty{}, fmt.Errorf(errorString)
 // TODO: for now, always accepts newest device plugin. Later may consider to
 // add some policies here, e.g., verify whether an old device plugin with the
 // same resource name is still alive to determine whether we want to accept
 // the new registration.
 go m.addEndpoint(r)
 return  pluginapi.Empty{}, nil}

注册请求是 device plugin 向 kubelet 发送的,注册请求 RegisterRequest 为:

type RegisterRequest struct {
 Version string // Kubernetes 1.10 对应的 device plugin api version 为 v1beta1
 Endpoint string // device plugin 对应的 socket name
 ResourceName string 
 Options *DevicePluginOptions 
 }

这里会检查注册的 Resource Name 是否符合 Extended Resource 的规则:

Resource Name 不能属于 kubernetes.io,得有自己的 domain,比如 nvidia.com。

Resource Name 中不能包含 requests. 前缀。

对应的 Resource value 只能是整数值。

调用 addEndpoint 进行插件注册。

addEndpoint 进行 device plugin 注册

从上面 Register 的方法中可见,真正插件注册的逻辑是在 addEndpoint 中实现的。

pkg/kubelet/cm/devicemanager/manager.go:332
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {existingDevs := make(map[string]pluginapi.Device)
 m.mutex.Lock()
 old, ok := m.endpoints[r.ResourceName]
 if ok   old != nil {
 // Pass devices of previous endpoint into re-registered one,
 // to avoid potential orphaned devices upon re-registration
 devices := make(map[string]pluginapi.Device)
 for _, device := range old.getDevices() {devices[device.ID] = device
 existingDevs = devices
 m.mutex.Unlock()
 socketPath := filepath.Join(m.socketdir, r.Endpoint)
 e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)
 if err != nil {glog.Errorf( Failed to dial device plugin with request %v: %v , r, err)
 return
 m.mutex.Lock()
 if r.Options != nil {m.pluginOpts[r.ResourceName] = r.Options
 // Check for potential re-registration during the initialization of new endpoint,
 // and skip updating if re-registration happens.
 // TODO: simplify the part once we have a better way to handle registered devices
 ext := m.endpoints[r.ResourceName]
 if ext != old {glog.Warningf( Some other endpoint %v is added while endpoint %v is initialized , ext, e)
 m.mutex.Unlock()
 e.stop()
 return
 // Associates the newly created endpoint with the corresponding resource name.
 // Stops existing endpoint if there is any.
 m.endpoints[r.ResourceName] = e
 glog.V(2).Infof(Registered endpoint %v , e)
 m.mutex.Unlock()
 if old != nil {old.stop()
 go func() {e.run()
 e.stop()
 m.mutex.Lock()
 if old, ok := m.endpoints[r.ResourceName]; ok   old == e {m.markResourceUnhealthy(r.ResourceName)
 glog.V(2).Infof(Unregistered endpoint %v , e)
 m.mutex.Unlock()}

首先检查注册的这个 device plugin 是否已经注册过,如果注册过,则获取已经缓存的 devices。

再检查 device plugin 的 socket 是否能 dial 成功,如果 dial 失败,则说明 device plugin 没正常启动。如果 dial 成功,就根据已经缓存的 devices 重新初始化 Endpoint,EndpointImpl 的定义如下:

type endpointImpl struct {
 client pluginapi.DevicePluginClient
 clientConn *grpc.ClientConn
 socketPath string
 resourceName string
 stopTime time.Time
 devices map[string]pluginapi.Device
 mutex sync.Mutex
 cb monitorCallback
 }

为了防止在 EndpointImpl 重新初始化的过程中 device plugin 进行 re-register,初始化完成后再次获取缓存中该 device plugin 的 Endpoint,并与初始化之前的 Endpoint 对象进行比对:

如果不是同一个对象,则说明在初始化过程中发生了 re-register,那么就 invoke Endpoint 的 stop 接口,关闭 gRPC 连接,并设置 Endpoint 的 stopTime 为当前时间,Register 流程以失败结束。

否则继续后面流程。

如果该 device plugin 之前注册过,那么再重新调用 Endpoint 的 run() 启动之前,先调用 Endpoint 的 stop 关闭 gRPC 连接,并设置 Endpoint 的 stopTime 为当前时间。

然后启动 golang 协程执行 Endpoint 的 run(),在 run 方法中:

调用 device plugin 的 ListAndWatch gRPC 接口,通过长连接持续获取 ListAndWatch gRPC stream,

从 stream 流中获取的 devices 与 Endpoint 中缓存的 devices 进行比对,得到需要 add/delete/update 的 devices,

然后调用 Endpoint 的 callback(也就是 ManagerImpl 注册的 callback 方法 genericDeviceUpdateCallback)进行 Device Manager 的缓存更新并写到 checkpoint 文件中。

直到与 device plugin 的 gRPC 连接发生 errListAndWatch 错误,跳出持续获取 stream 的死循环,然后调用 Endpoint 的 stop 关闭 gRPC 连接,并设置 Endpoint 的 stopTime 为当前时间。

invoke stop 后,再标记该 device plugin 对应的所有 devices 为 unhealthy,即设置 healthyDevices 为空,所有原来 healthy 的 devices 都加到 unhealthyDevices 中,此时表示注册失败。

调用 Device Plugin 的 Allocate 接口注册 UpdatePluginResources 为 Pod Admit Handler

kubelet 在 NewMainKubelet 中会注册一系列的 Pod Admit Handler,当有 Pod 需要创建的时,都会先调用这些 Pod Admit Handler 进行处理,其中 klet.containerManager.UpdatePluginResources 就是 kubelet Device Manager 为 Pod 分配 devices 的。

pkg/kubelet/kubelet.go:893
func NewMainKubelet( ... ) (*Kubelet, error) {klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
pkg/kubelet/cm/container_manager_linux.go:618
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {return cm.deviceManager.Allocate(node, attrs)
}

Allocate

kubelet 在创建 Pod 前,会 invoke Device Manager 的 Allocate 方法,为 Pod 中的每个 Container 请求分配对应的 devices,kubelet 会将请求转发到对应的 Endpoint 的 Allocate 方法,然后请求会到对应的 device plugin 进行处理。

pkg/kubelet/cm/devicemanager/manager.go:259
func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
 pod := attrs.Pod
 devicesToReuse := make(map[string]sets.String)
 // TODO: Reuse devices between init containers and regular containers.
 for _, container := range pod.Spec.InitContainers {if err := m.allocateContainerResources(pod,  container, devicesToReuse); err != nil {
 return err
 m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
 for _, container := range pod.Spec.Containers {if err := m.allocateContainerResources(pod,  container, devicesToReuse); err != nil {
 return err
 m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
 m.mutex.Lock()
 defer m.mutex.Unlock()
 // quick return if no pluginResources requested
 if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
 return nil
 m.sanitizeNodeAllocatable(node)
 return nil
}

调用 allocateContainerResources 为 Pod 中的 init container 分配 devices,并更新 ManagerImpl 中 PodDevices 缓存;

调用 allocateContainerResources 为 Pod 中的 regular container 分配 devices,并更新 ManagerImpl 中 PodDevices 缓存;

调用 sanitizeNodeAllocatable 更新 scheduler cache 中 Node 对应 Resource Name 的 Allocatable Resource;

allocateContainerResources

pkg/kubelet/cm/devicemanager/manager.go:608
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {podUID := string(pod.UID)
 contName := container.Name
 allocatedDevicesUpdated := false
 // Extended resources are not allowed to be overcommitted.
 // Since device plugin advertises extended resources,
 // therefore Requests must be equal to Limits and iterating
 // over the Limits should be sufficient.
 for k, v := range container.Resources.Limits {resource := string(k)
 needed := int(v.Value())
 glog.V(3).Infof(needs %d %s , needed, resource)
 if !m.isDevicePluginResource(resource) {
 continue
 // Updates allocatedDevices to garbage collect any stranded resources
 // before doing the device plugin allocation.
 if !allocatedDevicesUpdated {m.updateAllocatedDevices(m.activePods())
 allocatedDevicesUpdated = true
 allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
 if err != nil {
 return err
 if allocDevices == nil || len(allocDevices)  = 0 {
 continue
 startRPCTime := time.Now()
 m.mutex.Lock()
 e, ok := m.endpoints[resource]
 m.mutex.Unlock()
 if !ok {m.mutex.Lock()
 m.allocatedDevices = m.podDevices.devices()
 m.mutex.Unlock()
 return fmt.Errorf(Unknown Device Plugin %s , resource)
 devs := allocDevices.UnsortedList()
 glog.V(3).Infof(Making allocation request for devices %v for device plugin %s , devs, resource)
 resp, err := e.allocate(devs)
 metrics.DevicePluginAllocationLatency.WithLabelValues(resource).Observe(metrics.SinceInMicroseconds(startRPCTime))
 if err != nil {m.mutex.Lock()
 m.allocatedDevices = m.podDevices.devices()
 m.mutex.Unlock()
 return err
 // Update internal cached podDevices state.
 m.mutex.Lock()
 m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
 m.mutex.Unlock()
 // Checkpoints device to container allocation information.
 return m.writeCheckpoint()}

device plugin 提供的 Resource 属于 Kubernetes Extended Resources,所以其 Resource QoS 只能是 Guaranted。

每次在为 Pod 分配 devices 之前,都去检查一下此时的 active pods,并与 podDevices 缓存中的 pods 进行比对,将已经 terminated 的 Pods 的 devices 从 podDevices 中删除,即进行了 devices 的 GC 操作。

从 healthyDevices 中随机分配对应数量的 devices 给该 Pod,并注意更新 allocatedDevices,否则会导致一个 device 被分配给多个 Pod。

拿到 devices 后,就调用 Endpoint 的 Allocate 方法(进而调用对应 device plugin 的 Allocate gRPC Service),device plugin 返回 ContainerAllocateResponse(包括注入的环境变量、挂载信息、Annotations)。

更新 podDevices 缓存信息,并将 ManagerImpl 中缓存数据更新到 checkpoint 文件中。

思考:当 init container 结束后,对应分配的 devices 会被释放吗?目前还不会释放 devices,在 Allocate 前只会回收 Terminated Pods 的 devices,并没有回收 init container 的 devices。要优化这个也是比较简单的,只要修改上面代码中 updateAllocatedDevices 方法内的逻辑就行了,增加 init container 的 devices 回收逻辑。
所以当前版本最好不会要在 init container 中使用 devices,虽然这种场景几乎不存在。

维护 NodeStatus 中 Device Plugin 管理的 Resource Capacity

当 kubelet 更新 node status 时会调用 GetCapacity 更新 device plugins 对应的 Resource 信息。

pkg/kubelet/kubelet_node_status.go:599
func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = kl.containerManager.GetDevicePluginResourceCapacity()

func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {return cm.deviceManager.GetCapacity() }

下面是 GetCapacity 的具体代码实现,逻辑很简单:

检测 healthyDevices 对应的 device plugin 是否已经从缓存中删除或者已经停止超过 5min,如果满足以上条件之一,则从 endpoints 和 healthyDevices 缓存中删除这些 devices。

检测 unhealthyDevices 对应的 device plugin 是否已经从缓存中删除或者已经停止超过 5min,如果满足以上条件之一,则从 endpoints 和 unhealthyDevices 缓存中删除这些 devices。

如果缓存发生变化,则更新到 checkpoint 文件中。

pkg/kubelet/cm/devicemanager/manager.go:414
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
 needsUpdateCheckpoint := false
 var capacity = v1.ResourceList{}
 var allocatable = v1.ResourceList{}
 deletedResources := sets.NewString()
 m.mutex.Lock()
 for resourceName, devices := range m.healthyDevices {e, ok := m.endpoints[resourceName]
 if (ok   e.stopGracePeriodExpired()) || !ok {
 if !ok {glog.Errorf( unexpected: healthyDevices and endpoints are out of sync)
 delete(m.endpoints, resourceName)
 delete(m.healthyDevices, resourceName)
 deletedResources.Insert(resourceName)
 needsUpdateCheckpoint = true
 } else {capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
 allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
 for resourceName, devices := range m.unhealthyDevices {e, ok := m.endpoints[resourceName]
 if (ok   e.stopGracePeriodExpired()) || !ok {
 if !ok {glog.Errorf( unexpected: unhealthyDevices and endpoints are out of sync)
 delete(m.endpoints, resourceName)
 delete(m.unhealthyDevices, resourceName)
 deletedResources.Insert(resourceName)
 needsUpdateCheckpoint = true
 } else {capacityCount := capacity[v1.ResourceName(resourceName)]
 unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
 capacityCount.Add(unhealthyCount)
 capacity[v1.ResourceName(resourceName)] = capacityCount
 m.mutex.Unlock()
 if needsUpdateCheckpoint {m.writeCheckpoint()
 return capacity, allocatable, deletedResources.UnsortedList()}

GetCapacity 更新 NodeStatus 如下数据:

registered device plugin resource Capacity

registered device plugin resource Allocatable

previously registered resources that are no longer active

调用 Device Plugin 的 PreStartContainer 接口

在 kubelet 的 GetResource 中,会调用 DeviceManager 的 GetDeviceRunContainerOptions,并将这些 options 添加到 kubecontainer.RunContainerOptions 中。RunContainerOptions 包括 Envs、Mounts、Devices、PortMappings、Annotations 等信息。

pkg/kubelet/cm/container_manager_linux.go:601
// TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {opts :=  kubecontainer.RunContainerOptions{}
 // Allocate should already be called during predicateAdmitHandler.Admit(),
 // just try to fetch device runtime information from cached state here
 devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
 if err != nil {return nil, err} else if devOpts == nil {
 return opts, nil
 opts.Devices = append(opts.Devices, devOpts.Devices...)
 opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
 opts.Envs = append(opts.Envs, devOpts.Envs...)
 opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
 return opts, nil
}

Device Manager 的 GetDeviceRunContainerOptions 会根据 pluginOpts 的 PreStartRequired 是否为 true,决定是否调用 device plugin 的 PreStartContainer gRPC Service。

注意:如果某个 device plugin 的 PreStartRequired 为 true,那么需要注册 kubelet Device Manager 调用 device plugin 的 PreStartContainer 接口的超时时间是 30s,即 30s 内必须完成 PreStartContainer 的逻辑并返回。

pkg/kubelet/cm/devicemanager/manager.go:688
// GetDeviceRunContainerOptions checks whether we have cached containerDevices
// for the passed-in  pod, container  and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {podUID := string(pod.UID)
 contName := container.Name
 for k := range container.Resources.Limits {resource := string(k)
 if !m.isDevicePluginResource(resource) {
 continue
 err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
 if err != nil {
 return nil, err
 m.mutex.Lock()
 defer m.mutex.Unlock()
 return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
}

然后 deviceRunContainerOptions 负责封装 Container 的 Envs、Mount points、Device files、Annotations。

到此,关于“Device Manager 在什么时候创建”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计24386字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)