博客
关于我
聊聊 Kafka+Spark Streaming 管理offset的两种方法
阅读量:393 次
发布时间:2019-03-05

本文共 4281 字,大约阅读时间需要 14 分钟。

Kafka与Spark Streaming是大数据领域的黄金搭档,常用于数据实时处理和分析。然而,Kafka的offset管理在流式系统中至关重要。为了确保数据的Exactly Once(EOE)语义,通常需要手动管理offset,而不是依赖自动提交机制。这需要将enable.auto.commit设置为false,并通过适当的方式管理offset。

offset管理流程

offset管理的流程一般包括以下几个步骤:

  • 初始化时获取当前offset:在Kafka DirectStream初始化时,获取当前所有partition的存量offset,以便DirectStream能够正确地从读取位置开始处理数据。

  • 读取消息数据:从Kafka主题中读取消息数据,进行处理后存储结果。

  • 提交offset并持久化:在处理完成后,提交offset,并将其持久化到可靠的存储系统中。

  • 图中提到的“process and store results”及“commit offsets”两项操作,可以通过额外措施加以强化。例如,在存储结果时可以保证幂等性,在提交offset时采用原子操作。

    offset存储选项

    图中展示了四种offset存储的选项:HBase、Kafka自身、HDFS和ZooKeeper。综合考虑实现难度和效率,我们目前采用过Kafka自身和ZooKeeper两种方案。

    Kafka自身存储

    Kafka 0.10+版本中,offset的默认存储由ZooKeeper改为内置主题__consumer_offsets。Spark Streaming提供了commitAsync() API用于offset提交。以下是使用方法:

    stream.foreachRDD {     rdd =>         val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges         stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}

    然而,在实际应用中,我们常对DStream进行转换操作。这时可以借助DStream的transform()算子:

    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]stream.transform(rdd => {    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    rdd}).mapPartitions(records => {    var result = new ListBuffer[...]()    result.toList.iterator}).foreachRDD(rdd => {    if (!rdd.isEmpty()) {        // 数据存储逻辑        session.createDataFrame(...).write().save()        // 提交offset        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)    }})

    需要注意的是,转换操作应保持RDD与Kafka分区的映射关系。map()mapPartitions()是安全的操作,而reduceByKey()join()等操作可能破坏分区关系,不建议使用。

    ZooKeeper存储

    尽管Kafka将offset从ZooKeeper中移出,但ZooKeeper依然可以作为有效的offset存储解决方案。ZooKeeper采用树形结构存储数据,适合管理细粒度的结构化数据。我们封装了ZooKeeper的offset管理逻辑:

    class ZkKafkaOffsetManager(zkUrl: String) {    private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager])    private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000)    private val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false)        def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {        val offsets = mutable.HashMap.empty[TopicPartition, Long]        val partitionsForTopics = zkUtils.getPartitionsForTopics(topics)        partitionsForTopics.foreach(partitions => {            val topic = partitions._1            val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic)            partitions._2.foreach(partition => {                val path = groupTopicDirs.consumerOffsetDir + "/" + partition                try {                    val data = zkUtils.readData(path)                    if (data != null) {                        offsets.put(new TopicPartition(topic, partition), data._1.toLong)                        logger.info(                            "Read offset - topic={}, partition={}, offset={}, path={}",                            Seq[AnyRef](topic, partition.toString, data._1, path)                        )                    }                } catch {                    case ex: Exception =>                         offsets.put(new TopicPartition(topic, partition), 0L)                        logger.info(                            "Read offset - not exist: {}, topic={}, partition={}, path={}",                            Seq[AnyRef](ex.getMessage, topic, partition.toString, path)                        )                }            })        })        offsets.toMap    }    def saveOffsets(offsetRanges: Seq[OffsetRange], groupId: String): Unit = {        offsetRanges.foreach(range => {            val groupTopicDirs = new ZKGroupTopicDirs(groupId, range.topic)            val path = groupTopicDirs.consumerOffsetDir + "/" + range.partition            zkUtils.updatePersistentPath(path, range.untilOffset.toString)            logger.info(                "Save offset - topic={}, partition={}, offset={}, path={}",                Seq[AnyRef](range.topic, range.partition.toString, range.untilOffset.toString, path)            )        })    }}

    offset会被存储在ZooKeeper的/consumers/[groupId]/offsets/[topic]/[partition]路径下。这种方式在分区数不是很大且批次间隔较长(如20秒)的情况下表现良好。

    为什么不用Checkpoint

    Spark Streaming的Checkpoint机制无疑是最简单的恢复方式,数据存储在HDFS中。然而,当Streaming程序代码发生变更时,重新打包运行会导致反序列化异常。由于Checkpoint首次持久化存储了整个jar包,这在新旧代码不一致时会导致问题。

    为了解决这个问题,只能删除HDFS上的Checkpoint文件,但这会同时删除Kafka的offset信息,导致数据丢失。这使得Checkpoint机制在代码频繁变更的场景下不适用。

    转载地址:http://fwsg.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现inversions倒置算法(附完整源码)
    查看>>
    Objective-C实现isalpha函数功能(附完整源码)
    查看>>
    Objective-C实现islower函数功能(附完整源码)
    查看>>
    Objective-C实现isPowerOfTwo算法(附完整源码)
    查看>>
    Objective-C实现isupper函数功能(附完整源码)
    查看>>
    Objective-C实现ItemCF算法(附完整源码)
    查看>>
    Objective-C实现ItemCF算法(附完整源码)
    查看>>
    Objective-C实现iterating through submasks遍历子掩码算法(附完整源码)
    查看>>
    Objective-C实现jaccard similarity相似度无平方因子数算法(附完整源码)
    查看>>
    Objective-C实现Julia集算法(附完整源码)
    查看>>
    Objective-C实现k nearest neighbours k最近邻分类算法(附完整源码)
    查看>>
    Objective-C实现k-Means算法(附完整源码)
    查看>>
    Objective-C实现k-nearest算法(附完整源码)
    查看>>
    Objective-C实现Knapsack problem背包问题算法(附完整源码)
    查看>>
    Objective-C实现knapsack背包问题算法(附完整源码)
    查看>>
    Objective-C实现knapsack背包问题算法(附完整源码)
    查看>>
    Objective-C实现knight tour骑士之旅算法(附完整源码)
    查看>>
    Objective-C实现KNN算法(附完整源码)
    查看>>
    Objective-C实现koch snowflake科赫雪花算法(附完整源码)
    查看>>
    Objective-C实现KPCA(附完整源码)
    查看>>