ELKstack 中文指南
  • ELKstack 中文指南
  • Logstash
    • 入门示例
      • 下载安装
      • hello world
      • 配置语法
      • plugin的安装
      • 长期运行
    • 插件配置
      • input配置
        • file
        • stdin
        • syslog
        • tcp
      • codec配置
        • json
        • multiline
        • collectd
        • netflow
      • filter配置
        • date
        • grok
        • dissect
        • geoip
        • json
        • kv
        • metrics
        • mutate
        • ruby
        • split
        • elapsed
      • output配置
        • elasticsearch
        • email
        • exec
        • file
        • nagios
        • statsd
        • stdout
        • tcp
        • hdfs
    • 场景示例
      • nginx访问日志
      • nginx错误日志
      • postfix日志
      • ossec日志
      • windows系统日志
      • Java日志
      • MySQL慢查询日志
    • 性能与测试
      • generator方式
      • 监控方案
        • logstash-input-heartbeat方式
        • jmx启动参数方式
        • API方式
    • 扩展方案
      • 通过redis传输
      • 通过kafka传输
      • AIX 平台上的logstash-forwarder-java
      • rsyslog
      • nxlog
      • heka
      • fluent
      • Message::Passing
    • 源码解析
      • pipeline流程
      • Event的生成
    • 插件开发
      • utmp插件示例
  • Beats
    • filebeat
    • packetbeat网络流量分析
    • metricbeat
    • winlogbeat
  • ElasticSearch
    • 架构原理
      • segment、buffer和translog对实时性的影响
      • segment merge对写入性能的影响
      • routing和replica的读写过程
      • shard的allocate控制
      • 自动发现的配置
    • 接口使用示例
      • 增删改查操作
      • 搜索请求
      • Painless脚本
      • reindex接口
    • 性能优化
      • bulk提交
      • gateway配置
      • 集群状态维护
      • 缓存
      • fielddata
      • curator工具
      • profile接口
    • rally测试方案
    • 多集群互联
    • 别名的应用
    • 映射与模板的定制
    • puppet-elasticsearch模块的使用
    • 计划内停机升级的操作流程
    • 镜像备份
    • rollover和shrink
    • Ingest节点
    • Hadoop 集成
      • spark streaming交互
    • 权限管理
      • Shield
      • Search-Guard 在 Elasticsearch 2.x 上的运用
    • 监控方案
      • 监控相关接口
        • 集群健康状态
        • 节点状态
        • 索引状态
        • 任务管理
        • cat 接口的命令行使用
      • 日志记录
      • 实时bigdesk方案
      • cerebro
      • zabbix trapper方案
    • ES在运维监控领域的其他玩法
      • percolator接口
      • watcher报警
      • ElastAlert
      • 时序数据库
      • Grafana
      • juttle
      • Etsy的Kale异常检测
  • Kibana 5
    • 安装、配置和运行
    • 生产环境部署
    • discover功能
    • 各visualize功能
      • area
      • table
      • line
      • markdown
      • metric
      • pie
      • tile map
      • vertical bar
    • dashboard功能
    • timelion 介绍
    • console 介绍
    • setting功能
    • 常用sub agg示例
      • 函数堆栈链分析
      • 分图统计
      • TopN的时序趋势图
      • 响应时间的百分占比趋势图
      • 响应时间的概率分布在不同时段的相似度对比
    • 源码解析
      • .kibana索引的数据结构
      • 主页入口
      • discover解析
      • visualize解析
      • dashboard解析
    • 插件
      • 可视化开发示例
      • 后端开发示例
      • 完整app开发示例
    • Kibana报表
  • 竞品对比
  • 推荐阅读
  • 合作名单
  • 捐赠名单
Powered by GitBook
On this page
  • 自己写一个插件
  • 插件格式
  • 插件的关键方法
  • 推荐阅读
  • 插件打包

Was this helpful?

  1. Logstash

插件开发

自己写一个插件

前面已经提过在运行 logstash 的时候,可以通过 --pluginpath 参数来加载自己写的插件。那么,插件又该怎么写呢?

插件格式

一个标准的 logstash 输入插件格式如下:

require 'logstash/namespace'
require 'logstash/inputs/base'
class LogStash::Inputs::MyPlugin < LogStash::Inputs::Base
  config_name 'myplugin'
  default :codec, "line"
  config :myoption_key, :validate => :string, :default => 'myoption_value'
  public def register
  end
  public def run(queue)
  end
end

其中大多数语句在过滤器和输出阶段是共有的。

  • config_name 用来定义该插件写在 logstash 配置文件里的名字;

  • config 可以定义很多个,即该插件在 logstash 配置文件中的可配置参数。logstash 很温馨的提供了验证方法,确保接收的数据是你期望的数据类型;

  • register logstash 在启动的时候运行的函数,一些需要常驻内存的数据,可以在这一步先完成。比如对象初始化,filters/ruby 插件中的 init 语句等。

插件的关键方法

