与kafka的初次相识

李国旺
     由于一些需要,需要接收大数据发出来的kafka消息,所以网上找了一些资料来学习,记录一下学习过程。
根据自己使用的场景:不关心从Kafka消费消息的顺序,不关心offset,仅仅关心数据能被消费就行,所以采用HighLevelConsumer的API  
首先引入maven配置
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>

抽象kafka consumer
public abstract class AbstractKafkaConsumer {
   protected Logger logger = LoggerFactory.getLogger(getClass());
   private String kafkaTopic;
   private String groupId;
   private String zookeeperConnect;
   private Integer partitionNumber;
   private ConsumerConnector consumer;
   private ExecutorService fixedThreadPool;

   public void start() {
      logger.info("Kafka Consumer Start...");
      createConsumer();
      fixedThreadPool = Executors.newFixedThreadPool(partitionNumber);
      addShutdownHook();

      Map<String, Integer> topicCountMap = new HashMap<>();
      topicCountMap.put(kafkaTopic, partitionNumber); // 数字是返回几个流,topic几个分区就配几个线程比较合理
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
      List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaTopic);
      for (KafkaStream<byte[], byte[]> stream : streams) {
         fixedThreadPool.execute(() -> {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
               String message = new String(it.next().message());
               logger.info("Consumer端消费消息, messageContent=[{}]", message);
               try {
                  onBusiness(message);
               } catch (Exception e) {
                  logger.error("consumer kafka error.", e);
               }
            }
         });
      }

      logger.info("Consumer Start Compelted...");
   }

   private void createConsumer () {
      consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeperConnect, groupId));
   }
   private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
      Properties props = new Properties();
      // zk集群地址
      props.put("zookeeper.connect", a_zookeeper); 
      // 消费组,唯一的标识
      props.put("group.id", a_groupId); 
      // zk的会话超时时间
      props.put("zookeeper.session.timeout.ms", "1000"); 
      // zk连接超时时间
      props.put("zookeeper.connection.timeout.ms", "60000"); 
      // 指定多久消费者更新offset到zookeeper中,注意offset更新时基于time而不是每次获得的消息
      props.put("zookeeper.sync.time.ms", "200");
      // 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信
      props.put("auto.commit.interval.ms", "6000");
      // 如果zookeeper没有offset值或offset值超出范围,那么就给个初始的offset
      props.put("auto.offset.reset", "largest");
      return new ConsumerConfig(props);
   }

   private void shutdown() {
      if (consumer != null) {
         try {
            consumer.shutdown();
         } catch (Exception e) {
            logger.warn("Kafka Consumer shutdown.", e);
         }
      }
      if (fixedThreadPool != null) {
         fixedThreadPool.shutdown();
         try {
            if (!fixedThreadPool.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
               logger.warn("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
         } catch (InterruptedException e) {
            logger.warn("Interrupted during shutdown, exiting uncleanly");
         }
      }
   }
   private void addShutdownHook() {
      Runtime.getRuntime().addShutdownHook(new Thread(() -> {
         shutdown();
         logger.warn("Kafka Consumer group[{}] shutdown complete.", consumer);
      }));
   }
   protected abstract void onBusiness(String paramString);
   // 省略其他
}

自己业务代码继承该抽象类,实现onBusiness方法,根据拿到的message做一些逻辑。
public class RiderKafkaConsumer extends AbstractKafkaConsumer {
   public void onBusiness(String message) {
      // 这里面做自己的逻辑
   }
}

接着xml配置一下,注入bean,
<bean id="riderKafkaConsumer" class="com.dianwoba.rider.probe.consumer.RiderKafkaConsumer" init-method="start" destroy-method="shutdown">
    <property name="kafkaTopic" value="${kafka.xxx.topic}" />
    <property name="groupId" value="${kafka.xxx.group}"/>
    <property name="zookeeperConnect" value="${kafka.zk}"/>
    <property name="partitionNumber" value="${kafka.partitionNumber}"/>
</bean>

这里有几个对象需要说明一下:
  • ConsumerConnector: Consumer的连接器,这里基于ZK实现,是ZookeeperConsumerConnector
  • KafkaStream: 消息流,每个消费者线程都对应了一个消息流,消息会放入消息流的阻塞队列中
  • ConsumerIterator: 消费者迭代器,只有迭代器开始迭代获取数据时,才会返回给消费者

好了,到此为止消费kafka消费的代码就没有了,接下来我们看一下kafka的是如何拿到消息的。
注:由于kafka是有scala写的,在IDEA下查看源码的话,需要安装一个scala的插件。

首先看一下ConsumerConnector接口,这里还是java代码,
public interface ConsumerConnector {
     public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
     // 省略其他
}
该API的中心是一个由KafkaStream这个类实现的迭代器(iterator)。每个KafkaStream都代表着一个从一个或多个分区到一个或多个服务器的消息流。每个流都是使用单个线程进行处理的,所以,该API的使用者在该API的创建调用中可以提供所需的任意个数的流。这样,一个流可能会代表多个服务器分区的合并(同处理线程的数目相同),但每个分区只会把数据发送给一个流中。

好,接下来都是scala代码了,下面是createMessageStreams方法的实现类:
def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
  createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())

