datadog agent代码分析之日志数据流转
背景
去年11月份研究了一下开源的datadog agent代码(7.32.1), 整理了一篇文档。
1 日志数据流转
- tailer首先从log文件读取,将读取的内容源源不断地发送到 decoder的 input channel中
- decoder 从自身的input channel读取数据 ,判断数据是否需要截断,将数据写入line parser的 input channel
- line parser从自身的input channel读取数据,解析内容、status、时间戳等,写入line handler的 input channel
- line handler从自身的input channel 读取数据,去除空格,发送到自身的output channel
- tailer forwardMessage 从decoder的output channel(与line handler共享)读取数据,添加tag后, 发送给pipeline的 input channel
- processor 从自身的input channel(pipe line的input channel)读取数据,encode后(比如encode为json/pb格式),发送到sender的input channel
- sender 从input channel读取数据,最后又将message写入pipeline的output channle
- sender将message的content发送给datadog 后台,发送时默认不压缩传输,http支持gzip压缩传输,tcp不支持压缩。
- pipeline的output channel初始传入的是auditor的 input channel。 auditor从input channel 读取数据,写入内存 ,定时器从buffer刷入磁盘,另外一个定时器定期清理内存过期数据
- input channel用白色的小长条表示, output channel 用灰色的小长条表示。
- input channel或者outputchannel 都是暂存message的通道。message=日志+附加信息
- 实线箭头表示数据直接写入,虚线箭头仅表示数据流向,两个逻辑channel并无实际数据转移。
- 为了与代码表述一致,虚线箭头两次仍然画成两个channel。两个channel实际为同一个物理channel,数据写入其中一个channel中,另一个channel可以直接使用数据。
2 具体代码逻辑
2.1.1 tailer 核心逻辑
tailer 核心处理逻辑就是(1)读取文件; (2)内容转发给decoder 处理 ;(3)处理后将数据转发给pipeline.
2.1.2 readForEver
readForever最终调用tailer.read()。读取具体的log file 写入decoder的 InputChan
2.1.3 decoder
Start() 先看decoder.run(), 下一小节分析lineParser.Start() decodeIncomingData 调用sendLIne() sendLine() 调用到 lineParser.Handle() lineParser将数据发送到parser的InputChan
2.1.4 parser
Start() lineparser 并发调用 lineHander.Start()和自身的run() 先看自身的run(),,下一小节分析lineHandler.Start()。 读取lineParser的inputChan,然后调用 process() process()将parser读取的content、status、timestamp 传给lineHandler.Handle() 其实就是将message写入到lineHandler的inputChan
2.1.5 handler
lineHandler.Start()启动一个goroutine调用自身的run() lineHandler的run()读取inputChan数据,调用process() process()做了trimSpace 判断是否truncate ,最终发送数据到自身的outputChan 看一下handler的outputChan,是在NewSingleLineHandler传入的 调用NewSingleLineHandler传入的outputChan ,也传给了New New将outputChan传给了Decoder的OutputChan。也就是decoder和lineHandler共享output chan
2.1.6 forwardMessage()
forwardMessage()读取decoder的OutputChan ,给数据添加tag、origin、identifier后,写入到tailer的outputChan
2.2
2.2.1 provider组件
tailer的outputChan是NewTailer时传入 NewTailer()被createTailer()调用 createTailer传入的是pipelineProvider.NextPipelineChan() pipelineProvider.NextPipelineChan() return的是provider的其中一个pipeline的InputChan provider相当于pipeline的一个集合,处理数据时还是指定其中一个pipeline。 返回到NewAgent()来看一下, NewAgent会先创建一个auditor/provider,然后将auditor和provider传给scanner provider启动会将auditor 的inputChan 传给pipeline。provider 启动其实也就是各个pipeline.Start() auditor.Channel()返回的其实是auditor的inputChan
2.2.2 pipeline
NewPipeline()会实例化sender和processor
根据一下代码,从下往上看,可以得出processor的inputChan和pipeline的inputChan相同(蓝框部分),processor的outputChan是sender的inputChan(红框部分)
sender的outputChan是传入的auditor的inputChan(绿框部分) pipeline Start() 其实就是sender和 processor的 Start()
2.2.3 processor
processor.Start()启动一个goroutine 调用自身的run() run()从processor的inputChan读取数据 最后写入processor的outputChan(即sender的inputChan)
2.2.4 sender
sender.Start()启动一个goroutine调用自身的run() run()调用的是strategy.Send() Send 就是从sender的inputChan读取,做一些统计,调用send()发送给datadog后台,最后message写入sender.outputChan
以http 发送为例 发送给datadog后台
2.3 auditor
2.3.4 auditor核心逻辑
sender的outputChan 即auditor的inputChan 。 Agent.Start()会调用auditor.Start(). auditorStart()主要逻辑是createChannels() 、cleanupRegistry() 和 run()
run()
inputChan收到数据就把identifier、offset 、TailingMode更新到registry中,并启动两个定时器,
- 300秒清理一次过期registry数据
- 1秒flush registry到磁盘数据
updateRegistry()
更新registry在内存中的数据
flushRegistry()
写入registry数据到磁盘指定路径
cleanupRegistry()
清理过期的registry 数据
createChannels()
主要是创建capacity为100的Message channel