前言
在进行Spark任务开发时,我们有时想自己定制一个Spark UI界面,比如,笔者在进行Spark Streaming处理Kafka数据时,想要直观的了解Kafka消费状况以确定是否有积压。通常情况下,需要借助其它工具(如Kafka Manager等)进行观察。
为了更方便的观察Kafka的积压情况,笔者查阅资料,发现Spark UI是可以定制的,于是仿Kafka Manager定制了一个页面。
获取Kafka信息
使用Kafka提供的API获取topic的信息和消费组的信息,代码如下:
package org.apache.spark.ui.kafka
import java.lang.management.ManagementFactory
import java.util.Properties
import java.{lang, util}
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import scala.collection.JavaConversions._
object OffsetGetter {

  /**
   * Queries Kafka for the committed offsets of `group` on `topic` together with
   * the current log-end offsets, and returns per-partition lag information.
   *
   * Both the [[AdminClient]] and the probing [[KafkaConsumer]] are closed before
   * returning, so calling this on every page render does not leak connections.
   *
   * @param bootstrapServer Kafka bootstrap servers, e.g. "host1:9092,host2:9092"
   * @param topic           topic whose partitions are inspected
   * @param group           consumer group whose committed offsets are read
   * @return one [[GroupOffsets]] per partition, sorted by partition number
   */
  def getConsumerOffset(bootstrapServer: String, topic: String, group: String): Array[GroupOffsets] = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
    // Use a unique throwaway group id for the probing consumer so it never
    // interferes with the committed offsets of the group being monitored.
    val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "streaming_offset_getter_" + pid)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    val admin = AdminClient.create(props)
    try {
      // Committed offsets of the monitored group, keyed by topic-partition.
      val listConsumerOffsets = admin.listConsumerGroupOffsets(group)
      val offsets: util.Map[TopicPartition, OffsetAndMetadata] = listConsumerOffsets.partitionsToOffsetAndMetadata().get()
      val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
      try {
        val partitionInfos: util.List[PartitionInfo] = consumer.partitionsFor(topic)
        val topicPartitions = partitionInfos.map(partitionInfo => new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
        // Log-end offset ("log size") of every partition of the topic.
        val endOffsets: util.Map[TopicPartition, lang.Long] = consumer.endOffsets(topicPartitions)
        endOffsets.map { case (topicPartition, logSize) =>
          // A group with no committed offset on a partition is treated as offset 0,
          // i.e. its lag is the whole log.
          val offset = if (offsets.containsKey(topicPartition)) {
            offsets.get(topicPartition).offset()
          } else {
            0L
          }
          GroupOffsets(topicPartition.partition(), offset, logSize, logSize - offset)
        }.toArray
          .sortBy(_.partition)
      } finally {
        consumer.close() // release sockets/threads even if a Kafka call failed
      }
    } finally {
      admin.close()
    }
  }

  /**
   * Lag snapshot for a single partition.
   *
   * @param partition partition number
   * @param offset    offset committed by the consumer group (0 if none)
   * @param logSize   log-end offset of the partition
   * @param lag       logSize - offset, i.e. messages not yet consumed
   */
  case class GroupOffsets(partition: Int, offset: Long, logSize: Long, lag: Long)
}
添加一个TAB标签
生成一个Tab标签类,继承SparkUITab类,并调用attachPage()方法,将Tab标签页添加到SparkUI页面上
package org.apache.spark.ui.kafka
import org.apache.spark.SparkException
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.ui.kafka.KafkaOffsetTab._
import org.apache.spark.ui.{SparkUI, SparkUITab}
/**
 * A SparkUI tab ("offsets") showing Kafka consumer-group lag for the topic
 * consumed by this streaming application.
 *
 * Lives in the org.apache.spark.ui package hierarchy so it can extend the
 * private[spark] SparkUITab.
 */
class KafkaOffsetTab(val ssc: StreamingContext) extends SparkUITab(getSparkUI(ssc), "offsets") {
  // Reuse the static resources (css/js) shipped with Spark Streaming's own UI.
  private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static"

  attachPage(new KafkaOffsetPage(this))

  /** Registers this tab and its static-resource handler on the parent SparkUI. */
  def attach(): Unit = {
    // Resolve the UI once instead of once per call.
    val ui = getSparkUI(ssc)
    ui.attachTab(this)
    ui.addStaticHandler(STATIC_RESOURCE_DIR, "/static/offsets")
  }
}
private[spark] object KafkaOffsetTab {

  /**
   * Resolves the SparkUI of the given StreamingContext, failing fast when the
   * UI is disabled (spark.ui.enabled=false) and therefore absent.
   */
  def getSparkUI(ssc: StreamingContext): SparkUI = {
    ssc.sparkContext.ui match {
      case Some(ui) => ui
      case None     => throw new SparkException("Parent SparkUI to attach this tab to not found!")
    }
  }
}
创建Offset页面
生成一个Page页面,继承WebUIPage,即生成页面要展示的内容。如下,生成了一个简单的页面,可以根据需要自行增减内容。
package org.apache.spark.ui.kafka
import javax.servlet.http.HttpServletRequest
import org.apache.spark.ui.{UIUtils, WebUIPage}
import scala.xml.Node
/**
 * Renders the "Offsets" page: a summary row for the whole topic followed by
 * one row per partition with log size, committed offset and lag.
 */
class KafkaOffsetPage(parent: KafkaOffsetTab) extends WebUIPage("") {

  // NOTE(review): connection settings are hard-coded for the demo; in a real
  // job they should come from SparkConf / job configuration instead.
  private val bootstrapServers = "centos01:6667"
  private val topic = "topic"
  private val group = "streaming"

  def render(request: HttpServletRequest): Seq[Node] = {
    // Fetched on every page load so the displayed numbers are always current.
    val offsets = OffsetGetter.getConsumerOffset(bootstrapServers, topic, group)
    val totalLogSize: Long = offsets.map(_.logSize).sum
    val totalOffset: Long = offsets.map(_.offset).sum
    val totalLag: Long = offsets.map(_.lag).sum
    val tableHeader =
      <thead>
      <tr><th>Topic</th><th>Partition</th><th>LogSize</th><th>Consumer Offset</th><th>Lag</th></tr>
      </thead>
    // Topic-level totals; the Partition cell is intentionally left empty.
    val summary =
      <tr>
      <td>{topic}</td>
      <td></td>
      <td>{totalLogSize}</td>
      <td>{totalOffset}</td>
      <td>{totalLag}</td>
      </tr>
    // One detail row per partition; the Topic cell is left empty under the summary.
    val partitions = offsets.map(offset => {
      <tr>
      <td></td>
      <td>{offset.partition}</td>
      <td>{offset.logSize}</td>
      <td>{offset.offset}</td>
      <td>{offset.lag}</td>
      </tr>
    })
    val content =
      <div class="col-md-12">
      <div class="card">
      <div class="card-body">
      <table class="table">
      {tableHeader}
      <tbody>
      {summary}
      {partitions}
      </tbody>
      </table>
      </div>
      </div>
      </div>
    UIUtils.headerSparkPage(request, "Offsets", content, parent, useDataTables = true)
  }
}