spark streaming交互

Apache Spark 是一个高性能集群计算框架,其中 Spark Streaming 作为实时批处理组件,因为其简单易上手的特性深受喜爱。在 es-hadoop 2.1.0 版本之后,也新增了对 Spark 的支持,使得结合 ES 和 Spark 成为可能。

目前最新版本的 es-hadoop 是 2.1.0-Beta4。安装如下:

wget http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-cdh4.tgz
wget http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.1.0.Beta4.zip

然后通过 ADD_JARS=../elasticsearch-hadoop-2.1.0.Beta4/dist/elasticsearch-spark_2.10-2.1.0.Beta4.jar 环境变量,把对应的 jar 包加入 Spark 的 jar 环境中。

下面是一段使用 spark streaming 接收 kafka 消息队列,然后写入 ES 的配置:

import org.apache.spark._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.elasticsearch.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.Logging
import org.apache.log4j.{Level, Logger}

object Elastic {
  def main(args: Array[String]) {
    val numThreads = 1
    val zookeeperQuorum = "localhost:2181"
    val groupId = "test"
    val topic = Array("test").map((_, numThreads)).toMap
    val elasticResource = "apps/blog"

    val sc = new SparkConf()
                 .setMaster("local[*]")
                 .setAppName("Elastic Search Indexer App")

    sc.set("es.index.auto.create", "true")
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("checkpoint")
    val logs = KafkaUtils.createStream(ssc,
                                       zookeeperQuorum,
                                       groupId,
                                       topic,
                                       StorageLevel.MEMORY_AND_DISK_SER)
                         .map(_._2)

    logs.foreachRDD { rdd =>
      val sc = rdd.context
      val sqlContext = new SQLContext(sc)
      val log = sqlContext.jsonRDD(rdd)
      log.saveToEs(elasticResource)
    }

    ssc.start()
    ssc.awaitTermination()

  }
}

注意,代码中使用了 spark SQL 提供的 jsonRDD() 方法,如果在对应的 kafka topic 里的数据,本身并不是已经处理好了的 JSON 数据的话,这里还需要自己写一写额外的处理函数,利用 cast class 来规范数据。

Last updated