开发一个filebeat的websocket
插件, 代码仓地址: https://github.com/shoothzj/beats_output_websocket
引入对beat
的依赖 1 go get github.com/elastic/beats/v7
定义在filebeat中的配置文件 filebeat
通常以配置文件的方式加载插件。让我们定义一下必须的配置,就像elasticsearch
中的连接地址等等一样。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 output.websocket: workers: 1 batch_size: 1 retry_limit: 1 schema: "ws" addr: "localhost:8080" path: "/echo" ping_interval: 30
go文件中的配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 type clientConfig struct { Workers int `config:"workers" validate:"min=1"` BatchSize int `config:"batch_size" validate:"min=1"` RetryLimit int `config:"retry_limit"` Schema string `config:"schema"` Addr string `config:"addr"` Path string `config:"path"` PingInterval int `config:"ping_interval"` }
初始化加载插件 加载插件 在某个init函数中注册插件
1 2 3 func init () { outputs.RegisterType("websocket" , newWsOutput) }
在newWsOutput
中卸载配置,并提供配置给WebSocket
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func newWsOutput (_ outputs.IndexManager, _ beat.Info, stats outputs.Observer, cfg *common.Config) (outputs.Group, error ) { config := clientConfig{} if err := cfg.Unpack(&config); err != nil { return outputs.Fail(err) } clients := make ([]outputs.NetworkClient, config.Workers) for i := 0 ; i < config.Workers; i++ { clients[i] = &wsClient{ stats: stats, Schema: config.Schema, Host: config.Addr, Path: config.Path, PingInterval: config.PingInterval, } } return outputs.SuccessNet(true , config.BatchSize, config.RetryLimit, clients) }
初始化WebSocket
客户端 WebSocket
客户端不仅仅是一个WebSocket
客户端,而且还需要实现filebeat
中的NetworkClient
接口,接下来,让我们来关注接口中的每一个方法的作用及实现
String()接口 String
作为客户端的名字,用来标识日志以及指标。是最简单的一个接口
1 2 3 func (w *wsClient) String() string { return "websocket" }
Connect()接口 Connect
用来初始化客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (w *wsClient) Connect() error { u := url.URL{Scheme: w.Schema, Host: w.Host, Path: w.Path} dial, _, err := websocket.DefaultDialer.Dial(u.String(), nil ) if err == nil { w.conn = dial ticker := time.NewTicker(time.Duration(w.PingInterval) * time.Second) go func () { for range ticker.C { w.conn.WriteMessage(websocket.PingMessage, nil ) } }() } else { time.Sleep(10 * time.Second) } return err }
注意,这里初始化失败,需要Sleep
一段时间,否则,filebeat会一直重试。这绝非是你想要的。或许对于场景来说,退避重试可能会更好
Close()接口 关闭客户端,也是很简单的接口
1 2 3 func (w *wsClient) Close() error { return w.conn.Close() }
Publish()接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func (w *wsClient) Publish(_ context.Context, batch publisher.Batch) error { events := batch.Events() w.stats.NewBatch(len (events)) failEvents, err := w.PublishEvents(events) if err != nil { batch.ACK() } else { batch.RetryEvents(failEvents) } return err } func (w *wsClient) PublishEvents(events []publisher.Event) ([]publisher.Event, error ) { for i, event := range events { err := w.publishEvent(&event) if err != nil { return events[i:], err } } return nil , nil } func (w *wsClient) publishEvent(event *publisher.Event) error { bytes, err := encode(&event.Content) if err != nil { return nil } err = w.conn.WriteMessage(websocket.TextMessage, bytes) if err != nil { return err } return nil }
编码 编码的逻辑因人而异,事实上,这可能是大家最大的差异所在。这里只是做一个简单地例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type LogOutput struct { Timestamp time.Time `json:"timestamp"` Message string `json:"message"` } func encode (event *beat.Event) ([]byte , error ) { logOutput := &LogOutput{} value, err := event.Fields.GetValue("message" ) if err != nil { return nil , err } logOutput.Timestamp = event.Timestamp logOutput.Message = value.(string ) return json.Marshal(logOutput) }
最后是我们的wsclient
1 2 3 4 5 6 7 8 9 10 type wsClient struct { Schema string Host string Path string PingInterval int stats outputs.Observer conn *websocket.Conn }
添加额外的功能:大包丢弃 你可能会想保护你的WebSocket
服务器,避免接收到超级大的日志。我们可以在配置项中添加一个配置
maxLen用来限制日志长度,超过maxLen的日志直接丢弃。为什么不使用filebeat
中的max_bytes
?
因为filebeat
中max_bytes
的默认行为是截断,截断的日志在某些场景下不如丢弃。(比如,日志是json格式,截断后格式无法解析)
配置中添加maxLen
省略掉那些重复的添加结构体,读取max_len
在encode的时候忽略掉
1 2 3 4 s := value.(string ) if len (s) >= w.MaxLen { return nil , err }