Merge pull request #1198 from rmannibucau/rmannibucau/reduce-logservice-overhead
rewrite the CircularBuffer to reduce logservice overhead
diff --git a/log/src/main/java/org/apache/karaf/log/core/internal/CircularBuffer.java b/log/src/main/java/org/apache/karaf/log/core/internal/CircularBuffer.java
index a8482b0..4b9da9f 100644
--- a/log/src/main/java/org/apache/karaf/log/core/internal/CircularBuffer.java
+++ b/log/src/main/java/org/apache/karaf/log/core/internal/CircularBuffer.java
@@ -18,92 +18,74 @@
*/
package org.apache.karaf.log.core.internal;
-import java.lang.reflect.Array;
-import java.util.ArrayList;
+import org.ops4j.pax.logging.spi.PaxLoggingEvent;
+
import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.IntFunction;
+import java.util.stream.IntStream;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
/**
- * An array that only keeps the last N elements added
+ * An array that only keeps the last N elements added.
+ * <p>
+ * It is likely to be written far more often than it is read (add vs getElements), since logs are appended continuously
+ * while queries should be quite rare, so we want to optimize the append path.
+ * <p>
+ * Important: a small inconsistency can occur between add() and getElements(), but the fact that getElements()
+ * sorts the data makes it harmless, and it avoids having a lock in this buffer, which must keep a "0-overhead"
+ * on the runtime.
*/
-public class CircularBuffer<T> {
+public class CircularBuffer {
- private T[] elements;
- private transient int start;
- private transient int end;
- private transient boolean full;
- private final int maxElements;
- private Class<?> type;
+ private final AtomicInteger currentIdx = new AtomicInteger(0);
+ private final AtomicReferenceArray<PaxLoggingEvent> buffer;
- public CircularBuffer(int size, Class<?> type) {
+ public CircularBuffer(int size) {
if (size <= 0) {
throw new IllegalArgumentException("The size must be greater than 0");
}
- this.type = type;
- maxElements = size;
- clear();
+ this.buffer = new AtomicReferenceArray<>(size);
}
- private int size() {
- if (end == start) {
- return full ? maxElements : 0;
- } else if (end < start) {
- return maxElements - start + end;
- } else {
- return end - start;
- }
+ public int maxSize() {
+ return buffer.length();
}
- @SuppressWarnings("unchecked")
- public synchronized void clear() {
- start = 0;
- end = 0;
- full = false;
- elements = (T[])Array.newInstance(type, maxElements);
- }
-
- public synchronized void add(T element) {
+ public void add(final PaxLoggingEvent element) {
if (null == element) {
- throw new NullPointerException("Attempted to add null object to buffer");
+ throw new NullPointerException("Attempted to add null object to buffer");
}
- if (full) {
- increaseStart();
- }
- elements[end] = element;
- increaseEnd();
-
+ doAdd(element);
}
- private void increaseStart() {
- start++;
- if (start >= maxElements) {
- start = 0;
- }
+ public List<PaxLoggingEvent> getElements(final int requestedCount) { // up to requestedCount most recent events, sorted by timestamp
+ final int max = Math.min(buffer.length(), requestedCount); // never read more slots than the buffer holds
+ final int current = currentIdx.get() % buffer.length(); // next write slot: the oldest entry once wrapped, an empty slot before that
+ return collectEvents(max, idx -> buffer.get(Math.floorMod(current - max + idx, buffer.length()))); // window ends just before the write slot so the newest entries are kept
+ }
- private void increaseEnd() {
- end++;
- if (end >= maxElements) {
- end = 0;
- }
- if (end == start) {
- full = true;
- }
+ private List<PaxLoggingEvent> collectEvents(final int max, final IntFunction<PaxLoggingEvent> mapper) {
+ return IntStream.range(0, max)
+ .mapToObj(mapper)
+ .filter(Objects::nonNull) // not initialized yet
+ .sorted(comparing(PaxLoggingEvent::getTimeStamp)) // not critical but better when dumped
+ .collect(toList());
}
- public synchronized Iterable<T> getElements() {
- return getElements(size());
+ private void doAdd(final PaxLoggingEvent element) {
+ final int idx = currentIdx.getAndUpdate(value -> {
+ final int newValue = value + 1;
+ if (newValue >= buffer.length()) {
+ return 0;
+ }
+ return newValue;
+ }) % buffer.length();
+ buffer.set(idx, element);
}
-
- public synchronized Iterable<T> getElements(int nb) {
- int s = size();
- nb = Math.min(Math.max(0, nb), s);
- List<T> result = new ArrayList<>();
- for (int i = 0; i < nb; i++) {
- result.add(elements[(i + s - nb + start) % maxElements]);
- }
- return result;
- }
-
-
}
diff --git a/log/src/main/java/org/apache/karaf/log/core/internal/LogServiceImpl.java b/log/src/main/java/org/apache/karaf/log/core/internal/LogServiceImpl.java
index d0c52fb..2fdfa32 100644
--- a/log/src/main/java/org/apache/karaf/log/core/internal/LogServiceImpl.java
+++ b/log/src/main/java/org/apache/karaf/log/core/internal/LogServiceImpl.java
@@ -34,14 +34,14 @@
static final String CONFIGURATION_PID = "org.ops4j.pax.logging";
private final ConfigurationAdmin configAdmin;
- private final CircularBuffer<PaxLoggingEvent> buffer;
+ private volatile CircularBuffer buffer;
private List<PaxAppender> appenders;
public LogServiceImpl(ConfigurationAdmin configAdmin, int size) {
this.configAdmin = configAdmin;
this.appenders = new CopyOnWriteArrayList<>();
- this.buffer = new CircularBuffer<>(size, PaxLoggingEvent.class);
+ this.buffer = new CircularBuffer(size);
}
private LogServiceInternal getDelegate(Dictionary<String, Object> config) {
@@ -126,7 +126,7 @@
@Override
public Iterable<PaxLoggingEvent> getEvents() {
- return buffer.getElements();
+ return buffer.getElements(buffer.maxSize());
}
@Override
@@ -135,8 +135,9 @@
}
@Override
- public void clearEvents() {
- buffer.clear();
+ public void clearEvents() { // just reset the buffer, reduce the number of "write locked" operations in the buffer
+ final int size = this.buffer.maxSize();
+ this.buffer = new CircularBuffer(size);
}
@Override
@@ -172,7 +173,7 @@
}
@Override
- public synchronized void doAppend(PaxLoggingEvent event) {
+ public void doAppend(PaxLoggingEvent event) {
event.getProperties(); // ensure MDC properties are copied
KarafLogEvent eventCopy = new KarafLogEvent(event);
this.buffer.add(eventCopy);
diff --git a/log/src/test/java/org/apache/karaf/log/core/internal/CircularBufferTest.java b/log/src/test/java/org/apache/karaf/log/core/internal/CircularBufferTest.java
new file mode 100644
index 0000000..dfd1c1f
--- /dev/null
+++ b/log/src/test/java/org/apache/karaf/log/core/internal/CircularBufferTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.karaf.log.core.internal;
+
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.junit.Test;
+import org.ops4j.pax.logging.log4j2.internal.spi.PaxLoggingEventImpl;
+import org.ops4j.pax.logging.spi.PaxLoggingEvent;
+
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+import static org.junit.Assert.assertEquals;
+
+public class CircularBufferTest {
+ @Test
+ public void use() {
+ final CircularBuffer buffer = new CircularBuffer(3);
+ assertEquals(0, buffer.getElements(buffer.maxSize()).size());
+ final PaxLoggingEvent e1 = addAndAssertEvent(buffer);
+ final PaxLoggingEvent e2 = addAndAssertEvent(buffer, e1);
+ final PaxLoggingEvent e3 = addAndAssertEvent(buffer, e1, e2);
+ final PaxLoggingEvent e4 = addAndAssertEvent(buffer, e2, e3);
+ final PaxLoggingEvent e5 = addAndAssertEvent(buffer, e3, e4);
+ final PaxLoggingEvent e6 = addAndAssertEvent(buffer, e4, e5);
+ addAndAssertEvent(buffer, e5, e6);
+ }
+
+ private PaxLoggingEvent addAndAssertEvent(final CircularBuffer buffer, final PaxLoggingEvent... previous) {
+ final PaxLoggingEvent e4 = newEvent();
+ buffer.add(e4);
+ assertEquals(Stream.concat(Stream.of(previous), Stream.of(e4)).collect(toList()), buffer.getElements(buffer.maxSize()));
+ return e4;
+ }
+
+ private PaxLoggingEvent newEvent() {
+ return new PaxLoggingEventImpl(new Log4jLogEvent());
+ }
+}