ELKstack 中文指南
  • ELKstack 中文指南
  • Logstash
    • 入门示例
      • 下载安装
      • hello world
      • 配置语法
      • plugin的安装
      • 长期运行
    • 插件配置
      • input配置
        • file
        • stdin
        • syslog
        • tcp
      • codec配置
        • json
        • multiline
        • collectd
        • netflow
      • filter配置
        • date
        • grok
        • dissect
        • geoip
        • json
        • kv
        • metrics
        • mutate
        • ruby
        • split
        • elapsed
      • output配置
        • elasticsearch
        • email
        • exec
        • file
        • nagios
        • statsd
        • stdout
        • tcp
        • hdfs
    • 场景示例
      • nginx访问日志
      • nginx错误日志
      • postfix日志
      • ossec日志
      • windows系统日志
      • Java日志
      • MySQL慢查询日志
    • 性能与测试
      • generator方式
      • 监控方案
        • logstash-input-heartbeat方式
        • jmx启动参数方式
        • API方式
    • 扩展方案
      • 通过redis传输
      • 通过kafka传输
      • AIX 平台上的logstash-forwarder-java
      • rsyslog
      • nxlog
      • heka
      • fluent
      • Message::Passing
    • 源码解析
      • pipeline流程
      • Event的生成
    • 插件开发
      • utmp插件示例
  • Beats
    • filebeat
    • packetbeat网络流量分析
    • metricbeat
    • winlogbeat
  • ElasticSearch
    • 架构原理
      • segment、buffer和translog对实时性的影响
      • segment merge对写入性能的影响
      • routing和replica的读写过程
      • shard的allocate控制
      • 自动发现的配置
    • 接口使用示例
      • 增删改查操作
      • 搜索请求
      • Painless脚本
      • reindex接口
    • 性能优化
      • bulk提交
      • gateway配置
      • 集群状态维护
      • 缓存
      • fielddata
      • curator工具
      • profile接口
    • rally测试方案
    • 多集群互联
    • 别名的应用
    • 映射与模板的定制
    • puppet-elasticsearch模块的使用
    • 计划内停机升级的操作流程
    • 镜像备份
    • rollover和shrink
    • Ingest节点
    • Hadoop 集成
      • spark streaming交互
    • 权限管理
      • Shield
      • Search-Guard 在 Elasticsearch 2.x 上的运用
    • 监控方案
      • 监控相关接口
        • 集群健康状态
        • 节点状态
        • 索引状态
        • 任务管理
        • cat 接口的命令行使用
      • 日志记录
      • 实时bigdesk方案
      • cerebro
      • zabbix trapper方案
    • ES在运维监控领域的其他玩法
      • percolator接口
      • watcher报警
      • ElastAlert
      • 时序数据库
      • Grafana
      • juttle
      • Etsy的Kale异常检测
  • Kibana 5
    • 安装、配置和运行
    • 生产环境部署
    • discover功能
    • 各visualize功能
      • area
      • table
      • line
      • markdown
      • metric
      • pie
      • tile map
      • vertical bar
    • dashboard功能
    • timelion 介绍
    • console 介绍
    • setting功能
    • 常用sub agg示例
      • 函数堆栈链分析
      • 分图统计
      • TopN的时序趋势图
      • 响应时间的百分占比趋势图
      • 响应时间的概率分布在不同时段的相似度对比
    • 源码解析
      • .kibana索引的数据结构
      • 主页入口
      • discover解析
      • visualize解析
      • dashboard解析
    • 插件
      • 可视化开发示例
      • 后端开发示例
      • 完整app开发示例
    • Kibana报表
  • 竞品对比
  • 推荐阅读
  • 合作名单
  • 捐赠名单
Powered by GitBook
On this page

Was this helpful?

  1. Logstash
  2. 源码解析

Event的生成

上一节大家可能注意到了,整个 pipeline 非常简单,无非就是一个多线程的线程间数据读写。但是,之前介绍的 codec 在哪里?这个问题,并不在 pipeline 中完成,而是 plugin 中。

