As mentioned at the very beginning, Logstash's processing of logs, from input to output, works just like a pipe operation on the Linux command line. In fact, Logstash has a dedicated term for this: the Pipeline.
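To make the analogy concrete, here is a toy Ruby sketch (illustrative only, not Logstash source) that chains an input, a filter, and an output the way `cat access.log | grep ERROR` chains processes:

```ruby
# A toy model of the pipeline idea: each stage takes events and hands
# them downstream, like processes connected by a shell pipe.
input  = -> { ["line one", "line two"] }            # produce events
filter = ->(events) { events.map(&:upcase) }        # transform events
output = ->(events) { events.each { |e| puts e } }  # consume events

output.call(filter.call(input.call))
```

Each stage only sees the events handed to it, which is what lets Logstash swap plugins freely at any position in the chain.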
bin/logstash
-> logstash-core/lib/logstash/runner.rb
-> logstash-core/lib/logstash/agent.rb
-> logstash-core/lib/logstash/pipeline.rb
# initialization phase
@config = grammar.parse(configstr)
code = @config.compile
eval(code)
queue = LogStash::Util::WrappedSynchronousQueue.new
@input_queue_client = queue.write_client
@filter_queue_client = queue.read_client
# set up the metric counters
@filter_queue_client.set_events_metric()
@filter_queue_client.set_pipeline_metric()
# run phase
LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")
# start the input plugins
@inputs.each do |input|
  input.register
  @input_threads << Thread.new do
    LogStash::Util::set_thread_name("[#{pipeline_id}]<#{input.class.config_name}")
    input.run(@input_queue_client)
  end
end
@outputs.each {|o| o.register }
@filters.each {|f| f.register }
max_inflight = batch_size * pipeline_workers
pipeline_workers.times do |t|
  @worker_threads << Thread.new do
    LogStash::Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
    @filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
    while true
      batch = @filter_queue_client.take_batch
      # run the filters
      batch.each do |event|
        filter_func(event).each do |e|
          batch.merge(e)
        end
      end
      # update the filter metrics
      @filter_queue_client.add_filtered_metrics(batch)
      # run the outputs
      output_events_map = Hash.new { |h, k| h[k] = [] }
      batch.each do |event|
        output_func(event).each do |output|
          output_events_map[output].push(event)
        end
      end
      output_events_map.each do |output, events|
        output.multi_receive(events)
      end
      @filter_queue_client.add_output_metrics(batch)
      # release the batch
      @filter_queue_client.close_batch(batch)
    end
  end
end
# wait for the input threads
@input_threads.each(&:join)
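The take_batch call above collects up to batch_size events before a worker runs the filters over them. Its behavior can be approximated with Ruby's standard SizedQueue (a simplified sketch: the ToyQueue class and its sizes are made up for illustration, and the batch_delay wait is omitted):

```ruby
# Simplified stand-in for the queue between inputs and workers:
# inputs push events on one side, workers take batches on the other.
class ToyQueue
  def initialize(size = 20)
    @q = SizedQueue.new(size)   # push blocks when the queue is full
  end

  def push(event)
    @q.push(event)
  end

  # Collect up to batch_size events; ship a partial batch if the queue drains.
  def take_batch(batch_size)
    batch = [@q.pop]            # block until at least one event arrives
    (batch_size - 1).times do
      begin
        batch << @q.pop(true)   # non-blocking pop for the rest
      rescue ThreadError
        break                   # queue empty; don't wait for a full batch
      end
    end
    batch
  end
end

q = ToyQueue.new
producer = Thread.new { 10.times { |i| q.push("event-#{i}") } }
producer.join
batch = q.take_batch(5)
```

Shipping partial batches keeps latency bounded when the input rate is low, instead of waiting indefinitely for a full batch.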
Since there is only a single queue between input and filter, if any filter or output blocks, the back-pressure propagates all the way to the frontmost receiving stage. This is also the theoretical basis of logstash-input-heartbeat: when heartbeat events stop flowing through, you know the pipeline is jammed somewhere downstream.
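This back-pressure can be demonstrated with a small sketch (illustrative, using Ruby's SizedQueue rather than Logstash's actual queue): when the consumer stalls, the producer's push blocks as soon as the bounded queue fills, so a stuck downstream stage is felt all the way back at the input.

```ruby
q = SizedQueue.new(2)      # tiny bounded queue between "input" and "filter"
pushed = []

producer = Thread.new do
  5.times do |i|
    q.push(i)              # blocks once 2 events sit unconsumed in the queue
    pushed << i
  end
end

sleep 0.2                  # consumer is "stuck": nothing pops the queue
blocked_at = pushed.size   # producer got 2 events in, then blocked on the 3rd

3.times { q.pop }          # consumer recovers and drains events
producer.join              # producer can now finish all 5 pushes
```

The real queue in this Logstash version is synchronous (effectively zero-capacity), so the blockage reaches the input even sooner than in this sketch.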