Event的生成

上一节大家可能注意到了,整个 pipeline 非常简单,无非就是一个多线程的线程间数据读写。但是,之前介绍的 codec 在哪里?这个问题,并不在 pipeline 中完成,而是 plugin 中。

Logstash 从 1.5 开始,把各个 plugin 拆分成了单独的 gem,主代码里只留下了几个 base.rb 类。所以,要了解详细情况,我们需要阅读一个实际跑数据的插件,比如 vendor/bundle/jruby/1.9/gems/logstash-input-stdin-3.2.0/lib/logstash/inputs/stdin.rb

可以看到其中最关键的读取数据部分代码如下:

    @host = Socket.gethostname
    while !stop?
      if data = stdin_read
      @codec.decode(data) do |event|
        decorate(event)
        event.set("host", @host) if !event.include?("host")
        queue << event
      end
    end

这里两个关键函数:@codec.decode(line)decorate(event)

@codec 在 stdin.rb 中默认为 line,那么我们就继续看 vendor/bundle/jruby/1.9/gems/logstash-codec-line-3.0.2/lib/logstash/codecs/line.rb 的相关部分:

  def register
    require "logstash/util/buftok"
    @buffer = FileWatch::BufferedTokenizer.new(@delimiter)
    @converter = LogStash::Util::Charset.new(@charset)
    @converter.logger = @logger
  end
  public
  def decode(data)
    @buffer.extract(data).each do |line|
      yield LogStash::Event.new("message" => @converter.convert(line))
    end
  end # def decode

超简短。就是在这个 @codec.decode(data) 里,生成了 LogStash::Event 对象。那么,我们通过 output { codec => rubydebug } 看到的除了 message 字段以外的那些数据,又是怎么来的呢?尤其是那个 @timestamp 是怎么出来的?

在 5.0 之前,我们可以通过 lib/logstash/event.rb 看到相关属性的定义和操作。5.0 之后,Logstash 为了提高性能,对 Event 部分采用 Java 语言进行了重构,现在你在 logstash-core-event-java/lib/logstash/event.rb 里只能看到通过 JRuby 的专属 require 指令加载 jar 的语句了。

想要了解 Logstash::Event 的实际定义,需要去 Git 仓库下载,然后阅读 Java 源代码,你也可以直接通过网页阅读,地址是:https://github.com/elastic/logstash/blob/master/logstash-core-event-java/src/main/java/org/logstash/Event.java

现在就清楚了,这个特殊的 @timestamp 是在 event 对象初始化的时候加上的,其实现同样在这个 Java 源码中,见https://github.com/elastic/logstash/blob/master/logstash-core-event-java/src/main/java/org/logstash/Timestamp.java

这就是我们看到 Logstash 生成的事件总是 UTC 时区时间的原因。

至于如果一开始就传入了 @timestamp 数据的处理,则是这样:

同样会利用 joda 库做一次解析,还是转换成 UTC 时区。

Last updated

Was this helpful?