Logstash 从 1.5 开始,把各个 plugin 拆分成了单独的 gem,主代码里只留下了几个 base.rb 类。所以,要了解详细情况,我们需要阅读一个实际跑数据的插件,比如 vendor/bundle/jruby/1.9/gems/logstash-input-stdin-3.2.0/lib/logstash/inputs/stdin.rb。

可以看到其中最关键的读取数据部分代码如下:

    @host = Socket.gethostname
    while !stop?
      if data = stdin_read
      @codec.decode(data) do |event|
        decorate(event)
        event.set("host", @host) if !event.include?("host")
        queue << event
      end
    end

这里两个关键函数:@codec.decode(line) 和 decorate(event)。

@codec 在 stdin.rb 中默认为 line,那么我们就继续看 vendor/bundle/jruby/1.9/gems/logstash-codec-line-3.0.2/lib/logstash/codecs/line.rb 的相关部分:

  def register
    require "logstash/util/buftok"
    @buffer = FileWatch::BufferedTokenizer.new(@delimiter)
    @converter = LogStash::Util::Charset.new(@charset)
    @converter.logger = @logger
  end
  public
  def decode(data)
    @buffer.extract(data).each do |line|
      yield LogStash::Event.new("message" => @converter.convert(line))
    end
  end # def decode

超简短。就是在这个 @codec.decode(data) 里,生成了 LogStash::Event 对象。那么,我们通过 output { codec => rubydebug } 看到的除了 message 字段以外的那些数据,又是怎么来的呢?尤其是那个 @timestamp 是怎么出来的?

在 5.0 之前,我们可以通过 lib/logstash/event.rb 看到相关属性的定义和操作。5.0 之后,Logstash 为了提高性能,对 Event 部分采用 Java 语言进行了重构,现在你在 logstash-core-event-java/lib/logstash/event.rb 里只能看到通过 JRuby 的专属 require 指令加载 jar 的语句了。

  public static final String METADATA = "@metadata";
  public static final String METADATA_BRACKETS = "[" + METADATA + "]";
  public static final String TIMESTAMP = "@timestamp";
  public static final String TIMESTAMP_FAILURE_TAG = "_timestampparsefailure";
  public static final String TIMESTAMP_FAILURE_FIELD = "_@timestamp";
  public static final String VERSION = "@version";
  public static final String VERSION_ONE = "1";

  public Event()
  {
      this.metadata = new HashMap<String, Object>();
      this.data = new HashMap<String, Object>();
      this.data.put(VERSION, VERSION_ONE);
      this.cancelled = false;
      this.timestamp = new Timestamp();
      this.data.put(TIMESTAMP, this.timestamp);
      this.accessors = new Accessors(this.data);
      this.metadata_accessors = new Accessors(this.metadata);
  }
  public class Timestamp implements Cloneable {
    private DateTime time;
    public Timestamp() {
      this.time = new DateTime(DateTimeZone.UTC);
    }
  }

这就是我们看到 Logstash 生成的事件总是 UTC 时区时间的原因。

至于如果一开始就传入了 @timestamp 数据的处理,则是这样:

  public Timestamp(String iso8601) {
    this.time = ISODateTimeFormat.dateTimeParser().parseDateTime(iso8601).toDateTime(DateTimeZone.UTC);
  }
  public Timestamp(long epoch_milliseconds) {
    this.time = new DateTime(epoch_milliseconds, DateTimeZone.UTC);
  }

同样会利用 joda 库做一次解析,还是转换成 UTC 时区。

Previouspipeline流程Next插件开发

Last updated 5 years ago

Was this helpful?

想要了解 Logstash::Event 的实际定义,需要去 Git 仓库下载,然后阅读 Java 源代码,你也可以直接通过网页阅读,地址是::

现在就清楚了,这个特殊的 @timestamp 是在 event 对象初始化的时候加上的,其实现同样在这个 Java 源码中,见:

https://github.com/elastic/logstash/blob/master/logstash-core-event-java/src/main/java/org/logstash/Event.java
https://github.com/elastic/logstash/blob/master/logstash-core-event-java/src/main/java/org/logstash/Timestamp.java