ELKstack 中文指南
  • ELKstack 中文指南
  • Logstash
    • 入门示例
      • 下载安装
      • hello world
      • 配置语法
      • plugin的安装
      • 长期运行
    • 插件配置
      • input配置
        • file
        • stdin
        • syslog
        • tcp
      • codec配置
        • json
        • multiline
        • collectd
        • netflow
      • filter配置
        • date
        • grok
        • dissect
        • geoip
        • json
        • kv
        • metrics
        • mutate
        • ruby
        • split
        • elapsed
      • output配置
        • elasticsearch
        • email
        • exec
        • file
        • nagios
        • statsd
        • stdout
        • tcp
        • hdfs
    • 场景示例
      • nginx访问日志
      • nginx错误日志
      • postfix日志
      • ossec日志
      • windows系统日志
      • Java日志
      • MySQL慢查询日志
    • 性能与测试
      • generator方式
      • 监控方案
        • logstash-input-heartbeat方式
        • jmx启动参数方式
        • API方式
    • 扩展方案
      • 通过redis传输
      • 通过kafka传输
      • AIX 平台上的logstash-forwarder-java
      • rsyslog
      • nxlog
      • heka
      • fluent
      • Message::Passing
    • 源码解析
      • pipeline流程
      • Event的生成
    • 插件开发
      • utmp插件示例
  • Beats
    • filebeat
    • packetbeat网络流量分析
    • metricbeat
    • winlogbeat
  • ElasticSearch
    • 架构原理
      • segment、buffer和translog对实时性的影响
      • segment merge对写入性能的影响
      • routing和replica的读写过程
      • shard的allocate控制
      • 自动发现的配置
    • 接口使用示例
      • 增删改查操作
      • 搜索请求
      • Painless脚本
      • reindex接口
    • 性能优化
      • bulk提交
      • gateway配置
      • 集群状态维护
      • 缓存
      • fielddata
      • curator工具
      • profile接口
    • rally测试方案
    • 多集群互联
    • 别名的应用
    • 映射与模板的定制
    • puppet-elasticsearch模块的使用
    • 计划内停机升级的操作流程
    • 镜像备份
    • rollover和shrink
    • Ingest节点
    • Hadoop 集成
      • spark streaming交互
    • 权限管理
      • Shield
      • Search-Guard 在 Elasticsearch 2.x 上的运用
    • 监控方案
      • 监控相关接口
        • 集群健康状态
        • 节点状态
        • 索引状态
        • 任务管理
        • cat 接口的命令行使用
      • 日志记录
      • 实时bigdesk方案
      • cerebro
      • zabbix trapper方案
    • ES在运维监控领域的其他玩法
      • percolator接口
      • watcher报警
      • ElastAlert
      • 时序数据库
      • Grafana
      • juttle
      • Etsy的Kale异常检测
  • Kibana 5
    • 安装、配置和运行
    • 生产环境部署
    • discover功能
    • 各visualize功能
      • area
      • table
      • line
      • markdown
      • metric
      • pie
      • tile map
      • vertical bar
    • dashboard功能
    • timelion 介绍
    • console 介绍
    • setting功能
    • 常用sub agg示例
      • 函数堆栈链分析
      • 分图统计
      • TopN的时序趋势图
      • 响应时间的百分占比趋势图
      • 响应时间的概率分布在不同时段的相似度对比
    • 源码解析
      • .kibana索引的数据结构
      • 主页入口
      • discover解析
      • visualize解析
      • dashboard解析
    • 插件
      • 可视化开发示例
      • 后端开发示例
      • 完整app开发示例
    • Kibana报表
  • 竞品对比
  • 推荐阅读
  • 合作名单
  • 捐赠名单
Powered by GitBook
On this page
  • 创建管道流
  • 测试管道流
  • 处理器
  • convert
  • grok
  • gsub
  • date
  • 其他处理器插件

Was this helpful?

  1. ElasticSearch

Ingest节点

Ingest 节点是 Elasticsearch 5.0 新增的节点类型和功能。其开启方式为:在 elasticsearch.yml 中定义:

node.ingest: true

Ingest 节点的基础原理,是:节点接收到数据之后,根据请求参数中指定的管道流 id,找到对应的已注册管道流,对数据进行处理,然后将处理过后的数据,按照 Elasticsearch 标准的 indexing 流程继续运行。

创建管道流

curl -XPUT http://localhost:9200/_ingest/pipeline/my-pipeline-id -d '
{
    "description" : "describe pipeline",
    "processors" : [
        {
            "convert" : {
                "field": "foo",
                "type": "integer"
            }
        }
    ]
}'

然后发送端带着这个 my-pipeline-id 发请求就好了。示例见本书 beats 章节的介绍。

测试管道流

想知道自己的 ingest 配置是否正确,可以通过仿真接口测试验证一下:

curl -XPUT http://localhost:9200/_ingest/pipeline/_simulate -d '
{
    "pipeline" : {
        "description" : "describe pipeline",
        "processors" : [
            {
                "set" : {
                    "field": "foo",
                    "value": "bar"
                }
            }
        ]
    },
    "docs" : [
        {
            "_index": "index",
            "_type": "type",
            "_id": "id",
            "_source": {
                "foo" : "bar"
            }
        }
    ]
}'

处理器

Ingest 节点的处理器,相当于 Logstash 的 filter 插件。事实上其主要处理器就是直接移植了 Logstash 的 filter 代码成 Java 版本。目前最重要的几个处理器分别是:

convert

{
    "convert": {
        "field" : "foo",
        "type": "integer"
    }
}

grok

    {
        "grok": {
            "field": "message",
            "patterns": ["my %{FAVORITE_DOG:dog} is colored %{RGB:color}"]
            "pattern_definitions" : {
                "FAVORITE_DOG" : "beagle",
                "RGB" : "RED|GREEN|BLUE"
            }
        }
    }

gsub

{
    "gsub": {
        "field": "field1",
        "pattern": "\.",
        "replacement": "-"
    }
}

date

    {
        "date" : {
            "field" : "initial_date",
            "target_field" : "timestamp",
            "formats" : ["dd/MM/yyyy hh:mm:ss"],
            "timezone" : "Europe/Amsterdam"
        }
    }

其他处理器插件

除了内置的处理器之外,还有 3 个处理器,官方选择了以插件性质单独发布,它们是 attachement,geoip 和 user-agent 。原因应该是这 3 个处理器需要额外数据模块,而且处理性能一般,担心拖累 ES 集群。

它们可以和其他普通 ES 插件一样安装:

sudo bin/elasticsearch-plugin install ingest-geoip

使用方式和其他处理器一样:

curl -XPUT http://localhost:9200/_ingest/pipeline/my-pipeline-id-2 -d '
{
    "description" : "Add geoip info",
    "processors" : [
        {
            "geoip" : {
                "field" : "ip",
                "target_field" : "geo",
                "database_file" : "GeoLite2-Country.mmdb.gz"
            }
        }
    ]
}
Previousrollover和shrinkNextHadoop 集成

Last updated 5 years ago

Was this helpful?