这里采用默认的解码器,接下来再看
def createMessageStreams[K,V](
      topicCountMap: java.util.Map[String,java.lang.Integer],
      keyDecoder: Decoder[K],
      valueDecoder: Decoder[V])
    : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {

  if (messageStreamCreated.getAndSet(true))
    throw new MessageStreamsExistException(this.getClass.getSimpleName +
                                 " can create message streams at most once",null)
  val scalaTopicCountMap: Map[String, Int] = {
    import JavaConversions._
    Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
  }
  val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
  val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
  for ((topic, streams) <- scalaReturn) {
    var javaStreamList = new java.util.ArrayList[KafkaStream[K,V]]
    for (stream <- streams)
      javaStreamList.add(stream)
    ret.put(topic, javaStreamList)
  }
  ret
}

messageStreamCreated是有个AtomicBoolean类型,这里可以理解为是个锁,
scalaTopicCountMap是把java版的topicCountMap转化为scala中的对象,
好,接着就是获取Map[String,List[KafkaStream[K,V]]了(这里是scala的map),
接着继续追踪代码,
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]] = {
  debug("entering consume ")
  if (topicCountMap  null)
    throw new RuntimeException("topicCountMap is null")

  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)

  val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

  // make a list of (queue,stream) pairs, one pair for each threadId
  val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
    threadIdSet.map(_ => {
      val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
      val stream = new KafkaStream[K,V](
        queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
      (queue, stream)
    })
  ).flatten.toList

  val dirs = new ZKGroupDirs(config.groupId)
  registerConsumerInZK(dirs, consumerIdString, topicCount)
  reinitializeConsumer(topicCount, queuesAndStreams)

  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}

这段代码片段在ZookeeperConsumerConnector 中,好了,重头戏来了...我们先来看一下consumer所在的类ZookeeperConsumerConnector。
首先看一下ZookeeperConsumerConnector里面的属性和方法:
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                val enableFetcher: Boolean) // for testing only
        extends ConsumerConnector with Logging with KafkaMetricsGroup {
  private val isShuttingDown = new AtomicBoolean(false)
  private val rebalanceLock = new Object
  private var fetcher: Option[ConsumerFetcherManager] = None
  private var zkClient: ZkClient = null
  private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
  private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
  private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
  private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
  private val messageStreamCreated = new AtomicBoolean(false)

  private var sessionExpirationListener: ZKSessionExpireListener = null
  private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
  private var loadBalancerListener: ZKRebalancerListener = null

  // 省略部分代码
  val consumerIdString = {
    ...
  }
  this.logIdent = "[" + consumerIdString + "], "

  connectZk()  // 第一步:创建ZkUtils,会创建对应的ZkConnection和ZkClient
  createFetcher() // 第二步:创建ConsumerFetcherManager,消费者fetcher线程
  ensureOffsetManagerConnected() // 第三步:确保连接上OffsetManager.

  if (config.autoCommitEnable) { // 第四步:启动定时提交offset线程
    scheduler.startup
    info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
    scheduler.schedule("kafka-consumer-autocommit",
                       autoCommit,
                       delay = config.autoCommitIntervalMs,
                       period = config.autoCommitIntervalMs,
                       unit = TimeUnit.MILLISECONDS)
  }

在创建ZookeeperConsumerConnector时,有几个初始化方法需要事先执行.
     1、因为消费者要和ZK通信,所以connectZk会确保连接上ZooKeeper. 
     2、消费者要消费数据,需要有抓取线程,所有的抓取线程交给ConsumerFetcherManager统一管理.
     3、定时提交线程会使用OffsetManager建立的通道定时提交offset到zk或者kafka

好了,启动过程有了,我们回过头来继续快看一下consume方法,
1、consumerIdString是根据consumer.id来获得的,如果consumer.id=null,则等于主机名+时间戳+uuid(截取八位)组成,确保每个consumer在消费组中的编号都是唯一的。

2、
val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

def getConsumerThreadIdsPerTopic = TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)

