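As is well known, log4j2 performs considerably better than log4j 1.x, and the most prominent reason is its use of a ring buffer. This article looks at how log4j2 works at runtime to explain where that performance gain comes from. Corrections are welcome.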
- Using the lock-free ring buffer queue
1. With AsyncLogger, each log event is put into the ring buffer. The entry point is AsyncLogger.logMessage, which hands the event to its AsyncLoggerDisruptor:
```java
public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message,
        final Throwable thrown) {
    if (loggerDisruptor.isUseThreadLocals()) {
        logWithThreadLocalTranslator(fqcn, level, marker, message, thrown);
    } else {
        // LOG4J2-1172: avoid storing non-JDK classes in ThreadLocals to avoid memory leaks in web apps
        logWithVarargTranslator(fqcn, level, marker, message, thrown);
    }
}
```
2. The data to be logged is copied into a RingBufferLogEventTranslator, which is then published to the ring buffer:
```java
private void logWithThreadLocalTranslator(final String fqcn, final Level level, final Marker marker,
        final Message message, final Throwable thrown) {
    // Implementation note: this method is tuned for performance. MODIFY WITH CARE!
    final RingBufferLogEventTranslator translator = getCachedTranslator();
    initTranslator(translator, fqcn, level, marker, message, thrown);
    initTranslatorThreadValues(translator);
    publish(translator);
}
```
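The point of getCachedTranslator() is that each application thread reuses one mutable translator instead of allocating a new object per log call; the ring buffer then copies the values into a pre-allocated slot. The sketch below shows only that ThreadLocal reuse pattern. The class names `LogTranslator` and `CachedTranslatorDemo` are hypothetical stand-ins, not log4j2 classes, and the `publisher` callback stands in for the ring buffer.

```java
import java.util.function.Consumer;

/** Hypothetical mutable carrier, standing in for RingBufferLogEventTranslator. */
final class LogTranslator {
    String level;
    String message;
    String threadName;

    void init(String level, String message) {
        this.level = level;
        this.message = message;
    }
}

final class CachedTranslatorDemo {
    // One reusable translator per thread, so the logging hot path allocates nothing.
    private static final ThreadLocal<LogTranslator> CACHE = ThreadLocal.withInitial(LogTranslator::new);

    static LogTranslator getCachedTranslator() {
        return CACHE.get();
    }

    /** Fills the cached translator and hands it to the publisher (the ring buffer in log4j2). */
    static void log(String level, String message, Consumer<LogTranslator> publisher) {
        LogTranslator t = getCachedTranslator();
        t.init(level, message);
        t.threadName = Thread.currentThread().getName();
        publisher.accept(t); // the publisher must copy the fields: t is overwritten by the next call
    }
}
```

The trade-off is the one named in the LOG4J2-1172 comment: in a web container, a ThreadLocal holding an application class can pin the web app's class loader, which is why log4j2 falls back to the varargs translator there.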
3. Publishing the translator to the ring buffer:
```java
private void publish(final RingBufferLogEventTranslator translator) {
    // Try to publish directly first; in practice this fails only when the ring buffer is full
    if (!loggerDisruptor.tryPublish(translator)) {
        handleRingBufferFull(translator); // queue-full handling, see 3.2
    }
}
```
3.1 How events are put into the ring buffer and how consumers are coordinated
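A quick introduction to the Disruptor framework: producers write into the ring buffer following the single-writer principle, meaning each slot is claimed by exactly one producer thread, which owns that slot until it publishes it. Publishing is a two-phase operation: first claim a sequence number, then fill the slot and publish it.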
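```java
public boolean tryPublishEvent(EventTranslator<E> translator) {
    try {
        // Claim a sequence first; without one there is no slot to put the event in
        final long sequence = sequencer.tryNext();
        // With a sequence in hand, copy the translator's data into that slot and publish it
        translateAndPublish(translator, sequence);
        return true;
    } catch (InsufficientCapacityException e) {
        return false; // this exception means the ring buffer is full
    }
}
```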
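To make the claim, translate, publish sequence concrete, here is a minimal single-threaded sketch, assuming one producer and a consumer that reports its progress explicitly. `MiniRingBuffer` and `NoCapacityException` are hypothetical names; there are no memory barriers here, so this illustrates the control flow only, not the Disruptor's thread-safety.

```java
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Thrown when no slot is free; plays the role of Disruptor's InsufficientCapacityException. */
final class NoCapacityException extends Exception {
    NoCapacityException() {
        super(null, null, false, false); // no stack trace, so throwing it is cheap
    }
}

/** Hypothetical single-producer ring buffer illustrating claim -> translate -> publish. */
final class MiniRingBuffer<T> {
    private final Object[] slots;
    private long cursor = -1;    // last claimed sequence
    private long published = -1; // last published sequence
    private long consumed = -1;  // last sequence the consumer finished (the gating sequence)

    MiniRingBuffer(int size, Supplier<T> factory) {
        slots = new Object[size];
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get(); // every event object is pre-allocated once
        }
    }

    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) slots[(int) (sequence % slots.length)];
    }

    long tryNext() throws NoCapacityException {
        long next = cursor + 1;
        if (next - consumed > slots.length) {
            throw new NoCapacityException(); // claiming would overwrite an unconsumed slot
        }
        cursor = next;
        return next;
    }

    boolean tryPublishEvent(BiConsumer<T, Long> translator) {
        try {
            long seq = tryNext();                 // phase 1: claim a slot
            try {
                translator.accept(get(seq), seq); // copy the data into the pre-allocated event
            } finally {
                published = seq;                  // phase 2: make the slot visible to the consumer
            }
            return true;
        } catch (NoCapacityException e) {
            return false;                         // full: the caller decides what to do next
        }
    }

    long publishedSequence() { return published; }

    void markConsumed(long sequence) { consumed = sequence; }
}
```

With a capacity of 4, the fifth tryPublishEvent returns false until the consumer marks sequence 0 as done, which is exactly the case that sends log4j2 into handleRingBufferFull.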
Step 1: claim a sequence (a slot) for the data
```java
@Override
public long tryNext() throws InsufficientCapacityException {
    return tryNext(1);
}

/**
 * @see Sequencer#tryNext(int)
 */
@Override
public long tryNext(int n) throws InsufficientCapacityException {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }
    long current;
    long next;
    do {
        current = cursor.get(); // highest sequence claimed so far in the ring buffer
        next = current + n;     // the sequence we want for this log request
        if (!hasAvailableCapacity(gatingSequences, n, current)) {
            throw InsufficientCapacityException.INSTANCE; // no free capacity left
        }
        // Set the cursor to next only if it still equals current, i.e. no other thread
        // changed it while this code ran; otherwise loop and try again
    } while (!cursor.compareAndSet(current, next));
    return next;
}
```
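With multiple producer threads, correctness is guaranteed by compareAndSet. CAS is an atomic CPU instruction (for example `lock cmpxchg` on x86): it writes the new value only if memory still holds the expected value, and otherwise fails. If another producer advanced the cursor in the meantime, the CAS fails and the loop retries with a fresh value. In Java, AtomicLong.compareAndSet also has volatile read/write memory semantics, so a successful update is visible to other threads.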
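The CAS claim loop can be reproduced with a plain AtomicLong. The sketch below, loosely modelled on tryNext(int) above, assumes a single gating (consumer) sequence and returns -1 instead of throwing when the buffer is full; the class name `CasSequencer` and that -1 convention are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical multi-producer claim logic, loosely modelled on MultiProducerSequencer.tryNext(int). */
final class CasSequencer {
    private final AtomicLong cursor = new AtomicLong(-1); // highest claimed sequence
    private final AtomicLong gating = new AtomicLong(-1); // slowest consumer's sequence
    private final int bufferSize;

    CasSequencer(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Returns the claimed sequence, or -1 if there is no room for n more slots. */
    long tryNext(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
            if (next - gating.get() > bufferSize) {
                return -1; // claiming would lap the consumer
            }
        } while (!cursor.compareAndSet(current, next)); // lost the race: re-read and retry
        return next;
    }

    void consumerReached(long sequence) {
        gating.set(sequence);
    }
}
```

Because a failed CAS simply retries, no producer ever blocks another; under contention the cost is a few extra loop iterations rather than a context switch.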
Step 2: publish the data
```java
private void translateAndPublish(EventTranslator<E> translator, long sequence) {
    try {
        // Look up the pre-allocated RingBufferLogEvent for this sequence; translateTo()
        // copies the log data held by the translator into that event
        translator.translateTo(get(sequence), sequence);
    } finally {
        sequencer.publish(sequence); // mark the sequence as published and wake waiting threads
    }
}
```
```java
public void publish(final long sequence) {
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking(); // wake consumer threads blocked in the wait strategy
}
```
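With several producers, sequences are not necessarily published in order: the producer holding sequence 2 can finish before the one holding sequence 1. setAvailable() therefore records, per slot, which lap of the ring last published it, and a consumer may only read up to the first gap. The following is a minimal sketch of that idea, assuming a power-of-two buffer size; `AvailabilityBuffer` and its methods are hypothetical names, not the Disruptor's API.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Hypothetical sketch of the per-slot "available" flags used by a multi-producer sequencer. */
final class AvailabilityBuffer {
    private final AtomicIntegerArray available; // per slot: the lap number that last published it
    private final int mask;
    private final int shift;

    AvailabilityBuffer(int bufferSize) {
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        available = new AtomicIntegerArray(bufferSize);
        for (int i = 0; i < bufferSize; i++) {
            available.set(i, -1); // nothing published yet
        }
        mask = bufferSize - 1;
        shift = Integer.numberOfTrailingZeros(bufferSize);
    }

    /** Second phase of a publish: record that this slot now holds data for this lap. */
    void setAvailable(long sequence) {
        available.set((int) (sequence & mask), (int) (sequence >>> shift));
    }

    boolean isAvailable(long sequence) {
        return available.get((int) (sequence & mask)) == (int) (sequence >>> shift);
    }

    /** Highest sequence in [lowerBound, upper] that a consumer can read without skipping a gap. */
    long highestPublished(long lowerBound, long upper) {
        for (long s = lowerBound; s <= upper; s++) {
            if (!isAvailable(s)) {
                return s - 1;
            }
        }
        return upper;
    }
}
```

Storing the lap number rather than a boolean means a slot never has to be reset after it is consumed: sequence 8 in an 8-slot buffer maps to slot 0 but expects lap 1, so the stale lap-0 flag does not count as published.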
Step 3: consume the data (BatchEventProcessor.run() in the Disruptor)
```java
public void run() {
    if (!running.compareAndSet(false, true)) {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();
    notifyStart();
    T event = null;
    long nextSequence = sequence.get() + 1L;
    try {
        while (true) {
            try {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                while (nextSequence <= availableSequence) {
                    // Fetch the event to process from the ring buffer (RingBuffer is a DataProvider)
                    event = dataProvider.get(nextSequence);
                    // Handle the event, i.e. write the log entry
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    } finally {
        notifyShutdown();
        running.set(false);
    }
}
```
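The important detail in run() is that the consumer processes everything published up to availableSequence in one pass, passes endOfBatch = true only for the last event, and updates its own sequence once per batch. The sketch below isolates that loop, using a plain List as the ring and omitting the barrier and exception handling; `BatchConsumer` and its `Handler` interface are hypothetical names.

```java
import java.util.List;

/** Hypothetical consumer loop, loosely modelled on BatchEventProcessor.run(). */
final class BatchConsumer {
    interface Handler<T> {
        void onEvent(T event, long sequence, boolean endOfBatch);
    }

    private long sequence = -1; // last sequence this consumer has fully processed

    /** Processes every event up to availableSequence as one batch and returns the new position. */
    <T> long drain(List<T> ring, long availableSequence, Handler<T> handler) {
        long next = sequence + 1;
        while (next <= availableSequence) {
            T event = ring.get((int) (next % ring.size()));
            handler.onEvent(event, next, next == availableSequence); // flag the last event of the batch
            next++;
        }
        sequence = Math.max(sequence, availableSequence); // report progress once per batch, freeing the slots
        return sequence;
    }
}
```

A handler that flushes only when endOfBatch is true pays for one flush per batch instead of one per event, which is the main way the single consumer thread keeps up with many producers.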
3.2 How a full queue is handled
```java
private void handleRingBufferFull(final RingBufferLogEventTranslator translator) {
    final EventRoute eventRoute = loggerDisruptor.getEventRoute(translator.level);
    switch (eventRoute) {
        case ENQUEUE:
            // Still goes through the ring buffer, but blocks the caller until a slot
            // frees up (see publishEvent below)
            loggerDisruptor.enqueueLogMessageInfo(translator);
            break;
        case SYNCHRONOUS: // log synchronously in the calling thread
            logMessageInCurrentThread(translator.fqcn, translator.level, translator.marker, translator.message,
                    translator.thrown);
            break;
        case DISCARD: // drop the event
            break;
        default:
            throw new IllegalStateException("Unknown EventRoute " + eventRoute);
    }
}
```
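Which route is chosen is decided by a queue-full policy. The sketch below is loosely modelled on how I understand log4j2's default and discarding policies (the DiscardingAsyncQueueFullPolicy that stop() refers to): events at or below a threshold level are dropped, the background consumer thread never waits on its own full queue (that would deadlock), and everyone else enqueues and waits. `LevelRoutingPolicy` and its nested enums are hypothetical; this is not log4j2's actual API.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical queue-full routing policy, loosely modelled on log4j2's discarding policy. */
final class LevelRoutingPolicy {
    enum Level { TRACE, DEBUG, INFO, WARN, ERROR, FATAL } // ascending severity
    enum Route { ENQUEUE, SYNCHRONOUS, DISCARD }

    private final Level discardThreshold;
    private final long backgroundThreadId;
    private final AtomicLong discardCount = new AtomicLong();

    LevelRoutingPolicy(Level discardThreshold, long backgroundThreadId) {
        this.discardThreshold = discardThreshold;
        this.backgroundThreadId = backgroundThreadId;
    }

    Route getRoute(long callerThreadId, Level level) {
        if (level.compareTo(discardThreshold) <= 0) { // e.g. INFO and below when the threshold is INFO
            discardCount.incrementAndGet();
            return Route.DISCARD;
        }
        // The consumer thread must not block on its own full queue: it is the only one that can drain it.
        if (callerThreadId == backgroundThreadId) {
            return Route.SYNCHRONOUS;
        }
        return Route.ENQUEUE; // application threads wait for a free slot
    }

    long discardCount() {
        return discardCount.get();
    }
}
```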
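ENQUEUE ends up in RingBuffer.publishEvent, which differs from tryPublishEvent only in calling next() instead of tryNext():

```java
public void publishEvent(EventTranslator<E> translator) {
    final long sequence = sequencer.next(); // the difference: next() waits for a slot instead of failing
    translateAndPublish(translator, sequence);
}
```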
```java
@Override
public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }
    long current;
    long next;
    do {
        current = cursor.get();
        next = current + n;
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
            if (wrapPoint > gatingSequence) {
                // Still full: wake the consumers so they catch up, then back off briefly
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1); // park ~1 ns (the OS usually sleeps longer)
                // TODO, should we spin based on the wait strategy?
                continue;
            }
            gatingSequenceCache.set(gatingSequence);
        } else if (cursor.compareAndSet(current, next)) {
            break; // claimed; if the CAS fails, another producer won the race and we loop again
        }
    } while (true);
    return next;
}
```
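The blocking behaviour of next() boils down to "if claiming would overwrite a slot the consumer has not finished, park briefly and retry". A minimal sketch, assuming a single consumer sequence and no wait strategy (so no signalAllWhenBlocking call); `BlockingSequencer` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical blocking claim, loosely modelled on MultiProducerSequencer.next(int) with n = 1. */
final class BlockingSequencer {
    private final AtomicLong cursor = new AtomicLong(-1);
    final AtomicLong gating = new AtomicLong(-1); // advanced by the consumer
    private final int bufferSize;

    BlockingSequencer(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    long next() {
        while (true) {
            long current = cursor.get();
            long next = current + 1;
            long wrapPoint = next - bufferSize; // the older sequence whose slot we would overwrite
            if (wrapPoint > gating.get()) {
                LockSupport.parkNanos(1);       // consumer still owns that slot: back off and retry
                continue;
            }
            if (cursor.compareAndSet(current, next)) {
                return next;                    // claimed
            }
            // CAS failed: another producer claimed `next` first, so loop and try again
        }
    }
}
```

This is why ENQUEUE protects against losing events but can stall application threads when the consumer falls behind, which is the trade-off the queue-full policy lets you choose.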
- Initializing the Disruptor
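1. For AsyncLoggerConfig (async loggers declared in the configuration), this is done by AsyncLoggerConfigDisruptor, whose start() method configures the ring buffer size, the Disruptor wait strategy, the producer type, the consumer executor, and the event handler. log4j2 uses multiple producers (ProducerType.MULTI), a single consumer (one Log4jEventWrapperHandler), and a single-thread executor: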
```java
public synchronized void start() {
    if (disruptor != null) {
        LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, "
                + "using existing object.");
        return;
    }
    LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration.");
    ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
    final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
    executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
    backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
    asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();

    translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR;
    factory = mutable ? MUTABLE_FACTORY : FACTORY;
    disruptor = new Disruptor<>(factory, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);

    final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
    disruptor.handleExceptionsWith(errorHandler);

    final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
    disruptor.handleEventsWith(handlers);

    LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
            + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(),
            waitStrategy.getClass().getSimpleName(), errorHandler);
    disruptor.start();
}
```
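The Disruptor requires the ring buffer size to be a power of two, and as far as I can tell calculateRingBufferSize rounds the configured AsyncLoggerConfig.RingBufferSize value up accordingly. The reason is cheap indexing: with a power-of-two size, the slot for a sequence is a bit mask instead of a modulo. The helpers below are a hypothetical sketch (`RingMath` is not a log4j2 class) showing both the rounding and the masking.

```java
/** Hypothetical helpers showing why ring buffer sizes are powers of two. */
final class RingMath {
    /** Smallest power of two >= n, for 1 <= n <= 2^30. */
    static int ceilingPowerOfTwo(int n) {
        if (n < 1 || n > (1 << 30)) {
            throw new IllegalArgumentException("n out of range: " + n);
        }
        return n == 1 ? 1 : Integer.highestOneBit(n - 1) << 1;
    }

    /** Slot index for a sequence: a bit mask instead of %, valid only when bufferSize is a power of two. */
    static int indexOf(long sequence, int bufferSize) {
        return (int) (sequence & (bufferSize - 1));
    }
}
```

For example, a configured size of 1000 becomes 1024, and sequence 1025 then lands in slot 1025 & 1023 = 1.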
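2. Log4jEventWrapperHandler is what actually persists the events: its onEvent method eventually calls the appenders attached to the logger's configuration. Appenders that write to an output stream (Console, File, RollingFile and so on) all end up in the OutputStreamManager owned by their parent class AbstractOutputStreamAppender, whose write method is: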
```java
protected synchronized void write(final byte[] bytes, final int offset, final int length,
        final boolean immediateFlush) {
    if (immediateFlush && byteBuffer.position() == 0) {
        writeToDestination(bytes, offset, length);
        flushDestination();
        return;
    }
    if (length >= byteBuffer.capacity()) {
        // if request length exceeds buffer capacity, flush the buffer and write the data directly
        flush();
        writeToDestination(bytes, offset, length);
    } else {
        if (length > byteBuffer.remaining()) {
            flush();
        }
        byteBuffer.put(bytes, offset, length);
    }
    if (immediateFlush) {
        flush();
    }
}
```
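To see how this method saves I/O, here is a self-contained sketch that follows the same branches but writes into a ByteArrayOutputStream and counts how often the "destination" is touched. `BufferedSink` is a hypothetical class; the flushDestination() call of the original is omitted because flushing an in-memory stream does nothing.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Hypothetical stand-in for OutputStreamManager's buffered write path. */
final class BufferedSink {
    private final ByteBuffer byteBuffer;
    final ByteArrayOutputStream destination = new ByteArrayOutputStream();
    int destinationWrites = 0; // how many times the "file" was written to

    BufferedSink(int capacity) {
        byteBuffer = ByteBuffer.allocate(capacity);
    }

    synchronized void write(byte[] bytes, int offset, int length, boolean immediateFlush) {
        if (immediateFlush && byteBuffer.position() == 0) { // nothing buffered: write straight through
            writeToDestination(bytes, offset, length);
            return;
        }
        if (length >= byteBuffer.capacity()) { // too big to buffer: flush, then write directly
            flush();
            writeToDestination(bytes, offset, length);
        } else {
            if (length > byteBuffer.remaining()) {
                flush(); // make room first
            }
            byteBuffer.put(bytes, offset, length);
        }
        if (immediateFlush) {
            flush();
        }
    }

    synchronized void flush() {
        byteBuffer.flip(); // switch from filling to draining
        if (byteBuffer.hasRemaining()) {
            writeToDestination(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(),
                    byteBuffer.remaining());
        }
        byteBuffer.clear();
    }

    private void writeToDestination(byte[] bytes, int offset, int length) {
        destination.write(bytes, offset, length);
        destinationWrites++;
    }
}
```

Small records accumulate in the buffer and reach the destination in one call; only oversized records or an explicit flush bypass it.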
- Performance analysis
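Because access to the ByteBuffer must be thread-safe, this method is synchronized, just as log4j 1 synchronizes its write path. If both versions take a lock where the log is actually written, why is log4j2 faster?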
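My analysis: in log4j 1, every application thread that logs must acquire that lock itself (AppenderSkeleton.doAppend, for example, is synchronized), so under concurrency the threads block one another. In log4j2 with async loggers, application threads only produce: they hand events to the ring buffer, a non-blocking in-memory queue, and contention between producers is resolved with CAS rather than locks, which is cheap. The synchronized write still exists, but it is called only by the single background consumer thread, so the lock is effectively uncontended. That consumer also works in batches: it takes every event published up to the available sequence in one pass, and the endOfBatch flag lets appenders defer flushing until the end of the batch.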
- Data loss with asynchronous writes
One might ask: since a queue is involved, can logs be lost if the service goes down? Yes, that can happen.
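1. log4j2's async logger has a built-in safeguard that needs no code from us. When the service stops, stop() checks whether events are still waiting to be consumed and, if so, sleeps in 50 ms chunks for up to 200 attempts (about 10 seconds) to give the consumer time to drain the queue. It then calls disruptor.shutdown(), which busy-spins until the remaining events have been processed, so an orderly shutdown does not drop queued events. Events can still be lost if the process dies abruptly (for example kill -9 or a JVM crash) before the queue drains. The code: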
```java
public synchronized void stop() {
    final Disruptor<Log4jEventWrapper> temp = disruptor;
    if (temp == null) {
        LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down.");
        return; // disruptor was already shut down by another thread
    }
    LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration.");

    // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
    disruptor = null; // client code fails with NPE if log after stop = OK

    // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
    // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
    // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
    for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
        try {
            Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
        } catch (final InterruptedException e) { // ignored
        }
    }
    temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed

    LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor executor for this configuration.");
    executor.shutdown(); // finally, kill the processor thread
    executor = null; // release reference to allow GC

    if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
        LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
                DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
    }
}
```
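The "sleep in chunks while the backlog drains" loop is a reusable pattern for any in-process queue. A minimal sketch, assuming the backlog check is supplied as a BooleanSupplier; `GracefulDrain` is a hypothetical name. Unlike the original, which ignores InterruptedException, this version restores the interrupt flag and stops waiting.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical version of the bounded drain-wait loop in stop(). */
final class GracefulDrain {
    /** Returns true if the backlog emptied within roughly maxAttempts * sleepMillis. */
    static boolean waitForDrain(BooleanSupplier hasBacklog, int maxAttempts, long sleepMillis) {
        for (int i = 0; hasBacklog.getAsBoolean() && i < maxAttempts; i++) {
            try {
                Thread.sleep(sleepMillis); // give up the CPU instead of busy-spinning
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                break;
            }
        }
        return !hasBacklog.getAsBoolean();
    }
}
```

Called as `waitForDrain(() -> queueNotEmpty(), 200, 50)` (with a hypothetical queueNotEmpty check), it reproduces the roughly 10-second budget described above.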
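2. With log4j 1's asynchronous logging, preventing log loss when the system goes down has to be done by hand: register a shutdown hook that flushes the output streams when the service stops.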
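A minimal sketch of that hook, using a hypothetical `SimpleAsyncAppender` (a queue plus a drain method) so it runs without log4j on the classpath. With real log4j 1.x, the hook would typically just call `org.apache.log4j.LogManager.shutdown()`, which closes the configured appenders. Either way, a shutdown hook runs only on a normal JVM exit or SIGTERM, not on kill -9 or a hard crash.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical async appender: a queue plus a shutdown hook that drains and flushes it. */
final class SimpleAsyncAppender {
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    final List<String> written = new ArrayList<>(); // stands in for the log file

    void append(String line) {
        queue.offer(line); // the caller never blocks on I/O
    }

    /** Writes everything still queued; a real appender would finish with an OutputStream flush. */
    synchronized int drainAndFlush() {
        int n = 0;
        String line;
        while ((line = queue.poll()) != null) {
            written.add(line);
            n++;
        }
        return n;
    }

    /** Drains the queue when the JVM exits normally (not on kill -9 or a hard crash). */
    void installShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::drainAndFlush, "log-flush-hook"));
    }
}
```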