blob: 683d2ed0f7147dfeae471ceb7297bb969482a77b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.collect.Range;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import org.apache.cassandra.cql3.CQLTester;
import static com.google.common.collect.Lists.newArrayList;
import static java.util.stream.Collectors.groupingBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class StatusLoggerTest extends CQLTester
{
private static final Logger log = LoggerFactory.getLogger(StatusLoggerTest.class);
@Test
public void testStatusLoggerPrintsStatusOnlyOnceWhenInvokedConcurrently() throws Exception
{
ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(StatusLogger.class);
InMemoryAppender inMemoryAppender = new InMemoryAppender();
logger.addAppender(inMemoryAppender);
logger.setLevel(Level.TRACE);
try
{
submitTwoLogRequestsConcurrently();
verifyOnlySingleStatusWasAppendedConcurrently(inMemoryAppender.events);
}
finally
{
assertTrue("Could not remove in memory appender", logger.detachAppender(inMemoryAppender));
}
}
private void submitTwoLogRequestsConcurrently() throws InterruptedException
{
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(StatusLogger::log);
executorService.submit(StatusLogger::log);
executorService.shutdown();
Assert.assertTrue(executorService.awaitTermination(1, TimeUnit.MINUTES));
}
private void verifyOnlySingleStatusWasAppendedConcurrently(List<ILoggingEvent> events)
{
Map<String, List<ILoggingEvent>> eventsByThread = events.stream().collect(groupingBy(ILoggingEvent::getThreadName));
List<String> threadNames = newArrayList(eventsByThread.keySet());
assertEquals("Expected events from 2 threads only", 2, threadNames.size());
List<ILoggingEvent> firstThreadEvents = eventsByThread.get(threadNames.get(0));
List<ILoggingEvent> secondThreadEvents = eventsByThread.get(threadNames.get(1));
assertTrue("Expected at least one event from the first thread", firstThreadEvents.size() >= 1);
assertTrue("Expected at least one event from the second thread", secondThreadEvents.size() >= 1);
if (areDisjunctive(firstThreadEvents, secondThreadEvents))
{
log.debug("Event time ranges are disjunctive - log invocations were made one after another");
}
else
{
verifyStatusWasPrintedAndBusyEventOccured(firstThreadEvents, secondThreadEvents);
}
}
private boolean areDisjunctive(List<ILoggingEvent> firstThreadEvents, List<ILoggingEvent> secondThreadEvents)
{
Range<Long> firstThreadTimeRange = timestampsRange(firstThreadEvents);
Range<Long> secondThreadTimeRange = timestampsRange(secondThreadEvents);
boolean connected = firstThreadTimeRange.isConnected(secondThreadTimeRange);
boolean disjunctive = !connected || firstThreadTimeRange.intersection(secondThreadTimeRange).isEmpty();
log.debug("Time ranges {}, {}, disjunctive={}", firstThreadTimeRange, secondThreadTimeRange, disjunctive);
return disjunctive;
}
private Range<Long> timestampsRange(List<ILoggingEvent> events)
{
List<Long> timestamps = events.stream().map(ILoggingEvent::getTimeStamp).collect(Collectors.toList());
Long min = timestamps.stream().min(Comparator.naturalOrder()).get();
Long max = timestamps.stream().max(Comparator.naturalOrder()).get();
// It's open on one side to cover a case when second status starts printing at the same timestamp that previous one was finished
return Range.closedOpen(min, max);
}
private void verifyStatusWasPrintedAndBusyEventOccured(List<ILoggingEvent> firstThreadEvents, List<ILoggingEvent> secondThreadEvents)
{
if (firstThreadEvents.size() > 1 && secondThreadEvents.size() > 1)
{
log.error("Both event lists contain more than one entry. First = {}, Second = {}", firstThreadEvents, secondThreadEvents);
fail("More that one status log was appended concurrently");
}
else if (firstThreadEvents.size() <= 1 && secondThreadEvents.size() <= 1)
{
log.error("No status log was recorded. First = {}, Second = {}", firstThreadEvents, secondThreadEvents);
fail("Status log was not appended");
}
else
{
log.info("Checking if logger was busy. First = {}, Second = {}", firstThreadEvents, secondThreadEvents);
assertTrue("One 'logger busy' entry was expected",
isLoggerBusyTheOnlyEvent(firstThreadEvents) || isLoggerBusyTheOnlyEvent(secondThreadEvents));
}
}
private boolean isLoggerBusyTheOnlyEvent(List<ILoggingEvent> events)
{
return events.size() == 1 &&
events.get(0).getMessage().equals("StatusLogger is busy") &&
events.get(0).getLevel() == Level.TRACE;
}
private static class InMemoryAppender extends AppenderBase<ILoggingEvent>
{
private final List<ILoggingEvent> events = newArrayList();
private InMemoryAppender()
{
start();
}
@Override
protected synchronized void append(ILoggingEvent event)
{
events.add(event);
}
}
}