使用异步日志改善服务性能

陈晟

  在提供高QPS/高并发的接口服务时,I/O往往会成为服务性能的瓶颈,例如服务需要记录调用日志,但当磁盘IO无法承担频繁的写请求时,就会导致接口超时,致使服务质量下降
  来看一下一个dubbo服务在繁忙时的线程状态信息

"DubboServerHandler-172.24.16.216:44880-thread-899" Id=1023 BLOCKED on org.apache.logging.log4j.core.appender.rolling.RollingRandomAccessFileManager@55811dcb owned by "DubboServerHandler-172.24.16.216:44880-thread-507" Id=629
    at org.apache.logging.log4j.core.appender.rolling.RollingFileManager.checkRollover(RollingFileManager.java:149)
    -  blocked on org.apache.logging.log4j.core.appender.rolling.RollingRandomAccessFileManager@55811dcb
    at org.apache.logging.log4j.core.appender.RollingRandomAccessFileAppender.append(RollingRandomAccessFileAppender.java:88)
    at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:155)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:128)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:119)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
    at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:390)
    at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:375)
    ...

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@6ff4f4ce

  在上面的线程日志中,可以发现当前线程在执行RollingFileManager.checkRollover方法时发生了竞争,致使线程出现了BLOCKED的状态,当这种情况频繁发生时,应用有限的线程池就会溢出,导致该应用无法提供服务
  RollingRandomAccessFileManager是Log4j用来记录日志的写入类,它会将日志写入磁盘文件,来看一下RollingFileManager.checkRollover干了些什么

/**
 * Determine if a rollover should occur.
 * @param event The LogEvent.
 */
public synchronized void checkRollover(final LogEvent event) {  
    if (triggeringPolicy.isTriggeringEvent(event)) {
        rollover();
    }
}

  这里可以看到RollingRandomAccessFileManager需要和其他线程竞争一个监控锁monitor,也就不难解释为什么那么多线程都会处于BLOCKED的状态。
  那么log4j的日志写入时,必然会发生锁竞争的情况,log4j有提供什么办法来提高自己日志写入的吞吐量么?答案是有的,log4j在他的2.x版本中提供了新的异步日志写入方法,AsyncLogger。配置异步日志的方法可以参看log4j的官方网站,这里不进行说明,重点来分析AsyncLogger的性能和其实现方法
  AsyncLogger的性能提升:   这个两个图来自Log4j官方,对比了新的AsynLogger在全异步,混合模式、之前的AsyncAppender的性能和sync写模式下的Log4j吞吐量,在线程数较多的情况下,性能差距越发明显

AsyncLogger的实现简述

  按个人的理解,异步Logger是让业务逻辑把日志信息放入Disruptor队列后可以直接返回,虽然最终还是写文件,但降低了最终对RollingRandomAccessFileManager的锁竞争,大大增加了日志的吞吐量

AsyncLogger的调用逻辑

1.Disruptor使用了一个RingBuffer替代队列,用生产者消费者指针替代锁。
2.生产者消费者指针使用CPU支持的整数自增,无需加锁并且速度很快。Java的实现在Unsafe package中。
重点贴一下几段代码,详细描述一下异步日志的调用过程
1.日志生产过程,即日志写入RingBuffer的过程:调用AsyncLoggerConfigDisruptor.tryEnqueue方法

public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {  
    final LogEvent logEvent = prepareEvent(event);
    return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig);
}
public boolean tryPublishEvent(EventTranslator<E> translator)  
{
    try
    {
        //先获取序号,否则把日志放哪里呢
        final long sequence = sequencer.tryNext();
        //有了序号,就把translator和序号绑定在一起了
        translateAndPublish(translator, sequence);
        return true;
    }
    catch (InsufficientCapacityException e)
    {
        return false;//有这个异常说明队列满了
    }
}

具体写入过程:
申请数据位,写入数据

@Override
public long tryNext() throws InsufficientCapacityException  
{
    return tryNext(1);
}

/**
 * @see Sequencer#tryNext(int)
 */
