共计 7125 个字符,预计需要花费 18 分钟才能阅读完成。
这篇文章给大家介绍如何理解 Docker 中的容器日志处理与 log-driver 实现,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
概要
丸趣 TV 小编将从 docker(1.12.6)源码的角度分析 docker daemon 怎么将容器的日志收集出来并通过配置的 log-driver 发送出去,并结合示例介绍了好雨云帮中实现的一个 zmq-loger。
阅读准备
(1)首先你需要认知以下几个关键词:
stdout:
标准输出,进程写数据的流。
stderr:
错误输出,进程写错误数据的流。
子进程:
由一个进程(父进程)创建的进程,集成父进程大部分属性,同时可以被父进程守护和管理。
(2)你需要知道关于进程产生日志的形式:
进程产生日志有两类输出方式,一类是写入到文件中。另一类是直接写到 stdout 或者 stderr,例如 php 的 echo python 的 print golang 的 fmt.Println()等等。
(3)是否知道 docker-daemon 与运行中 container 的关系?一个 container 就是一个特殊的进程,它是由 docker daemon 创建并启动,因此 container 是 docker daemon 的子进程。由 docker daemon 守护和管理。因此 container 的 stdout 能够被 docker daemon 获取到。基于此理论,我们来分析 docker daemon 相关代码。
docker-daemon 关于日志源码分析 container 实例源码
# /container/container.go:62
type CommonContainer struct{
StreamConfig *stream.Config
...
# /container/stream/streams.go:26
type Config struct {
sync.WaitGroup
stdout *broadcaster.Unbuffered
stderr *broadcaster.Unbuffered
stdin io.ReadCloser
stdinPipe io.WriteCloser
}
找到如上所示对应的代码,显示了每一个 container 实例都有几个属性 stdout,stderr,stdin, 以及管道 stdinPipe。这里说下 stdinPipe, 当容器使用 - i 参数启动时标准输入将被运行,daemon 将能够使用此管道向容器内写入标准输入。
![2017011930658image2017-1-18 17-18-38.png](http://7xqmjb.com1.z0.glb.clouddn.com/2017011930658image2017-1-18 17-18-38.png)
我们试想以上图例,如果是你,你怎么实现日志收集转发?
# /container/container.go:312
func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) {c, err := logger.GetLogDriver(cfg.Type)
if err != nil {return nil, fmt.Errorf( Failed to get logging factory: %v , err)
ctx := logger.Context{
Config: cfg.Config,
ContainerID: container.ID,
ContainerName: container.Name,
ContainerEntrypoint: container.Path,
ContainerArgs: container.Args,
ContainerImageID: container.ImageID.String(),
ContainerImageName: container.Config.Image,
ContainerCreated: container.Created,
ContainerEnv: container.Config.Env,
ContainerLabels: container.Config.Labels,
DaemonName: docker ,
// Set logging file for json-logger
if cfg.Type == jsonfilelog.Name {ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf( %s-json.log , container.ID))
if err != nil {
return nil, err
return c(ctx)
#/container/container.go:978
func (container *Container) startLogging() error {
if container.HostConfig.LogConfig.Type == none {
return nil // do not start logging routines
l, err := container.StartLogger(container.HostConfig.LogConfig)
if err != nil {return fmt.Errorf( Failed to initialize logging driver: %v , err)
copier := logger.NewCopier(map[string]io.Reader{stdout : container.StdoutPipe(), stderr : container.StderrPipe()}, l)
container.LogCopier = copier
copier.Run()
container.LogDriver = l
// set LogPath field only for json-file logdriver
if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {container.LogPath = jl.LogPath()
return nil
}
第一个方法是为 container 查找 log-driver。首先根据容器配置的 log-driver 类别调用:logger.GetLogDriver(cfg.Type)返回一个方法类型:
/daemon/logger/factory.go:9
type Creator func(Context) (Logger, error)
实质就是从工厂类注册的 logdriver 插件去查找,具体源码下文分析。获取到 c 方法后构建调用参数具体就是容器的一些信息。然后使用调用 c 方法返回 driver。driver 是个接口类型,我们看看有哪些方法:
# /daemon/logger/logger.go:61
type Logger interface {Log(*Message) error
Name() string
Close() error}
很简单的三个方法,也很容易理解,Log()发送日志消息到 driver,Close()进行关闭操作(根据不同实现)。也就是说我们自己实现一个 logdriver,只需要实现如上三个方法,然后注册到 logger 工厂类中即可。下面我们来看 /daemon/logger/factory.go
第二个方法就是处理日志了,获取到日志 driver, 在创建一个 Copier, 顾名思义就是复制日志,分别从 stdout 和 stderr 复制到 logger driver。下面看看具体关键实现:
#/daemon/logger/copir.go:41
func (c *Copier) copySrc(name string, src io.Reader) {defer c.copyJobs.Done()
reader := bufio.NewReader(src)
for {
select {
case -c.closed:
return
default:
line, err := reader.ReadBytes(\n)
line = bytes.TrimSuffix(line, []byte{\n})
// ReadBytes can return full or partial output even when it failed.
// e.g. it can return a full entry and EOF.
if err == nil || len(line) 0 {if logErr := c.dst.Log( Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {logrus.Errorf( Failed to log msg %q for logger %s: %s , line, c.dst.Name(), logErr)
if err != nil {
if err != io.EOF {logrus.Errorf( Error scanning log stream: %s , err)
return
}
每读取一行数据,构建一个消息,调用 logdriver 的 log 方法发送到 driver 处理。
日志 driver 注册器
位于 /daemon/logger/factory.go 的源码实现即时日志 driver 的注册器,其中几个重要的方法(上文已经提到一个):
# /daemon/logger/factory.go:21
func (lf *logdriverFactory) register(name string, c Creator) error {if lf.driverRegistered(name) {return fmt.Errorf( logger: log driver named %s is already registered , name)
lf.m.Lock()
lf.registry[name] = c
lf.m.Unlock()
return nil
# /daemon/logger/factory.go:39
func (lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error {lf.m.Lock()
defer lf.m.Unlock()
if _, ok := lf.optValidator[name]; ok {return fmt.Errorf( logger: log validator named %s is already registered , name)
lf.optValidator[name] = l
return nil
}
看起来很简单,就是将一个 Creator 方法类型添加到一个 map 结构中,将 LogOptValidator 添加到另一个 map 这里注意加锁的操作。
#/daemon/logger/factory.go:13
type LogOptValidator func(cfg map[string]string) error
这个主要是验证 driver 的参数 ,dockerd 和 docker 启动参数中有:–log-opt
好雨云帮自己实现一个基于 zmq 的 log-driver
上文已经完整分析了 docker daemon 管理 logdriver 和处理日志的整个流程。相信你已经比较明白了。下面我们以 zmq-driver 为例讲讲我们怎么实现自己的 driver。直接接收容器的日志。
上文我们已经谈了一个 log-driver 需要实现的几个方法。我们可以看看位于 /daemon/logger 目录下的已有的 driver 的实现,例如 fluentd,awslogs 等。下面我们来分析 zmq-driver 具体的代码:
// 定义一个 struct, 这里包含一个 zmq 套接字
type ZmqLogger struct {
writer *zmq.Socket
containerId string
tenantId string
serviceId string
felock sync.Mutex
// 定义 init 方法调用 logger 注册器的方法注册当前 driver
// 和参数验证方法。func init() {if err := logger.RegisterLogDriver(name, New); err != nil {logrus.Fatal(err)
if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {logrus.Fatal(err)
// 实现一个上文提到的 Creator 方法注册 logdriver.
// 这里新建一个 zmq 套接字构建一个实例
func New(ctx logger.Context) (logger.Logger, error) {zmqaddress := ctx.Config[zmqAddress]
puber, err := zmq.NewSocket(zmq.PUB)
if err != nil {
return nil, err
var (env = make(map[string]string)
tenantId string
serviceId string
for _, pair := range ctx.ContainerEnv {p := strings.SplitN(pair, = , 2)
//logrus.Errorf(ContainerEnv pair: %s , pair)
if len(p) == 2 {key := p[0]
value := p[1]
env[key] = value
tenantId = env[TENANT_ID]
serviceId = env[SERVICE_ID]
if tenantId == {
tenantId = default
if serviceId == {
serviceId = default
puber.Connect(zmqaddress)
return ZmqLogger{
writer: puber,
containerId: ctx.ID(),
tenantId: tenantId,
serviceId: serviceId,
felock: sync.Mutex{},}, nil
// 实现 Log 方法,这里使用 zmq socket 发送日志消息
// 这里必须注意,zmq socket 是线程不安全的,我们知道
// 本方法可能被两个线程 (复制 stdout 和肤质 stderr) 调用 // 必须使用锁保证线程安全。否则会发生错误。func (s *ZmqLogger) Log(msg *logger.Message) error {s.felock.Lock()
defer s.felock.Unlock()
s.writer.Send(s.tenantId, zmq.SNDMORE)
s.writer.Send(s.serviceId, zmq.SNDMORE)
if msg.Source == stderr {s.writer.Send(s.containerId+ : +string(msg.Line), zmq.DONTWAIT)
} else {s.writer.Send(s.containerId+ : +string(msg.Line), zmq.DONTWAIT)
return nil
// 实现 Close 方法,这里用来关闭 zmq socket。// 同样注意线程安全,调用此方法的是容器关闭协程。func (s *ZmqLogger) Close() error {s.felock.Lock()
defer s.felock.Unlock()
if s.writer != nil {return s.writer.Close()
return nil
func (s *ZmqLogger) Name() string {
return name
// 验证参数的方法,我们使用参数传入 zmq pub 的地址。func ValidateLogOpt(cfg map[string]string) error {
for key := range cfg {
switch key {
case zmqAddress:
default:
return fmt.Errorf(unknown log opt %s for %s log driver , key, name)
if cfg[zmqAddress] == {return fmt.Errorf( must specify a value for log opt %s , zmqAddress)
return nil
}
关于如何理解 Docker 中的容器日志处理与 log-driver 实现就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。