containerd源码接口调用的方法是什么

66次阅读
没有评论

共计 9222 个字符,预计需要花费 24 分钟才能阅读完成。

本篇内容主要讲解“containerd 源码接口调用的方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“containerd 源码接口调用的方法是什么”吧!

源码接口调用详情从 ctr 调用 containerd-api

####checkpoint(用于快照,docker 目前该功能不完善)|ctr cmd | containerd-api | | ———– |——–| | list | /types.API/ListCheckpoint | | create | /types.API/CreateCheckpoint | | delete | /types.API/DeleteCheckpoint |

####containers |ctr cmd | containerd-api | | ———– |——–| | list、state| /types.API/State | | pause、resume、update | /types.API/UpdateContainer | | create | /types.API/CreateContainer | | stats | /types.API/Stats | | watch | /types.API/State, /types.API/Events | | exec | /types.API/Events, /types.API/AddProcess, /types.API/UpdateProcess | | kill | /types.API/Signal | | start | /types.API/Events, /types.API/CreateContainer , /types.API/UpdateProcess | | update | /types.API/UpdateContainer | ####events /types.API/Events ####state /types.API/State ####version /types.API/GetServerVersion –return result

从 containerd-api 至 supervisor 任务处理

注:API–server.go — daemon – supervisor.go(handleTask func) ####checkpoint |containerd-api | supervisor | | ———– |——–| | /types.API/ListCheckpoint (supervisor.GetContainersTask)| getContainers | | /types.API/CreateCheckpoint | createCheckpoint | | /types.API/DeleteCheckpoint | deleteCheckpoint |

####containers |containerd-api | supervisor | | ———– |——–| | /types.API/State /types.API/Stats (supervisor.GetContainersTask) | getContainers | | /types.API/UpdateContainer (supervisor.UpdateTask) | updateContainer | | /types.API/CreateContainer (supervisor.StartTask) | start | | /types.API/Events | Events| | /types.API/AddProcess | addProcess | | /types.API/UpdateProcess | updateProcess | | /types.API/Signal| signal|

从 supervisor 至 runtime(runC)

####checkpoint |supervisor | runtime | | ———– |——–| | getContainers | – | | createCheckpoint | (runtime)CheckPoint — exec.Command(c.runtime,arg….) | | deleteCheckpoint | (runtime)DeleteCheckpoint| ####containers |supervisor | runtime | | ———– |——–| | getContainers | – | | updateContainer |(runtime)Resume Pause UpdateResources– exec.Command(c.runtime,arg….) | | start | (runtime supervisor/worker.go) Start — exec.Command(c.shim,c.id,c.bundle,c.runtime)| | addProcess | (runtime) exec — exec.Command(c.shim,c.id,c.bundle,c.runtime) | | updateProcess | – | | signal | -|

## 以 createContainer 为例走读代码 ###deamon 启动监听 tasks 及 startTasks 进程 #### 进入 main.go main 方法调用 daemon 方法

app.Action = func(context *cli.Context) { if err := daemon(context); err != nil { logrus.Fatal(err)
 }
}

#### 进入 main.go daemon 方法

