Adding a Custom Spark UI Page to Spark Streaming to Show Kafka Consumption Progress

Preface

When developing Spark jobs, it is sometimes useful to customize the Spark UI. For example, while processing Kafka data with Spark Streaming, I wanted an at-a-glance view of the consumer group's progress to check for backlog. Normally that requires an external tool such as Kafka Manager.
To make the lag easier to observe, I did some digging and found that the Spark UI can be extended, so I built a page modeled on Kafka Manager.

Fetching Kafka offsets

Use Kafka's AdminClient and consumer APIs to fetch the topic's partition metadata and the consumer group's committed offsets:

package org.apache.spark.ui.kafka

import java.lang.management.ManagementFactory
import java.util.Properties
import java.{lang, util}

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import scala.collection.JavaConverters._

object OffsetGetter {

  case class GroupOffsets(partition: Int, offset: Long, logSize: Long, lag: Long)

  /** Returns the committed offset, log-end offset, and lag for each partition of a topic. */
  def getConsumerOffset(bootstrapServer: String, topic: String, group: String): Array[GroupOffsets] = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
    // Use a throwaway, pid-suffixed group id so this probe never touches the real group's offsets.
    val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "streaming_offset_getter_" + pid)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

    val admin = AdminClient.create(props)
    val consumer = new KafkaConsumer[String, String](props)
    try {
      // Committed offsets of the monitored group, per partition.
      val offsets: util.Map[TopicPartition, OffsetAndMetadata] =
        admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get()
      // Log-end offsets ("log size") for every partition of the topic.
      val partitionInfos: util.List[PartitionInfo] = consumer.partitionsFor(topic)
      val topicPartitions = partitionInfos.asScala
        .map(p => new TopicPartition(p.topic(), p.partition()))
      val endOffsets: util.Map[TopicPartition, lang.Long] = consumer.endOffsets(topicPartitions.asJava)

      endOffsets.asScala.map { case (topicPartition, logSize) =>
        // A partition the group has never committed to counts as offset 0.
        val offset = Option(offsets.get(topicPartition)).map(_.offset()).getOrElse(0L)
        GroupOffsets(topicPartition.partition(), offset, logSize, logSize - offset)
      }.toArray.sortBy(_.partition)
    } finally {
      consumer.close()
      admin.close()
    }
  }
}
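The heart of getConsumerOffset is simple arithmetic: for each partition, lag = logSize − committed offset, treating a partition with no committed offset as offset 0. That logic can be checked in isolation without a broker; the maps below are made-up sample data standing in for endOffsets() and the group's committed offsets:

```scala
case class GroupOffsets(partition: Int, offset: Long, logSize: Long, lag: Long)

// partition -> log-end offset, and partition -> committed offset.
def computeLags(endOffsets: Map[Int, Long], committed: Map[Int, Long]): Array[GroupOffsets] =
  endOffsets.map { case (partition, logSize) =>
    val offset = committed.getOrElse(partition, 0L) // no commit yet -> treat as 0
    GroupOffsets(partition, offset, logSize, logSize - offset)
  }.toArray.sortBy(_.partition)

val lags = computeLags(
  endOffsets = Map(0 -> 100L, 1 -> 250L, 2 -> 70L),
  committed  = Map(0 -> 90L, 1 -> 250L) // partition 2 has no committed offset
)
// Partition 2 reports its full log size as lag, since nothing has been committed.
```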

Adding a tab

Define a tab class that extends SparkUITab and calls attachPage() to register the page it renders; its attach() method then adds the tab to the SparkUI.

package org.apache.spark.ui.kafka

import org.apache.spark.SparkException
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.ui.kafka.KafkaOffsetTab._
import org.apache.spark.ui.{SparkUI, SparkUITab}

// Lives under org.apache.spark.ui because SparkUITab is private[spark].
class KafkaOffsetTab(val ssc: StreamingContext) extends SparkUITab(getSparkUI(ssc), "offsets") {

  private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static"

  attachPage(new KafkaOffsetPage(this))

  /** Registers this tab and its static resources on the driver's SparkUI. */
  def attach(): Unit = {
    getSparkUI(ssc).attachTab(this)
    getSparkUI(ssc).addStaticHandler(STATIC_RESOURCE_DIR, "/static/offsets")
  }
}

private[spark] object KafkaOffsetTab {
  def getSparkUI(ssc: StreamingContext): SparkUI = {
    ssc.sparkContext.ui.getOrElse {
      throw new SparkException("Parent SparkUI to attach this tab to not found!")
    }
  }
}
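With the tab defined, the only wiring left in the driver is to instantiate it against the StreamingContext and call attach() before the job starts. A minimal sketch of that call site, assuming a running Spark environment (the app name and batch interval here are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.ui.kafka.KafkaOffsetTab

val conf = new SparkConf().setAppName("streaming-with-offsets-tab")
val ssc = new StreamingContext(conf, Seconds(10))

// Register the custom tab on the driver's SparkUI; the "offsets" tab
// then appears alongside Jobs/Stages/Streaming at http://<driver>:4040.
new KafkaOffsetTab(ssc).attach()

// ... define the Kafka DStream as usual, then start the context ...
ssc.start()
ssc.awaitTermination()
```

Attaching before ssc.start() ensures the tab is visible for the whole lifetime of the job.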

Creating the offsets page

Define a page class extending WebUIPage whose render() method produces the content to display. The page below is deliberately simple; add or remove content as needed.

package org.apache.spark.ui.kafka

import javax.servlet.http.HttpServletRequest

import org.apache.spark.ui.{UIUtils, WebUIPage}

import scala.xml.Node

class KafkaOffsetPage(parent: KafkaOffsetTab) extends WebUIPage("") {

  def render(request: HttpServletRequest): Seq[Node] = {
    // Broker, topic, and group are hardcoded here for brevity; in practice,
    // read them from the SparkConf or the job's own Kafka parameters.
    val offsets = OffsetGetter.getConsumerOffset("centos01:6667", "topic", "streaming")
    val totalLogSize: Long = offsets.map(_.logSize).sum
    val totalOffset = offsets.map(_.offset).sum
    val totalLag = offsets.map(_.lag).sum

    val tableHeader =
      <thead>
        <tr><th>Topic</th><th>Partition</th><th>LogSize</th><th>Consumer Offset</th><th>Lag</th></tr>
      </thead>
    // One topic-level summary row, followed by one row per partition.
    val summary =
      <tr>
        <td>topic</td>
        <td></td>
        <td>{totalLogSize}</td>
        <td>{totalOffset}</td>
        <td>{totalLag}</td>
      </tr>
    val partitions = offsets.map(offset => {
      <tr>
        <td></td>
        <td>{offset.partition}</td>
        <td>{offset.logSize}</td>
        <td>{offset.offset}</td>
        <td>{offset.lag}</td>
      </tr>
    })
    // Wrap the table in the standard SparkUI card layout.
    val content =
      <div class="col-md-12">
        <div class="card">
          <div class="card-body">
            <table class="table">
              {tableHeader}
              <tbody>
                {summary}
                {partitions}
              </tbody>
            </table>
          </div>
        </div>
      </div>
    UIUtils.headerSparkPage(request, "Offsets", content, parent, useDataTables = true)
  }
}