开发一个filebeat output websocket插件

开发一个filebeat的websocket插件, 代码仓地址: https://github.com/shoothzj/beats_output_websocket

引入对beat的依赖#

1
go get github.com/elastic/beats/v7

定义在filebeat中的配置文件#

filebeat通常以配置文件的方式加载插件。让我们定义一下必须的配置,就像elasticsearch中的连接地址等等一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
output.websocket:
# worker
# 用于工作的websocket客户端数量
workers: 1
# 日志批量的最大大小
batch_size: 1
# 重试的最大次数,0代表不重试
retry_limit: 1
# conn
# ws/wss
schema: "ws"
# websocket连接地址
addr: "localhost:8080"
# websocket路径
path: "/echo"
# websocket心跳间隔,用于保活
ping_interval: 30

go文件中的配置#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type clientConfig struct {
// Number of worker goroutines publishing log events
Workers int `config:"workers" validate:"min=1"`
// Max number of events in a batch to send to a single client
BatchSize int `config:"batch_size" validate:"min=1"`
// Max number of retries for single batch of events
RetryLimit int `config:"retry_limit"`
// Schema WebSocket Schema
Schema string `config:"schema"`
// Addr WebSocket Addr
Addr string `config:"addr"`
// Path WebSocket Path
Path string `config:"path"`
// PingInterval WebSocket PingInterval
PingInterval int `config:"ping_interval"`
}

初始化加载插件#

加载插件#

在某个init函数中注册插件

1
2
3
func init() {
outputs.RegisterType("websocket", newWsOutput)
}

newWsOutput中卸载配置,并提供配置给WebSocket客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func newWsOutput(_ outputs.IndexManager, _ beat.Info, stats outputs.Observer, cfg *common.Config) (outputs.Group, error) {
config := clientConfig{}
// 卸载配置,将配置用于初始化WebSocket客户端
if err := cfg.Unpack(&config); err != nil {
return outputs.Fail(err)
}
clients := make([]outputs.NetworkClient, config.Workers)
for i := 0; i < config.Workers; i++ {
clients[i] = &wsClient{
stats: stats,
Schema: config.Schema,
Host: config.Addr,
Path: config.Path,
PingInterval: config.PingInterval,
}
}

return outputs.SuccessNet(true, config.BatchSize, config.RetryLimit, clients)
}

初始化WebSocket客户端#

WebSocket客户端不仅仅是一个WebSocket客户端,而且还需要实现filebeat中的NetworkClient接口,接下来,让我们来关注接口中的每一个方法的作用及实现

String()接口#

String作为客户端的名字,用来标识日志以及指标。是最简单的一个接口

1
2
3
func (w *wsClient) String() string {
return "websocket"
}

Connect()接口#

Connect用来初始化客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (w *wsClient) Connect() error {
u := url.URL{Scheme: w.Schema, Host: w.Host, Path: w.Path}
dial, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err == nil {
w.conn = dial
ticker := time.NewTicker(time.Duration(w.PingInterval) * time.Second)
go func() {
for range ticker.C {
w.conn.WriteMessage(websocket.PingMessage, nil)
}
}()
} else {
time.Sleep(10 * time.Second)
}
return err
}

注意,这里初始化失败,需要Sleep一段时间,否则,filebeat会一直重试。这绝非是你想要的。或许对于场景来说,退避重试可能会更好

Close()接口#

关闭客户端,也是很简单的接口

1
2
3
func (w *wsClient) Close() error {
return w.conn.Close()
}

Publish()接口#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (w *wsClient) Publish(_ context.Context, batch publisher.Batch) error {
events := batch.Events()
// 记录这批日志
w.stats.NewBatch(len(events))
failEvents, err := w.PublishEvents(events)
if err != nil {
// 如果发送正常,则ACK
batch.ACK()
} else {
// 发送失败,则重试。受RetryLimit的限制
batch.RetryEvents(failEvents)
}
return err
}

func (w *wsClient) PublishEvents(events []publisher.Event) ([]publisher.Event, error) {
for i, event := range events {
err := w.publishEvent(&event)
if err != nil {
// 如果单条消息发送失败,则将剩余的消息直接重试
return events[i:], err
}
}
return nil, nil
}

func (w *wsClient) publishEvent(event *publisher.Event) error {
bytes, err := encode(&event.Content)
if err != nil {
// 如果编码失败,就不重试了,重试也不会成功
// encode error, don't retry.
// consider being success
return nil
}
err = w.conn.WriteMessage(websocket.TextMessage, bytes)
if err != nil {
// 写入WebSocket Server失败
return err
}
return nil
}

编码#

编码的逻辑因人而异,事实上,这可能是大家最大的差异所在。这里只是做一个简单地例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type LogOutput struct {
Timestamp time.Time `json:"timestamp"`
Message string `json:"message"`
}

func encode(event *beat.Event) ([]byte, error) {
logOutput := &LogOutput{}
value, err := event.Fields.GetValue("message")
if err != nil {
return nil, err
}
logOutput.Timestamp = event.Timestamp
logOutput.Message = value.(string)
return json.Marshal(logOutput)
}

最后是我们的wsclient#

1
2
3
4
5
6
7
8
9
10
type wsClient struct {
// construct field
Schema string
Host string
Path string
PingInterval int

stats outputs.Observer
conn *websocket.Conn
}

添加额外的功能:大包丢弃#

你可能会想保护你的WebSocket服务器,避免接收到超级大的日志。我们可以在配置项中添加一个配置

maxLen用来限制日志长度,超过maxLen的日志直接丢弃。为什么不使用filebeat中的max_bytes

因为filebeatmax_bytes的默认行为是截断,截断的日志在某些场景下不如丢弃。(比如,日志是json格式,截断后格式无法解析)

配置中添加maxLen#

1
max_len: 1024

省略掉那些重复的添加结构体,读取max_len在encode的时候忽略掉

1
2
3
4
s := value.(string)
if len(s) >= w.MaxLen {
return nil, err
}