for i := 0; i   10; i++ { wg.Add(1)
 w := supervisor.NewWorker(sv, wg)
 go w.Start()
if err := sv.Start(); err != nil { return err}

#### 初始化 supervisor/worker.go NewWorker 并启动监听 startTask 并处理

func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
 return  worker{
 s: s,
 wg: wg,
 }
func (w *worker) Start() { defer w.wg.Done()
 for t := range w.s.startTasks {
 started := time.Now()
 process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
 if err != nil {
 logrus.WithFields(logrus.Fields{
  error : err,
  id : t.Container.ID(),
 }).Error(containerd: start container)
 t.Err  - err
 evt :=  DeleteTask{ ID: t.Container.ID(),
 NoEvent: true,
 Process: process,
 }
 w.s.SendTask(evt)
 continue
 }

### 启动 supervisor/supervisor.go task 监听 task 并处理

func (s *Supervisor) Start() error {
 logrus.WithFields(logrus.Fields{
  stateDir : s.stateDir,
  runtime : s.runtime,
  runtimeArgs : s.runtimeArgs,
  memory : s.machine.Memory,
  cpus : s.machine.Cpus,
 }).Debug(containerd: supervisor running)
 go func() {
 for i := range s.tasks { s.handleTask(i)
 
 }

###containers 容器创建示例 Ctl 控制台命令入口 ctr/main.go containersCommand

execCommand,
killCommand,
listCommand,
pauseCommand,
resumeCommand,
startCommand,
stateCommand,
statsCommand,
watchCommand,
updateCommand,

####ctr/container.go

var startCommand = cli.Command{
 Name:  start ,
 Usage:  start a container ,
 ArgsUsage:  ID BundlePath”, ————…...
 
 events, err := c.Events(netcontext.Background(),  types.EventsRequest{})/* 事件创建 */
 if err != nil { fatal(err.Error(), 1)
 }
 if _, err := c.CreateContainer(netcontext.Background(), r); err != nil {/* 容器创建 */
 fatal(err.Error(), 1)
 }
 if context.Bool(attach) { go func() { io.Copy(stdin, os.Stdin)
 if _, err := c.UpdateProcess(netcontext.Background(),  types.UpdateProcessRequest{/* 更新进程 */
 Id: id,
 Pid:  init ,
 CloseStdin: true,
 }); err != nil { fatal(err.Error(), 1)
 }
 restoreAndCloseStdin()
 }()
 if tty { resize(id,  init , c)
 go func() { s := make(chan os.Signal, 64)
 signal.Notify(s, syscall.SIGWINCH)
 for range s { if err := resize(id,  init , c); err != nil { log.Println(err)
 }
 }
 }()
 }
 waitForExit(c, events, id,  init , restoreAndCloseStdin)
 }
 
},

###api 处理 ####api/grpc/types/api.pb.go

func (c *aPIClient) Events(ctx context.Context, in *EventsRequest, opts ...grpc.CallOption) (API_EventsClient, error) { stream, err := grpc.NewClientStream(ctx,  _API_serviceDesc.Streams[0], c.cc,  /types.API/Events , opts...)
 if err != nil {
 return nil, err
 }
 x :=  aPIEventsClient{stream}
 if err := x.ClientStream.SendMsg(in); err != nil {
 return nil, err
 }
 if err := x.ClientStream.CloseSend(); err != nil {
 return nil, err
 }
 return x, nil
 
func (c *aPIClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) { out := new(CreateContainerResponse)
 err := grpc.Invoke(ctx,  /types.API/CreateContainer , in, out, c.cc, opts...)
 if err != nil {
 return nil, err
 }
 return out, nil
func (c *aPIClient) UpdateProcess(ctx context.Context, in *UpdateProcessRequest, opts ...grpc.CallOption) (*UpdateProcessResponse, error) { out := new(UpdateProcessResponse)
 err := grpc.Invoke(ctx,  /types.API/UpdateProcess , in, out, c.cc, opts...)
 if err != nil {
 return nil, err
 }
 return out, nil
} 
 
func _API_Events_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(EventsRequest)
 if err := stream.RecvMsg(m); err != nil {
 return err
 }
 return srv.(APIServer).Events(m,  aPIEventsServer{stream})
 
 
func _API_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(CreateContainerRequest)
 if err := dec(in); err != nil {
 return nil, err
 }
 if interceptor == nil { return srv.(APIServer).CreateContainer(ctx, in)
 }
 info :=  grpc.UnaryServerInfo{
 Server: srv,
 FullMethod:  /types.API/CreateContainer ,
 }
 handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(APIServer).CreateContainer(ctx, req.(*CreateContainerRequest))
 }
 return interceptor(ctx, in, info, handler)
 
func _API_UpdateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(UpdateProcessRequest)
 if err := dec(in); err != nil {
 return nil, err
 }
 if interceptor == nil { return srv.(APIServer).UpdateProcess(ctx, in)
 }
 info :=  grpc.UnaryServerInfo{
 Server: srv,
 FullMethod:  /types.API/UpdateProcess ,
 }
 handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(APIServer).UpdateProcess(ctx, req.(*UpdateProcessRequest))
 }
 return interceptor(ctx, in, info, handler)
}

api/grpc/server/server.go 进入第一步中的 tasks 及 sendTasks 处理队列

func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
 
events := s.sv.Events(t, r.StoredOnly, r.Id) 
func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
 
s.sv.SendTask(e)
 
apiC, err := createAPIContainer(r.Container, false) 
func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) { e :=  supervisor.UpdateProcessTask{}
 e.ID = r.Id
 e.PID = r.Pid
 e.Height = int(r.Height)
 e.Width = int(r.Width)
 e.CloseStdin = r.CloseStdin
 s.sv.SendTask(e)
 if err :=  -e.ErrorCh(); err != nil {
 return nil, err
 }
 return  types.UpdateProcessResponse{}, nil}

####supervisor/create.go

func (s *Supervisor) start(t *StartTask) error {s.startTasks  - task }

####supervisor/worker.go

func (w *worker) Start() { defer w.wg.Done()
 for t := range w.s.startTasks {

####runtime/container.go

func (c *container) Start(checkpointPath string, s Stdio) (Process, error) { processRoot := filepath.Join(c.root, c.id, InitProcessID)
 if err := os.Mkdir(processRoot, 0755); err != nil {
 return nil, err
 }
 cmd := exec.Command(c.shim,
 c.id, c.bundle, c.runtime,
 
 ) --- 执行  docker-containerd-shim 命令
 
 cmd.Dir = processRoot
 cmd.SysProcAttr =  syscall.SysProcAttr{
 Setpgid: true,
 }
 spec, err := c.readSpec()
 if err != nil {
 return nil, err
 }
 config :=  processConfig{
 checkpoint: checkpointPath,
 root: processRoot,
 id: InitProcessID,
 c: c,
 stdio: s,
 spec: spec,
 processSpec: specs.ProcessSpec(spec.Process),
 }
 p, err := newProcess(config)
 if err != nil {
 return nil, err
 }
 if err := c.createCmd(InitProcessID, cmd, p); err != nil {
 return nil, err
 }
 return p, nil
}

查看 shim 的 Main 方法注释参数传递

// Arg0: id of the container // Arg1: bundle path // Arg2: runtime binary

##containerd-shim 接收后处理 ###containerd-shim/main.go

func start(log *os.File) error {
 
p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))
if err != nil {
 return err
defer func() { if err := p.Close(); err != nil { writeMessage(log,  warn , err)
 }
if err := p.create(); err != nil { p.delete()
 return err
}

###containerd-shim/process.go 跳转执行 runc 命令

func (p *process) create() error {cmd := exec.Command(p.runtime, args...)

到此,相信大家对“containerd 源码接口调用的方法是什么”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计9222字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)