@Override
public long tryNext(int n) throws InsufficientCapacityException  
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    do{
        current = cursor.get();//获取ringbuffer的当前序号
        next = current + n;//获取期望得到的用于存放本次日志请求的序号

        if (!hasAvailableCapacity(gatingSequences, n, current))//无可用的容量啦,抛出异常
        {
            throw InsufficientCapacityException.INSTANCE;
        }
    }
    while (!cursor.compareAndSet(current, next));
    //如果ringbuffer当前的序号和本次请求的当前序号即current,
    //则设置next或当前的序号,说白了就是,这段代码执行期间没有其他的线程改变当前序号,则直接更新

    return next;
}

数据提交至RingBuffer中

private void translateAndPublish(EventTranslator<E> translator, long sequence)  
{
    try
    {
    //根据序号获取到该序列号对应的RingBufferLogEvent,translateTo()方法把translator内部的日志相关信息绑定到RingBufferLogEvent的value上
        translator.translateTo(get(sequence), sequence);

    }
    finally
    {
     //标示该sequence已经被使用,唤醒其他等待线程
     sequencer.publish(sequence);
    }
}
public void publish(final long sequence){  
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking();//唤醒其他消费线程
}

2.日志消费过程,即从RingBuffer中取出,写入日志文件

public void run()  
{
    if (!running.compareAndSet(false, true))
    {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();

    notifyStart();

    T event = null;
    long nextSequence = sequence.get() + 1L;
    try
    {
        while (true)
        {
            try
            {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);

                while (nextSequence <= availableSequence)
                {
                //从ringbuffer里获取待处理的元素,ringbuffer继承自dataProvider
                    event = dataProvider.get(nextSequence);
            //记录日志
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                sequence.set(availableSequence);
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }
    finally
    {
        notifyShutdown();
        running.set(false);
    }
}

  onEvent方法,最后会调用该logger绑定的默认appender, 所有的appender写日志,最终都会由父类AbstractOutputStreamAppender关联的OutputStreamManager完成,日志落地方法如下:

protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {  
    if (immediateFlush && byteBuffer.position() == 0) {
        writeToDestination(bytes, offset, length);
        flushDestination();
        return;
    }
    if (length >= byteBuffer.capacity()) {
        // if request length exceeds buffer capacity, flush the buffer and write the data directly
        flush();
        writeToDestination(bytes, offset, length);
    } else {
        if (length > byteBuffer.remaining()) {
            flush();
        }
        byteBuffer.put(bytes, offset, length);
    }
    if (immediateFlush) {
        flush();
    }
}

  因为要保证ByteBuffer的线程安全,所以此处的write方法会有线程锁,同log4j1一样,既然落地处都需要线程同步,但是为什么log4j2要比log4j性能好呢?
  个人分析认为:log4j1写日志多线程情况是阻塞的,log4j2不会阻塞,生产者只负责生产,通过ringbuffer的无阻塞内存队列作为缓冲,多生产者多线程的竞争是通过CAS实现,性能较高,至于最后落地,虽然两者都会调用synchronized方法写入日志,log4j2的asynclogger支持多个消费者,每个消费者取一批待处理的日志,类似于分段,用于提高性能。

  尽管AsyncLogger 能够大幅度的提高性能,但是也会带来一些问题,下面是翻译官方的文档的Trade-offs:
Benefits:
  1.Higher throughput,达到相对于sync logger的6-68倍的吞吐量
  2.Lower logging latency,latency是调用Logger.log直到return的时间,asyncLogger的latency比syncLogger以及基于queue的aysncAppender都要低,不仅平均latency低,而且99%、95%latency 也都低于后两者
  3.降低极端大的日志量时候的延迟尖峰
Drawbacks:
  1.Error handling, 如果在打印日志的时候出现错误,使用asyncLogger,业务是不知道异常的(可以通过配置ExceptionHandler处理异常),如果打印日志是业务逻辑的一部分,不建议使用asyncLogger
  2.打印一些可变的内容的时候,使用asyncLogger 会出现问题。大部分时间,不需要担心这点,Log4j确保了类似于 logger.debug("My object is {}", myObject),使用myObject在打印日志的时刻的版本打印(Log4j 所有打印都日志都是封装到Message的实现类里,存储在 final String里),不管之后是否改变。但是log4j也支持一些了可变的Message,如 MapMessage and StructuredDataMessage ,这些如果在打印日志时候改变,就有问题了