前言 今天来介绍 go-zero 生态的另一个组件 go-stash。这是一个 logstash 的 Go 语言替代版,我们用 go-stash 相比原先的 logstash 节省了2/3的服务器资源。如果你在用 logstash,不妨试试,…
一、极速精简Go版Logstash
1.1 前言
今天来介绍go-zero
生态的另一个组件go-stash
。这是一个logstash
的 Go 语言替代版,我们用
相比原先的
节省了2/3的服务器资源。如果你在用
,不妨试试,也可以看看基于
实现这样的工具是多么的容易,这个工具作者仅用了两天时间。
1.2 整体架构
先从它的配置中,我们来看看设计架构。
1 | Clusters: |
抽象了数据处理过程。
对,整个
就是如 config 配置中显示的,所见即所得。
1.3 启动
从stash.go
的启动流程大致分为几个部分。因为可以配置多个cluster
,那从一个
分析:
建立与es
的连接【传入es
配置】
构建filter processors
【es
前置处理器,做数据过滤以及处理,可以设置多个】
完善对es
中 索引配置,启动handle
,同时将filter
加入handle【处理输入输出】
连接下游的kafka
,将上面创建的
传入,完成
和es
之间的数据消费和数据写入
1.4 MessageHandler
在上面架构图中,中间的
只是从 config 中看到,其实更详细是MessageHandler
的一部分,做数据过滤和转换,下面来说说这块。
以下代码:https://github.com/tal-tech/go-stash/tree/master/stash/handler/handler.go
1 | type MessageHandler struct { |
的操作。
别急,从接口设计上
实现了go-queue
中ConsumeHandler
接口。
这里,上下游就串联了:
接管了es
的操作,负责数据处理到数据写入
对上实现了
的Consume
操作。这样在消费过程中执行handler
的操作,从而写入es
1 | 实际上, `Consume()` 也是这么处理的: |
1 | func (mh *MessageHandler) Consume(_, val string) error { |
return err
}
1 | // es 写入index配置 |
return nil
}
1 | bs, err := jsoniter.Marshal(m) |
}
1 | // es 写入 |
}
1 |
|
...
1 | // service 组合模式 |
1 | for _, processor := range c.Clusters { |
...
1 | // filter processors 构建 |
...
1 | // 准备es的写入操作 {写入的index, 写入器writer} |
}
1 | // 启动这个组合器 |
}
整个数据流,就和这个```text group组合器有关了。```java
1 | |- group.doStart() |
那么说明加入```text group的text service `都是实现java
1 | Start() |
。也就是说```text kafka端的启动逻辑在java `:java
1 | func (q *kafkaQueue) Start() { |
1 | q.producerRoutines.Wait() |
}
`启动```text
kafka
1 | 消费程序 |
这样整个数据流就彻底串起来了:
1.6 总结
作为
第一篇文章,本篇从架构和设计上整体介绍
,有关性能和为什么我们要开发一个这样的组件,我们下篇文章逐渐揭晓。
https://github.com/tal-tech/go-stash
关于
https://github.com/tal-tech/go-zero
https://gitee.com/kevwan/go-zero
欢迎使用 go-zero 并 star 支持我们!
本文标题: 极速精简Go版Logstash
发布时间: 2019年02月26日 00:00
最后更新: 2025年12月30日 08:54
原始链接: https://haoxiang.eu.org/3aebe5fd/
版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0许可协议,转载请注明出处!

