本文共 4281 字,大约阅读时间需要 14 分钟。
Kafka与Spark Streaming是大数据领域的黄金搭档,常用于数据实时处理和分析。然而,Kafka的offset管理在流式系统中至关重要。为了确保数据的Exactly Once(EOE)语义,通常需要手动管理offset,而不是依赖自动提交机制。这需要将enable.auto.commit设置为false,并通过适当的方式管理offset。
offset管理的流程一般包括以下几个步骤:
初始化时获取当前offset:在Kafka DirectStream初始化时,获取当前所有partition的存量offset,以便DirectStream能够正确地从读取位置开始处理数据。
读取消息数据:从Kafka主题中读取消息数据,进行处理后存储结果。
提交offset并持久化:在处理完成后,提交offset,并将其持久化到可靠的存储系统中。
图中提到的“process and store results”及“commit offsets”两项操作,可以通过额外措施加以强化。例如,在存储结果时可以保证幂等性,在提交offset时采用原子操作。
图中展示了四种offset存储的选项:HBase、Kafka自身、HDFS和ZooKeeper。综合考虑实现难度和效率,我们目前采用过Kafka自身和ZooKeeper两种方案。
Kafka 0.10+版本中,offset的默认存储由ZooKeeper改为内置主题__consumer_offsets。Spark Streaming提供了commitAsync() API用于offset提交。以下是使用方法:
stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)} 然而,在实际应用中,我们常对DStream进行转换操作。这时可以借助DStream的transform()算子:
var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]stream.transform(rdd => { offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd}).mapPartitions(records => { var result = new ListBuffer[...]() result.toList.iterator}).foreachRDD(rdd => { if (!rdd.isEmpty()) { // 数据存储逻辑 session.createDataFrame(...).write().save() // 提交offset stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }}) 需要注意的是,转换操作应保持RDD与Kafka分区的映射关系。map()和mapPartitions()是安全的操作,而reduceByKey()、join()等操作可能破坏分区关系,不建议使用。
尽管Kafka将offset从ZooKeeper中移出,但ZooKeeper依然可以作为有效的offset存储解决方案。ZooKeeper采用树形结构存储数据,适合管理细粒度的结构化数据。我们封装了ZooKeeper的offset管理逻辑:
class ZkKafkaOffsetManager(zkUrl: String) { private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager]) private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000) private val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false) def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = { val offsets = mutable.HashMap.empty[TopicPartition, Long] val partitionsForTopics = zkUtils.getPartitionsForTopics(topics) partitionsForTopics.foreach(partitions => { val topic = partitions._1 val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic) partitions._2.foreach(partition => { val path = groupTopicDirs.consumerOffsetDir + "/" + partition try { val data = zkUtils.readData(path) if (data != null) { offsets.put(new TopicPartition(topic, partition), data._1.toLong) logger.info( "Read offset - topic={}, partition={}, offset={}, path={}", Seq[AnyRef](topic, partition.toString, data._1, path) ) } } catch { case ex: Exception => offsets.put(new TopicPartition(topic, partition), 0L) logger.info( "Read offset - not exist: {}, topic={}, partition={}, path={}", Seq[AnyRef](ex.getMessage, topic, partition.toString, path) ) } }) }) offsets.toMap } def saveOffsets(offsetRanges: Seq[OffsetRange], groupId: String): Unit = { offsetRanges.foreach(range => { val groupTopicDirs = new ZKGroupTopicDirs(groupId, range.topic) val path = groupTopicDirs.consumerOffsetDir + "/" + range.partition zkUtils.updatePersistentPath(path, range.untilOffset.toString) logger.info( "Save offset - topic={}, partition={}, offset={}, path={}", Seq[AnyRef](range.topic, range.partition.toString, range.untilOffset.toString, path) ) }) }} offset会被存储在ZooKeeper的/consumers/[groupId]/offsets/[topic]/[partition]路径下。这种方式在分区数不是很大且批次间隔较长(如20秒)的情况下表现良好。
Spark Streaming的Checkpoint机制无疑是最简单的恢复方式,数据存储在HDFS中。然而,当Streaming程序代码发生变更时,重新打包运行会导致反序列化异常。由于Checkpoint首次持久化存储了整个jar包,这在新旧代码不一致时会导致问题。
为了解决这个问题,只能删除HDFS上的Checkpoint文件,但这会同时删除Kafka的offset信息,导致数据丢失。这使得Checkpoint机制在代码频繁变更的场景下不适用。
转载地址:http://fwsg.baihongyu.com/