博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
log4j2性能剖析
阅读量:5749 次
发布时间:2019-06-18

本文共 12002 字,大约阅读时间需要 40 分钟。

hot3.png

众所周知,log4j2在性能方面要比log4j提升不少,最典型的就是ringBuffer的使用,本文着重从运行原理分析一下,如何做到的性能提升,欢迎指正。

  • 无锁队列ringbuffer的使用

1、使用asyncLogger,把待记录日志放入ringbuffer,如AsyncLoggerConfigDisruptor类代码:

public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message,            final Throwable thrown) {        if (loggerDisruptor.isUseThreadLocals()) {            logWithThreadLocalTranslator(fqcn, level, marker, message, thrown);        } else {            // LOG4J2-1172: avoid storing non-JDK classes in ThreadLocals to avoid memory leaks in web apps            logWithVarargTranslator(fqcn, level, marker, message, thrown);        }    }

2、RingBufferLogEventTranslator,把需要记录的日志放入该对象,然后放入ringbuffer,如下:

private void logWithThreadLocalTranslator(final String fqcn, final Level level, final Marker marker,            final Message message, final Throwable thrown) {        // Implementation note: this method is tuned for performance. MODIFY WITH CARE!        final RingBufferLogEventTranslator translator = getCachedTranslator();        initTranslator(translator, fqcn, level, marker, message, thrown);        initTranslatorThreadValues(translator);        publish(translator);    }

3、把translator放入ringbuffer队列

private void publish(final RingBufferLogEventTranslator translator) {        if (!loggerDisruptor.tryPublish(translator)) {//先尝试直接放入,理论上只有队列满的情况下放置失败会            handleRingBufferFull(translator);//这里是处理队列满的情况下的逻辑        }    }

3.1如何放入ringbuffer,如何协调消费者

简单介绍下disruptor框架,使用single-writer方式,向ringbuffer里放置数据,一个线程,分配一个固定的数据位,操作是两阶段操作:

public boolean tryPublishEvent(EventTranslator
translator) { try { final long sequence = sequencer.tryNext();//先获取序号,否则把日志放哪里呢 translateAndPublish(translator, sequence);//有了序号,就把translator和序号绑定在一起了 return true; } catch (InsufficientCapacityException e) { return false;//有这个异常说明队列满了 } }

第一步:申请数据位,写入数据

@Override    public long tryNext() throws InsufficientCapacityException    {        return tryNext(1);    }    /**     * @see Sequencer#tryNext(int)     */    @Override    public long tryNext(int n) throws InsufficientCapacityException    {        if (n < 1)        {            throw new IllegalArgumentException("n must be > 0");        }        long current;        long next;        do        {            current = cursor.get();//获取ringbuffer的当前序号            next = current + n;//获取期望得到的用于存放本次日志请求的序号            if (!hasAvailableCapacity(gatingSequences, n, current))//无可用的容量啦,抛出异常            {                throw InsufficientCapacityException.INSTANCE;            }        }        while (!cursor.compareAndSet(current, next));//如果ringbuffer当前的序号和本次请求的当前序号即current,//则设置next或当前的序号,说白了就是,这段代码执行期间没有其他的线程改变当前序号,则直接更新        return next;    }

多线程操作需要由compareAndSet的cas操作保证,CAS是CPU级别的指令,把缓存和寄存器中的值同步至内存

第二步:数据提交

private void translateAndPublish(EventTranslator
translator, long sequence) { try { translator.translateTo(get(sequence), sequence);//根据序号获取到该序列号对应的RingBufferLogEvent,translateTo()方法把translator内部的日志相关信息绑定到RingBufferLogEvent的value上 } finally { sequencer.publish(sequence);//标示该sequence已经被使用,唤醒其他等待线程 } }
public void publish(final long sequence)    {        setAvailable(sequence);        waitStrategy.signalAllWhenBlocking();//唤醒其他消费线程    }

第三步:消费数据

public void run()    {        if (!running.compareAndSet(false, true))        {            throw new IllegalStateException("Thread is already running");        }        sequenceBarrier.clearAlert();        notifyStart();        T event = null;        long nextSequence = sequence.get() + 1L;        try        {            while (true)            {                try                {                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);                    while (nextSequence <= availableSequence)                    {                        event = dataProvider.get(nextSequence);//从ringbuffer里获取待处理的元素,ringbuffer继承自dataProvider                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);//记录日志                        nextSequence++;                    }                    sequence.set(availableSequence);                }                catch (final TimeoutException e)                {                    notifyTimeout(sequence.get());                }                catch (final AlertException ex)                {                    if (!running.get())                    {                        break;                    }                }                catch (final Throwable ex)                {                    exceptionHandler.handleEventException(ex, nextSequence, event);                    sequence.set(nextSequence);                    nextSequence++;                }            }        }        finally        {            notifyShutdown();            running.set(false);        }    }

3.2队列满的情况下,如何处理

private void handleRingBufferFull(final RingBufferLogEventTranslator translator) {        final EventRoute eventRoute = loggerDisruptor.getEventRoute(translator.level);        switch (eventRoute) {            case ENQUEUE:                loggerDisruptor.enqueueLogMessageInfo(translator);//这是异步的,看publishEvent方法                break;            case SYNCHRONOUS://同步的                logMessageInCurrentThread(translator.fqcn, translator.level, translator.marker, translator.message,                        translator.thrown);                break;            case DISCARD://直接丢弃                break;            default:                throw new IllegalStateException("Unknown EventRoute " + eventRoute);        }    }
public void publishEvent(EventTranslator
translator) { final long sequence = sequencer.next();//区别在这个方法 translateAndPublish(translator, sequence); }
@Override    public long next(int n)    {        if (n < 1)        {            throw new IllegalArgumentException("n must be > 0");        }        long current;        long next;        do        {            current = cursor.get();            next = current + n;            long wrapPoint = next - bufferSize;            long cachedGatingSequence = gatingSequenceCache.get();            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)            {                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);                if (wrapPoint > gatingSequence)                {                    waitStrategy.signalAllWhenBlocking();//唤醒消费者,赶紧干活,活太多了,加紧吧                    LockSupport.parkNanos(1); //线程挂起1纳秒// TODO, should we spin based on the wait strategy?                    continue;                }                gatingSequenceCache.set(gatingSequence);            }            else if (cursor.compareAndSet(current, next))//取不到序号,就一直取            {                break;            }        }        while (true);        return next;    }

 

  • disruptor环境初始化

1、通过AsyncLoggerConfigDisruptor实现,初始化调用start方法,完成如下配置:ringbuffer大小、disruptor等待策略、生产者模式、消费者线程池、事件处理线程,log4j2使用的是单个消费者,多生产者,单线程池,如下代码:

public synchronized void start() {        if (disruptor != null) {            LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, "                    + "using existing object.");            return;        }        LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration.");        ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");        final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");        executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);        backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();        translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR;        factory = mutable ? MUTABLE_FACTORY : FACTORY;        disruptor = new Disruptor<>(factory, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);        final ExceptionHandler
errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler(); disruptor.handleExceptionsWith(errorHandler); final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()}; disruptor.handleEventsWith(handlers); LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, " + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy .getClass().getSimpleName(), errorHandler); disruptor.start(); }

2、Log4jEventWrapperHandler类实现把日志落地的功能,onEvent方法,最后会调用该logger绑定的默认appender, 所有的appender写日志,最终都会由父类AbstractOutputStreamAppender关联的OutputStreamManager完成,日志落地方法如下:

protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {        if (immediateFlush && byteBuffer.position() == 0) {            writeToDestination(bytes, offset, length);            flushDestination();            return;        }        if (length >= byteBuffer.capacity()) {            // if request length exceeds buffer capacity, flush the buffer and write the data directly            flush();            writeToDestination(bytes, offset, length);        } else {            if (length > byteBuffer.remaining()) {                flush();            }            byteBuffer.put(bytes, offset, length);        }        if (immediateFlush) {            flush();        }    }
  • 性能分析:

因为要保证ByteBuffer的线程安全,所以此处会有线程锁,同log4j1一样,既然落地处都需要线程同步,但是为什么log4j2要比log4j性能好呢?

分析认为:log4j1写日志多线程情况是阻塞的,log4j2不会阻塞,生产者只负责生产,通过ringbuffer的无阻塞内存队列作为缓冲,多生产者多线程的竞争是通过CAS实现,性能较高,至于最后落地,虽然两者都会调用synchronized方法写入日志,log4j2的asynclogger支持多个消费者,每个消费者取一批待处理的日志,类似于分段,用于提高性能。

  • 异步写入的数据丢失问题

有人也许会问,既然使用了队列,那么服务宕机的情况会不会丢失日志呢?这个确实有可能。

1、log4j2的asynclogger也有一定的保证措施,不需要我们手动实现,当发生服务中止时,它会检查是否还有待消费的日志,如果有,循环200次,每次sleep 50ms,给消费者争取时间,直至消费完毕,如果在这期间仍然没有消费完,数据有可能丢失,代码示例如下:

public synchronized void stop() {        final Disruptor
temp = disruptor; if (temp == null) { LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down."); return; // disruptor was already shut down by another thread } LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration."); // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown(). disruptor = null; // client code fails with NPE if log after stop = OK // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed, // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU, // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain. for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { try { Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while } catch (final InterruptedException e) { // ignored } } temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor executor for this configuration."); executor.shutdown(); // finally, kill the processor thread executor = null; // release reference to allow GC if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) { LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy, DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy)); } }

2、log4j1使用异步的时候,如何保证系统宕机时的日志丢失问题,解决方案需要手动实现一个hook,服务停止时,完成数据流的flush

转载于:https://my.oschina.net/13426421702/blog/904249

你可能感兴趣的文章
iOS 绕过相册权限漏洞
查看>>
我的友情链接
查看>>
XP 安装ORACLE
查看>>
八、 vSphere 6.7 U1(八):分布式交换机配置(vMotion迁移网段)
查看>>
[转载] 中华典故故事(孙刚)——19 万岁
查看>>
修改hosts文件里面的主机名,oralce asm无法启动
查看>>
Maven学习总结(十)——使用Maven编译项目gbk的不可映射问题
查看>>
php5编译安装常见错误和解决办法集锦
查看>>
Linux远程访问及控制
查看>>
MongoDB实战系列之五:mongodb的分片配置
查看>>
Unable to determine local host from URL REPOSITORY_URL=http://
查看>>
Java Tomcat SSL 服务端/客户端双向认证(二)
查看>>
java基础(1)
查看>>
ORACLE配置,修改tnsnames.ora文件实例
查看>>
用户无法在输入框中键入数字
查看>>
Workstation服务无法启动导致无法访问文件服务器
查看>>
.Net组件程序设计之远程调用(二)
查看>>
ant中文教程
查看>>
Linux常用命令(一)
查看>>
WSUS数据库远端存储条件下切换域及数据库迁移
查看>>