高性能的事件总线Mbassador介绍及源码分析

吴琦隆

Mbassador作为2017年你不能错过的Java类库之一,实现了发布-订阅模式的轻量级的,高性能的事件总线。它易于使用,并功能丰富,易于扩展,而同时又保证资源的高效利用和高性能。下面简单的介绍一下什么是事件总线,以及用一个简单的例子来分析一下mbassador的实现方式。

什么是事件总线

事件总线类似于消息总线,能让不同系统或模块通过共享的接口集合传递消息。它类似于计算机系统中的通信总线,是CPU,主存和外设之间通信的焦点。事件总线的消息传递会经过以下五个步骤
- 创建:发送者创建消息并组装其数据
- 发送:发送者把消息添加到通道中
- 传送:事件总线把消息从发送者的系统转移到接受者的系统
- 接收:接受者从通道中读取消息
- 处理:接受者处理读到的消息

为何使用事件总线

  • 异步通信:发送者不必等待接收者接收和处理消息,甚至不必等待事件总线传送消息,发送者只需等待消息发送,即事件总线将消息存储在通道中,就可以执行其它操作,同时消息由后台传送。
  • 可变的处理速度:异步通信让发送者可以按照自己的速度把请求批量发送给接受者,接受者也可以按照自己的速度处理请求,这就使得所有应用不必因等待彼此而浪费时间。

Mbassador示例

Mbassador是注解驱动的,不同于观察者模式中的观察者需要实现一个特定接口,这里只需要一个Listener注解就可表明观察者身份,并且Handler注解下的消息处理方法可以根据参数类型的不同处理不同的消息

public class MbassadorTest {

    @Listener(references = References.Strong)
    public class SimpleFileListener {

        /**
         * 接收String类型消息,进行异步处理
         * @param s
         * @throws InterruptedException
         */
        @Handler(delivery = Invoke.Asynchronously)
        public void expensiveOperation(String s) throws InterruptedException {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("---this is asyncHandler:" + s);
        }
    }

    @Test
    public void test() throws InterruptedException {
        //初始化一个事件总线
        IBusConfiguration configuration = new BusConfiguration()
                .addFeature(Feature.SyncPubSub.Default())
                .addFeature(Feature.AsynchronousHandlerInvocation.Default(2,2))
                .addFeature(Feature.AsynchronousMessageDispatch.Default().setNumberOfMessageDispatchers(4));
        MBassador bus = new MBassador(configuration);
        //SimpleFileListener接收消息
        bus.subscribe(new SimpleFileListener());
        //异步地发送String类型的消息
        for (int i = 0; i < 10; i++) {
            bus.post("message").asynchronously();
        }
        System.out.println("next...");
        TimeUnit.SECONDS.sleep(20);
    }
}

运行程序,先输出:

next...  

这表明了消息分发的异步性,发送者只需要使用asynchronously()方法,不必等待返回结果,就可以继续执行它自己的流程。
接着,控制台以间隔2s的速率输出

---this is asyncHandler:message
---this is asyncHandler:message

这表明了消息处理的异步性,接收者不是依次处理消息,而是有着“自己的处理速率”

消息分发的异步性

是借助于阻塞队列BlockingQueue实现的,当调用异步发送的asynchronously()方法时,消息被封装成MessagePublication放入AbstractSyncAsyncMessageBus的阻塞队列中:

private final BlockingQueue<IMessagePublication> pendingMessages;  
protected IMessagePublication addAsynchronousPublication(MessagePublication publication) {  
        try {
            pendingMessages.put(publication);
            return publication.markScheduled();
        } catch (InterruptedException e) {
            handlePublicationError(new InternalPublicationError(e, "Error while adding an asynchronous message publication", publication));
            return publication;
        }
    }

AbstractSyncAsyncMessageBus在初始化时会根据MessageDispatcher的数量新建数个消费者,他们在不同的线程中不断尝试从队列取出MessagePublication进行消息发送:

 private void initDispatcherThreads(Feature.AsynchronousMessageDispatch configuration) {
        for (int i = 0; i < configuration.getNumberOfMessageDispatchers(); i++) {
            //根据Dispatchers的数量新建数个线程一直运行,消费放入队列中的消息
            Thread dispatcher = configuration.getDispatcherThreadFactory().newThread(new Runnable() {
                public void run() {
                    while (true) {
                        IMessagePublication publication = null;
                        try {
                            publication = pendingMessages.take();
                            publication.execute();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        } catch(Throwable t){
                            handlePublicationError(new InternalPublicationError(t, "Error in asynchronous dispatch",publication));
                        }
                    }
                }
            });
            dispatcher.setName("MsgDispatcher-"+i);
            dispatchers.add(dispatcher);
            dispatcher.start();
        }
    }

上面Dispatchers的数量的数量就是在示例程序中设置的(Feature.AsynchronousMessageDispatch.Default().setNumberOfMessageDispatchers(4),默认的线程数量是2

消息处理的异步性

IHandlerInvocation接口的不同实现,决定了消息处理的异步性,同步方式的ReflectiveHandlerInvocation按顺序处理消息:

@Override
    public void invoke(final Object listener, final Object message, MessagePublication publication){
        final Method handler = getContext().getHandler().getMethod();
        try {
            handler.invoke(listener, message);
        } catch (IllegalAccessException e) {

而指定了@Handler(delivery = Invoke.Asynchronously)属性,实现为AsynchronousHandlerInvocation,使用线程池处理消息:

@Override
    public void invoke(final Object listener, final Object message, final MessagePublication publication){
        executor.execute(new Runnable() {
            @Override
            public void run() {
                    delegate.invoke(listener, message, publication);
            }
        });
    }

线程池的属性是在MBassador()初始化时确定,示例程序中我们设置为Feature.AsynchronousHandlerInvocation.Default(2,2)

public static final AsynchronousHandlerInvocation Default(int minThreadCount, int maxThreadCount){  
            return new AsynchronousHandlerInvocation().setExecutor(new ThreadPoolExecutor(minThreadCount, maxThreadCount, 1,
                    TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), MessageHandlerThreadFactory));
        }

即核心线程数与最大线程数都为2,因此,示例程序每隔两秒打印两条信息,默认线程池的corePoolSize是计算机的核心数,maximumPoolSize是corePoolSize*2。

上面分析的就是MBassador作为实现Publish–subscribe模式事件总线的核心代码,实现了异步通信并且借助阻塞队列和线程池可以合理控制消息发送和消息处理的速率。当然,它还具有很多其他优秀的特性,比如注解驱动的特性使它易于使用,使用消息信封@Enveloped可用于将不同类型的消息传递给单个处理程序,使用@Filter可以实现消息过滤,以及消息接收者发生异常时可以使用IPublicationErrorHandler进行异常处理等,这里就不多做介绍了,大家可以在实际使用中进行尝试。