前言 今天来介绍 go-zero 生态的另一个组件 go-stash。这是一个 logstash 的 Go 语言替代版,我们用 go-stash 相比原先的 logstash 节省了2/3的服务器资源。如果你在用 logstash,不妨试试,…
#### 前言
今天来介绍
1 | go-zero |
生态的另一个组件
1 | go-stash |
。这是一个
1 | logstash |
的 Go 语言替代版,我们用
1 | go-stash |
相比原先的
1 | logstash |
节省了2/3的服务器资源。如果你在用
1 | logstash |
,不妨试试,也可以看看基于
1 | go-zero |
实现这样的工具是多么的容易,这个工具作者仅用了两天时间。
整体架构
先从它的配置中,我们来看看设计架构。
1 | Clusters: |
看配置名:
1 | kafka |
是数据输出端,
1 | es |
是数据输入端,
1 | filter |
抽象了数据处理过程。
对,整个
1 | go-stash |
就是如 config 配置中显示的,所见即所得。
启动
从
1 | stash.go |
的启动流程大致分为几个部分。因为可以配置多个
1 | cluster |
,那从一个
1 | cluster |
分析:
建立与
1 | es |
的连接【传入
1 | es |
配置】
构建
1 | filter processors |
【
1 | es |
前置处理器,做数据过滤以及处理,可以设置多个】
完善对
1 | es |
中 索引配置,启动
1 | handle |
,同时将
1 | filter |
加入handle【处理输入输出】
连接下游的
1 | kafka |
,将上面创建的
1 | handle |
传入,完成
1 | kafka |
和
1 | es |
之间的数据消费和数据写入
MessageHandler
在上面架构图中,中间的
1 | filter |
只是从 config 中看到,其实更详细是
1 | MessageHandler |
的一部分,做数据过滤和转换,下面来说说这块。
以下代码:https://github.com/tal-tech/go-stash/tree/master/stash/handler/handler.go
1 | type MessageHandler struct { |
这个就对应上面说的,
1 | filter |
只是其中一部分,在结构上
1 | MessageHandler |
是对接下游
1 | es |
,但是没有看到对
1 | kafka |
的操作。
别急,从接口设计上
1 | MessageHandler |
实现了
1 | go-queue |
中
1 | ConsumeHandler |
接口。
这里,上下游就串联了:
1 | MessageHandler |
接管了
1 | es |
的操作,负责数据处理到数据写入
对上实现了
1 | kafka |
的
1 | Consume |
操作。这样在消费过程中执行
1 | handler |
的操作,从而写入
1 | es |
实际上,
1 | Consume() |
也是这么处理的:
1 | func (mh *MessageHandler) Consume(_, val string) error { |
数据流
说完了数据处理,以及上下游的连接点。但是数据要从
1 | kafka -> es |
,数据流出这个动作从
1 | kafka |
角度看,应该是由开发者主动
1 | pull data from kafka |
。
那么数据流是怎么动起来?我们回到主程序 https://github.com/tal-tech/go-stash/blob/master/stash/stash.go
其实 启动 整个流程中,其实就是一个组合模式:
1 | func main() { |
整个数据流,就和这个
1 | group |
组合器有关了。
1 | group.Start() |
那么说明加入
1 | group |
的
1 | service |
都是实现
1 | Start() |
。也就是说
1 | kafka |
端的启动逻辑在
1 | Start() |
:
1 | func (q *kafkaQueue) Start() { |
启动
1 | kafka |
消费程序
启动
1 | kafka |
消费拉取端【可能会被名字迷惑,实际上是从
1 | kafka |
拉取消息到
1 | q.channel |
】
消费程序终止,收尾工作
而我们传入
1 | kafka |
中的
1 | handler |
,上文说过其实是
1 | Consume |
,而这个方法就是在
1 | q.startConsumers() |
中执行的:
1 | q.startConsumers() |
这样整个数据流就彻底串起来了:
总结
作为
1 | go-stash |
第一篇文章,本篇从架构和设计上整体介绍
1 | go-stash |
,有关性能和为什么我们要开发一个这样的组件,我们下篇文章逐渐揭晓。
https://github.com/tal-tech/go-stash
关于
1 | go-zero |
更多的设计和实现文章,可以持续关注我们。
https://github.com/tal-tech/go-zero
https://gitee.com/kevwan/go-zero
欢迎使用 go-zero 并 star 支持我们!
微信交流群
关注『微服务实践』公众号并回复 进群 获取社区群二维码。
本文标题: 极速精简 Go 版 Logstash
本文作者: OSChina
发布时间: 2021年04月15日 09:48
最后更新: 2025年04月03日 11:07
原始链接: https://haoxiang.eu.org/3cca5588/
版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0许可协议,转载请注明出处!