输入插件独有的是 run 方法。在 run 方法中,必须实现一个长期运行的程序(最简单的就是 loop 指令)。然后在每次收到数据并处理成 event,这段示例在上一节演示 Logstash::Event 的生成时已经介绍过。最后,一定要调用 queue << event 语句。一个输入流程就算是完成了。

而如果是过滤器插件,对应修改成:

require 'logstash/filters/base'
class LogStash::Filters::MyPlugin < LogStash::Filters::Base
  config_name 'myplugin'
  public def register
  end
  public def filter(event)

    filter_matched(event)
  end
end

其中,filter_matched 是在 filter 函数完成本插件自己的处理逻辑之后一定要调用的。

输出插件则是:

require 'logstash/outputs/base'
class LogStash::Outputs::MyPlugin < LogStash::Outputs::Base
  config_name 'myplugin'
  concurrency :single
  public def register
  end
  public def multi_receive(events)
  end
end

这里和过去的版本最明显的差别是,处理方法改成了 multi_receive,而不是 receive。因为新的 pipeline 机制是批量传递数据给输出插件的。不过为了兼容过去的插件,LogStash::Outputs::Base 基类中的 multi_receive 实现继续迭代调用了 receive。

另一个是新出现的配置 concurrency,代表着本插件是否 threadsafe,并由此取代了过去的 workers 选项。可选项为:single 和 shared。

  • single 表示本插件是非线程安全的,必须在各 pipeline workers 之间同一时刻只有一个运行。

  • shared 表示本插件是线程安全的,每个 pipeline workers 之间可以独立运行,这也就意味着插件作者要自己在 multi_receive 里调用 Mutexes。

推荐阅读

插件打包

Logstash 从 1.5.0-GA 版开始,对插件规范做了重大变更。放弃了 milestone 定义,去除了 --pluginpath 命令行参数。统一改成 bin/plugin 管理的 rubygem 包插件。那么,我们自己写的 Logstash 插件,也同样需要适应这个新规则,写完 Ruby 代码,还要打包成 gem 才能使用。

为了我们更方便的完成工作,logstash 针对 4 种插件形态提供了 4 个示例库,可以按照自己所需克隆使用。比如要写一个 logstash-filter-mything 插件:

git clone https://github.com/logstash-plugins/logstash-filter-example
cd logstash-filter-example
mv logstash-filter-example.gemspec logstash-filter-mything.gemspec
mv lib/logstash/filters/example.rb lib/logstash/filters/mything.rb
mv spec/filters/example_spec.rb spec/filters/mything_spec.rb

然后把代码写在 lib/logstash/filters/mything.rb 里即可。

代码部分完成。然后就是定义 gem 打包需要的额外文件和库依赖了。

目录中有两个文件,Gemfile 和 logstash-filter-mything.gemspec。

Gemfile 文件就是标准格式,用来运行 bundler install 时下载 rubygems 包的。默认情况下,最基础的内容是:

source 'https://rubygems.org' # 国内建议改成 'http://ruby.taobao.org'
gemspec
gem "logstash", :github => "elastic/logstash", :branch => "1.5"

gemspec 文件则是用来定义软件包本身规范,不单限于 rubygems。示例如下:

Gem::Specification.new do |s|
  s.name = 'logstash-filter-mything'
  s.version         = '1.1.0'
  s.licenses = ['Apache License (2.0)']
  s.summary = "This mything filter is just for Elastic Stack Guide example"
  s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
  s.authors = ["Chenryn"]
  s.email = 'chenlin7@staff.sina.com.cn'
  s.homepage = "http://kibana.logstash.es"
  s.require_paths = ["lib"]

  s.files = `find . -type f ! -wholename '*.svn*'`.split($\)
  s.test_files = s.files.grep(%r{^(test|spec|features)/})

  s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" }

  s.requirements << "jar 'org.elasticsearch:elasticsearch', '1.4.0'"
  s.add_runtime_dependency "jar-dependencies"
  s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0'
  s.add_development_dependency 'logstash-devutils'
end

其中:

  • s.version 就是 milestone 的替代品,0.1.x 相当于是 milestone 0;0.9.x 相当于是 milestone 2;1.x.x 相当于是 milestone 3。

  • s.file 默认写法是 git ls-files,因为默认是 git 库,如果你本身采用了 svn,或者 cvs 库,都不要紧,只要命令列出的是你需要打包进去的文件即可。

  • s.metadata 是 logstash 的 plugin 命令在 install 的时候会提前 verify 的特殊信息,一定要保留。

  • s.add_runtime_dependency 是定义插件依赖库的指令。如果有 jar 包依赖,则额外再加 s.requirements。

好了,全部完毕。下面打包:

gem build logstash-filter-mything.gemspec

运行完就会生成一个 logstash-filter-mything-1.1.0.gem 软件包,可以安装使用了。

PreviousEvent的生成Nextutmp插件示例

Last updated 5 years ago

Was this helpful?

Extending logstash
Plugin Milestones