ELKstack 中文指南
  • ELKstack 中文指南
  • Logstash
    • 入门示例
      • 下载安装
      • hello world
      • 配置语法
      • plugin的安装
      • 长期运行
    • 插件配置
      • input配置
        • file
        • stdin
        • syslog
        • tcp
      • codec配置
        • json
        • multiline
        • collectd
        • netflow
      • filter配置
        • date
        • grok
        • dissect
        • geoip
        • json
        • kv
        • metrics
        • mutate
        • ruby
        • split
        • elapsed
      • output配置
        • elasticsearch
        • email
        • exec
        • file
        • nagios
        • statsd
        • stdout
        • tcp
        • hdfs
    • 场景示例
      • nginx访问日志
      • nginx错误日志
      • postfix日志
      • ossec日志
      • windows系统日志
      • Java日志
      • MySQL慢查询日志
    • 性能与测试
      • generator方式
      • 监控方案
        • logstash-input-heartbeat方式
        • jmx启动参数方式
        • API方式
    • 扩展方案
      • 通过redis传输
      • 通过kafka传输
      • AIX 平台上的logstash-forwarder-java
      • rsyslog
      • nxlog
      • heka
      • fluent
      • Message::Passing
    • 源码解析
      • pipeline流程
      • Event的生成
    • 插件开发
      • utmp插件示例
  • Beats
    • filebeat
    • packetbeat网络流量分析
    • metricbeat
    • winlogbeat
  • ElasticSearch
    • 架构原理
      • segment、buffer和translog对实时性的影响
      • segment merge对写入性能的影响
      • routing和replica的读写过程
      • shard的allocate控制
      • 自动发现的配置
    • 接口使用示例
      • 增删改查操作
      • 搜索请求
      • Painless脚本
      • reindex接口
    • 性能优化
      • bulk提交
      • gateway配置
      • 集群状态维护
      • 缓存
      • fielddata
      • curator工具
      • profile接口
    • rally测试方案
    • 多集群互联
    • 别名的应用
    • 映射与模板的定制
    • puppet-elasticsearch模块的使用
    • 计划内停机升级的操作流程
    • 镜像备份
    • rollover和shrink
    • Ingest节点
    • Hadoop 集成
      • spark streaming交互
    • 权限管理
      • Shield
      • Search-Guard 在 Elasticsearch 2.x 上的运用
    • 监控方案
      • 监控相关接口
        • 集群健康状态
        • 节点状态
        • 索引状态
        • 任务管理
        • cat 接口的命令行使用
      • 日志记录
      • 实时bigdesk方案
      • cerebro
      • zabbix trapper方案
    • ES在运维监控领域的其他玩法
      • percolator接口
      • watcher报警
      • ElastAlert
      • 时序数据库
      • Grafana
      • juttle
      • Etsy的Kale异常检测
  • Kibana 5
    • 安装、配置和运行
    • 生产环境部署
    • discover功能
    • 各visualize功能
      • area
      • table
      • line
      • markdown
      • metric
      • pie
      • tile map
      • vertical bar
    • dashboard功能
    • timelion 介绍
    • console 介绍
    • setting功能
    • 常用sub agg示例
      • 函数堆栈链分析
      • 分图统计
      • TopN的时序趋势图
      • 响应时间的百分占比趋势图
      • 响应时间的概率分布在不同时段的相似度对比
    • 源码解析
      • .kibana索引的数据结构
      • 主页入口
      • discover解析
      • visualize解析
      • dashboard解析
    • 插件
      • 可视化开发示例
      • 后端开发示例
      • 完整app开发示例
    • Kibana报表
  • 竞品对比
  • 推荐阅读
  • 合作名单
  • 捐赠名单
Powered by GitBook
On this page

Was this helpful?

  1. Logstash
  2. 源码解析

pipeline流程

在一开始,就介绍过,Logstash 对日志的处理,从 input 到 output,就像在 Linux 命令行上的管道操作一样。事实上,在 Logstash 中,对此有一个专门的名词,叫 Pipeline。

Pipeline 的代码加载路径如下:

bin/logstash -> logstash-core/lib/logstash/runner.rb -> logstash-core/lib/logstash/agent.rb -> logstash-core/lib/logstash/pipeline.rb

Logstash 从 2.2 版开始对 pipeline 做了大幅重构,目前最新 5.0 版的 pipeline.rb,可以归纳成下面这么一段缩略版的代码:

    # 初始化阶段
    @config = grammar.parse(configstr)
    code = @config.compile
    eval(code)

    queue = LogStash::Util::WrappedSynchronousQueue.new
    @input_queue_client = queue.write_client
    @filter_queue_client = queue.read_client

    # 启动指标计数器
    @filter_queue_client.set_events_metric()
    @filter_queue_client.set_pipeline_metric()

    # 运行
    LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")

    # 启动输入插件
    @inputs.each do |input|
        input.register
        @input_threads << Thread.new do
            LogStash::Util::set_thread_name("[#{pipeline_id}]<#{input.class.config_name}")
            plugin.run(@input_queue)
        end
    end

    @outputs.each {|o| o.register }
    @filters.each {|f| f.register }

    max_inflight = batch_size * pipeline_workers
    pipeline_workers.times do |t|
        @worker_threads << Thread.new do
            LogStash::Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
            @filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
            while true
                batch = @filter_queue_client.take_batch

                # 开始过滤
                batch.each do |event|
                    filter_func(event).each do |e|
                        batch.merge(e)
                    end
                end
                # 计数
                @filter_queue_client.add_filtered_metrics(batch)

                # 开始输出
                output_events_map = Hash.new { |h,k|  h[k] = [] }
                batch.each do |event|
                    output_func(event).each do |output|
                        output_events_map[output].push(event)
                    end
                end
                output_events_map.each do |output, events|
                    output.multi_receive(events)
                end
                @filter_queue_client.add_output_metrics(batch)

                # 释放
                @filter_queue_client.close_batch(batch)
            end
        end
    end

    # 运行
    @input_threads.each(&:join)

整个缩略版,可以了解到一个关键信息,对我们理解 Logstash 原理是最有用的:queue 是一个固定大小为 0 的多线程同步队列。filter 和 output 插件,则在相同的 pipeline_worker 线程中运行,该线程每次批量获取数据,也批量传递给 filter 和 output 插件。

由于 input 到 filter 之间有唯一的队列,任意一个 filter 或者 output 发生堵塞,都会一直堵塞到最前端的接收。这也是 logstash-input-heartbeat 的理论基础。

Previous源码解析NextEvent的生成

Last updated 5 years ago

Was this helpful?

这个全新的 NG pipeline 是从 2.2 版开始发布的,当时也导致 logstash-output-elasticsearch 的 ESClient 数量比过去大幅增加,对写入 Elasticsearch 的性能是不利的。随后官方意识到这个问题,并大举重构了 logstash-output-elasticsearch 的实现,改成了一个整体连接池的方式,代码见:。相关的新配置参数,在之前插件介绍中已经讲过。

https://github.com/logstash-plugins/logstash-output-elasticsearch/commit/06a47535111881b2bc6c9dbd3908e664e4852476