# 通过kafka传输

*本节作者：jingbli*

Kafka 是一个高吞吐量的分布式发布订阅日志服务，具有高可用、高性能、分布式、高扩展、持久性等特性。目前已经在各大公司中广泛使用。和之前采用 Redis 做轻量级消息队列不同，Kafka 利用磁盘作队列，所以也就无所谓消息缓冲时的磁盘问题。此外，如果公司内部已有 Kafka 服务在运行，logstash 也可以快速接入，免去重复建设的麻烦。

如果打算新建 Kafka 系统的，请参考 Kafka 官方入门文档：<http://kafka.apache.org/documentation.html#quickstart>

## kafka 基本概念

以下仅对相关基本概念说明，更多概念见官方文档：

* Topic

  主题，声明一个主题，producer指定该主题发布消息，订阅该主题的consumer对该主题进行消费
* Partition

  每个主题可以分为多个分区，每个分区对应磁盘上一个目录，分区可以分布在不同broker上，producer在发布消息时，可以通过指定partition key映射到对应分区，然后向该分区发布消息，在无partition key情况下，随机选取分区，一段时间内触发一次(比如10分钟)，这样就保证了同一个producer向同一partition发布的消息是顺序的。

  消费者消费时，可以指定partition进行消费，也可以使用high-level-consumer api,自动进行负载均衡，并将partition分给consumer，一个partition只能被一个consumer进行消费。
* Consumer 消费者，可以多实例部署，可以批量拉取，有两类API可供选择：一个simpleConsumer，暴露所有的操作给用户，可以提交offset、fetch offset、指定partition fetch message；另外一个high-level-consumer(ZookeeperConsumerConnector)，帮助用户做基于partition自动分配的负载均衡，定期提交offset，建立消费队列等。simpleConsumer相当于手动挡，high-level-consumer相当于自动挡。

  simpleConsumer：无需像high-level-consumer那样向zk注册brokerid、owner，甚至不需要提交offset到zk，可以将offset提交到任意地方比如(mysql,本地文件等)。

  high-level-consumer：一个进程中可以启多个消费线程，一个消费线程即是一个consumer，假设A进程里有2个线程(consumerid分别为1，2)，B进程有2个线程(consumerid分别为1，2)，topic1的partition有5个，那么partition分配是这样的：

```
    partition1 ---> A进程consumerid1
    partition2 ---> A进程consumerid1
    partition3 ---> A进程consumerid2
    partition4 ---> B进程consumer1
    partition5 ---> B进程consumer2
```

* Group High-level-consumer可以声明group，每个group可以有多个consumer，每group各自管理各自的消费offset，各个不同group之间互不关联影响。

  由于目前版本消费的offset、owner、group都是consumer自己通过zk管理，所以group对于broker和producer并不关心，一些监控工具需要通过group来监控，simpleComsumer无需声明group。

### 小提示

以上概念是 logstash 的 kafka 插件的必要参数，请理解阅读，对后续使用 kafka 插件有重要作用。logstash-kafka-input 插件使用的是 `High-level-consumer API`。

## 插件安装

### logstash-1.4 安装

如果你使用的还是 1.4 版本，需要自己单独安装 logstash-kafka 插件。插件地址见：<https://github.com/joekiller/logstash-kafka>。