def makeConsumerThreadIdsPerTopic(consumerIdString: String,
                                  topicCountMap: Map[String,  Int]) = {
  val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]()
  for ((topic, nConsumers) <- topicCountMap) { // 每个topic有几个消费线程
    val consumerSet = new mutable.HashSet[ConsumerThreadId] // 一个消费者线程对应一个ConsumerThreadId
    assert(nConsumers >= 1)
    for (i <- 0 until nConsumers)
      consumerSet += ConsumerThreadId(consumerIdString, i)
    consumerThreadIdsPerTopicMap.put(topic, consumerSet) // 每个topic都有多个Consumer线程,但是只有一个消费者进程
  }
  consumerThreadIdsPerTopicMap // topic到消费者线程集合的映射
}
一个消费者,对一个topic可以使用多个线程一起消费(一个进程可以有多个线程),当然一个消费者也可以消费多个topic。这里得到每个topic到消费者线程集合的映射.

// 先抛出来一个问题,这里只是准备了队列和流,数据什么时候填充呢??
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
  threadIdSet.map(_ => {
    val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
    val stream = new KafkaStream[K,V](
      queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
    (queue, stream)
  }) 
).flatten.toList // threadIdSet是个集合,外层的topicThreadIds.values也是集合,所以用flatten压扁为queue-stream对
对于消费者而言,它只要指定要消费的topic和线程数量就可以了,每个线程都对应了一个队列,每个队列都是一个消息流.

3、registerConsumerInZK,消费者需要向ZK注册一个临时节点,路径为:/consumers/[group_id]/ids/[consumer_id],内容为订阅的topic
4、reinitializeConsumer这个方法里面声明一些listener, 当前Consumer在ZK注册之后,需要重新初始化Consumer. 对于全新的消费者,注册多个监听器,在zk的对应节点的注册事件发生时,会回调监听器的方法.
private def reinitializeConsumer[K,V](topicCount: TopicCount,
  queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
  val dirs = new ZKGroupDirs(config.groupId)
  // ②listener to consumer and partition changes
  if (loadBalancerListener  null) {
    val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
    loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString,
      topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
  }
  // ② create listener for session expired event if not exist yet
  if (sessionExpirationListener  null) sessionExpirationListener =
    new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener)
  // ③ create listener for topic partition change event if not exist yet
  if (topicPartitionChangeListener  null)
    topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)

  // listener to consumer and partition changes
  zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
  zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
  // register on broker partition path changes.
  topicStreamsMap.foreach { topicAndStreams =>
    zkUtils.zkClient.subscribeDataChanges(BrokerTopicsPath+"/"+topicAndStreams._1, topicPartitionChangeListener)
  }

  // ④explicitly trigger load balancing for this consumer
  loadBalancerListener.syncedRebalance()
}
① 向/consumers/[group_id]/ids注册Child变更事件的loadBalancerListener,当消费组下的消费者发生变化时调用
② 注册sessionExpirationListener,监听状态变化事件.在session失效重新创建session时调用对于全新的消费者,注册多个监听器,在zk的对应节点的注册事件发生时,会回调监听器的方法.
③向/brokers/topics/[topic]注册Data变更事件的topicPartitionChangeListener,在topic数据发生变化时调用
④调用loadBalancerListener.syncedRebalance(), 会调用reblance方法进行consumer的初始化工作

好,重点看一下④
对于第fetcher的启动,我们可以一直跟踪到updateFetcher这个方法,这个里面调用了
ConsumerFetcherManager的startConnections方法


再往下,可以看到,程序中为每个partition启动了一个fetcherThread


而在每个fetcherThread中,会具体获取数据,并将其压入在consume方法中生成的blockingQueue中

好了,之前抛出来的问题这里有了答案。
到此为止,kafka的启动过程以及获取数据的流程就结束了。