插件本身内容非常简单，其主要依赖同一作者写的 [jruby-kafka](https://github.com/joekiller/jruby-kafka) 模块。需要注意的是：**该模块仅支持 Kafka-0.8 版本。如果是使用 0.7 版本 kafka 的，将无法直接使 jruby-kafka 该模块和 logstash-kafka 插件。**

安装按照官方文档完全自动化的安装。或是可以通过以下方式手动自己安装插件，不过重点注意的是 **kafka 的版本**，上面已经指出了。

1. 下载 logstash 并解压重命名为 `./logstash-1.4.0` 文件目录。
2. 下载 kafka 相关组件，以下示例选的为 [kafka\_2.8.0-0.8.1.1-src](https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka-0.8.1.1-src.tgz)，并解压重命名为 `./kafka_2.8.0-0.8.1.1`。
3. 从 [releases](https://github.com/joekiller/logstash-kafka/releases) 页下载 logstash-kafka v0.4.2 版，并解压重命名为 `./logstash-kafka-0.4.2`。
4. 从 `./kafka_2.8.0-0.8.1.1/libs` 目录下复制所有的 jar 文件拷贝到 `./logstash-1.4.0/vendor/jar/kafka_2.8.0-0.8.1.1/libs` 下，其中你需要创建 `kafka_2.8.0-0.8.1.1/libs` 相关文件夹及目录。
5. 分别复制 `./logstash-kafka-0.4.2/logstash` 里的 `inputs` 和 `outputs` 下的 `kafka.rb`，拷贝到对应的 `./logstash-1.4.0/lib/logstash` 里的 `inputs` 和 `outputs` 对应目录下。
6. 切换到 `./logstash-1.4.0` 目录下，现在需要运行 logstash-kafka 的 gembag.rb 脚本去安装 jruby-kafka 库，执行以下命令： `GEM_HOME=vendor/bundle/jruby/1.9 GEM_PATH= java -jar vendor/jar/jruby-complete-1.7.11.jar --1.9 ../logstash-kafka-0.4.2/gembag.rb ../logstash-kafka-0.4.2/logstash-kafka.gemspec`。
7. 现在可以使用 logstash-kafka 插件运行 logstash 了。

### logstash-1.5 安装

logstash 从 1.5 版本开始才集成了 Kafka 支持。1.5 版本开始所有插件的目录和命名都发生了改变，插件发布地址见：<https://github.com/logstash-plugins>。 安装和更新插件都可以使用官方提供的方式：

```
$bin/plugin install OR $bin/plugin update
```

### 小贴士

对于插件的安装和更新，默认走的Gem源为 <https://rubygems.org>,对于咱们国内网络来说是出奇的慢或是根本无法访问（爬梯子除外），在安装或是更新插件是，可以尝试修改目录下 `Gemfile` 文件中的 `source` 为淘宝源<https://ruby.taobao.org>，这样会使你的安装或是更新顺畅很多。

## 插件配置

### Input 配置示例

以下配置可以实现对 kafka 读取端(consumer)的基本使用。

消费端更多详细的配置请查看 <http://kafka.apache.org/documentation.html#consumerconfigs> kafka 官方文档的消费者部分配置文档。

```
input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "logstash"
        topic_id => "test"
        codec => plain
        reset_beginning => false # boolean (optional)， default: false
        consumer_threads => 5  # number (optional)， default: 1
        decorate_events => true # boolean (optional)， default: false
    }
}
```

### Input 解释

作为 Consumer 端,插件使用的是 `High-level-consumer API`，**请结合上述 kafka 基本概念进行设置**：

* group\_id

消费者分组，可以通过组 ID 去指定，不同的组之间消费是相互不受影响的，相互隔离。

* topic\_id

指定消费话题，也是必填项目，指定消费某个 `topic` ，这个其实就是订阅某个主题，然后去消费。

* reset\_beginning

logstash 启动后从什么位置开始读取数据，默认是结束位置，也就是说 logstash 进程会以从上次读取结束时的偏移量开始继续读取，如果之前没有消费过，那么就开始从头读取.如果你是要导入原有数据，把这个设定改成 "true"， logstash 进程就从头开始读取.有点类似 `cat` ，但是读到最后一行不会终止，而是变成 `tail -F` ，继续监听相应数据。

* decorate\_events

在输出消息的时候会输出自身的信息包括:消费消息的大小， topic 来源以及 consumer 的 group 信息。

* rebalance\_max\_retries

当有新的 consumer(logstash) 加入到同一 group 时，将会 `reblance` ，此后将会有 `partitions` 的消费端迁移到新的 `consumer` 上，如果一个 `consumer` 获得了某个 `partition` 的消费权限，那么它将会向 `zookeeper` 注册， `Partition Owner registry` 节点信息，但是有可能此时旧的 `consumer` 尚没有释放此节点，此值用于控制，注册节点的重试次数。

* consumer\_timeout\_ms

指定时间内没有消息到达就抛出异常，一般不需要改。

以上是相对重要参数的使用示例，更多参数可以选项可以跟据 <https://github.com/joekiller/logstash-kafka/blob/master/README.md> 查看 input 默认参数。

#### 注意

1.想要使用多个 logstash 端协同消费同一个 `topic` 的话，那么需要把两个或是多个 logstash 消费端配置成相同的 `group_id` 和 `topic_id`， 但是前提是要把**相应的 topic 分多个 partitions (区)**，多个消费者消费是无法保证消息的消费顺序性的。

> 这里解释下，为什么要分多个 **partitions(区)**， kafka 的消息模型是对 topic 分区以达到分布式效果。每个 `topic` 下的不同的 **partitions (区)**&#x53EA;能有一个 **Owner** 去消费。所以只有多个分区后才能启动多个消费者，对应不同的区去消费。其中协调消费部分是由 server 端协调而成。不必使用者考虑太多。只是**消息的消费则是无序的**。

总结:保证消息的顺序，那就用一个 **partition**。 **kafka 的每个 partition 只能同时被同一个 group 中的一个 consumer 消费**。

### Output 配置

以下配置可以实现对 kafka 写入端 (producer) 的基本使用。

生产端更多详细的配置请查看 <http://kafka.apache.org/documentation.html#producerconfigs> kafka 官方文档的生产者部分配置文档。

```
 output {
    kafka {
        bootstrap_servers => "localhost:9092"
        topic_id => "test"
        compression_codec => "snappy" # string (optional)， one of ["none"， "gzip"， "snappy"]， default: "none"
    }
}
```

### Output 解释

作为 Producer 端使用，以下仅为重要概念解释，**请结合上述 kafka 基本概念进行设置**：

* compression\_codec

消息的压缩模式，默认是 none，可以有 gzip 和 snappy (暂时还未测试开启压缩与不开启的性能，数据传输大小等对比)。

* compressed\_topics

可以针对特定的 topic 进行压缩，设置这个参数为 `topic` ，表示此 `topic` 进行压缩。

* request\_required\_acks

消息的确认模式:

> 可以设置为 0: 生产者不等待 broker 的回应，只管发送.会有最低能的延迟和最差的保证性(在服务器失败后会导致信息丢失) 可以设置为 1: 生产者会收到 leader 的回应在 leader 写入之后.(在当前 leader 服务器为复制前失败可能会导致信息丢失) 可以设置为 -1: 生产者会收到 leader 的回应在全部拷贝完成之后。

* partitioner\_class

分区的策略，默认是 hash 取模

* send\_buffer\_bytes

socket 的缓存大小设置，其实就是缓冲区的大小

#### 消息模式相关

* serializer\_class

消息体的系列化处理类，转化为字节流进行传输，**请注意 encoder 必须和下面的 `key_serializer_class` 使用相同的类型**。

* key\_serializer\_class

默认的是与 `serializer_class` 相同

* producer\_type

生产者的类型 `async` 异步执行消息的发送 `sync` 同步执行消息的发送

* queue\_buffering\_max\_ms

**异步模式下**，那么就会在设置的时间缓存消息，并一次性发送

* queue\_buffering\_max\_messages

**异步的模式下**，最长等待的消息数

* queue\_enqueue\_timeout\_ms

**异步模式下**，进入队列的等待时间，若是设置为0，那么要么进入队列，要么直接抛弃

* batch\_num\_messages

**异步模式下**，每次发送的最大消息数，前提是触发了 `queue_buffering_max_messages` 或是 `queue_enqueue_timeout_ms` 的限制

以上是相对重要参数的使用示例，更多参数可以选项可以跟据 <https://github.com/joekiller/logstash-kafka/blob/master/README.md> 查看 output 默认参数。

### 小贴士

logstash-kafka 插件输入和输出默认 `codec` 为 json 格式。在输入和输出的时候注意下编码格式。消息传递过程中 logstash 默认会为消息编码内加入相应的时间戳和 hostname 等信息。如果不想要以上信息(一般做消息转发的情况下)，可以使用以下配置，例如:

```
 output {
    kafka {
        codec => plain {
            format => "%{message}"
        }
    }
}
```

作为 Consumer 从kafka中读数据，如果为非 json 格式的话需要进行相关解码，例如：

```
input {
    kafka {
        zk_connect => "xxx：xxx"
        group_id => "test"
        topic_id => "test-topic"
        codec => "line"
        ...........
    }
}
```

### 性能

#### 队列监控

其实 logstash 的 kafka 插件性能并不是很突出，可以通过使用以下命令查看队列积压消费情况：

```
$/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test
```

队列积压严重，性能跟不上的情况下，结合实际服务器资源，可以适当增加 topic 的 partition 多实例化 Consumer 进行消费处理消息。

#### input-kafka 的 JSON 序列化性能

此外，跟 logstash-input-syslog 改在 filter 阶段 grok 的优化手段类似，也可以将 logstash-input-kafka 的默认 JSON 序列化操作从 codec 阶段后移到 filter 阶段。如下：

```
input {
    kafka {
        codec => plain
    }
}
filter {
    json {
        source => "message"
    }
}
```

然后通过 `bin/logstash -w $num_cpus` 运行，利用多核计算优势，可以获得大约一倍左右的性能提升。

#### 其他方案

* <https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer>
* <https://github.com/childe/hangout>


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://hezhiqiang.gitbook.io/elkstack/logstash/scale/kafka.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
