blob: 3e39801352e707d471ec18f23bb655aba9935640 [file] [log] [blame]
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.lookup.cache;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.SettableFuture;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.core.Event;
import io.druid.java.util.emitter.core.LoggingEmitter;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.http.client.HttpClient;
import io.druid.java.util.http.client.Request;
import io.druid.java.util.http.client.response.HttpResponseHandler;
import io.druid.java.util.http.client.response.SequenceInputStreamResponseHandler;
import io.druid.audit.AuditInfo;
import io.druid.common.config.JacksonConfigManager;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.query.lookup.LookupsState;
import io.druid.server.http.HostAndPortWithScheme;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class LookupCoordinatorManagerTest
{
/** Lets individual tests declare the exception type they expect to be thrown. */
@Rule
public ExpectedException expectedException = ExpectedException.none();

// Collaborators shared by the tests. Apart from the mapper and the config object, all are strict EasyMock mocks.
private final ObjectMapper mapper = new DefaultObjectMapper();
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider =
    EasyMock.createStrictMock(DruidNodeDiscoveryProvider.class);
private final LookupNodeDiscovery lookupNodeDiscovery = EasyMock.createStrictMock(LookupNodeDiscovery.class);
private final HttpClient client = EasyMock.createStrictMock(HttpClient.class);
private final JacksonConfigManager configManager = EasyMock.createStrictMock(JacksonConfigManager.class);
private final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig = new LookupCoordinatorManagerConfig();

// Fixture data: one tier holding one lookup, available in a "v0" and a "v1" flavour.
private static final String LOOKUP_TIER = "lookup_tier";
private static final String SINGLE_LOOKUP_NAME = "lookupName";
private static final LookupExtractorFactoryMapContainer SINGLE_LOOKUP_SPEC_V0 =
    new LookupExtractorFactoryMapContainer(
        "v0",
        ImmutableMap.<String, Object>of("k0", "v0")
    );
private static final LookupExtractorFactoryMapContainer SINGLE_LOOKUP_SPEC_V1 =
    new LookupExtractorFactoryMapContainer(
        "v1",
        ImmutableMap.<String, Object>of("k1", "v1")
    );
private static final Map<String, LookupExtractorFactoryMapContainer> SINGLE_LOOKUP_MAP_V0 = ImmutableMap.of(
    SINGLE_LOOKUP_NAME,
    SINGLE_LOOKUP_SPEC_V0
);
private static final Map<String, LookupExtractorFactoryMapContainer> SINGLE_LOOKUP_MAP_V1 = ImmutableMap.of(
    SINGLE_LOOKUP_NAME,
    SINGLE_LOOKUP_SPEC_V1
);
private static final Map<String, Map<String, LookupExtractorFactoryMapContainer>> TIERED_LOOKUP_MAP_V0 =
    ImmutableMap.of(
        LOOKUP_TIER,
        SINGLE_LOOKUP_MAP_V0
    );
private static final Map<String, Map<String, LookupExtractorFactoryMapContainer>> TIERED_LOOKUP_MAP_V1 =
    ImmutableMap.of(
        LOOKUP_TIER,
        SINGLE_LOOKUP_MAP_V1
    );
private static final Map<String, Map<String, LookupExtractorFactoryMapContainer>> EMPTY_TIERED_LOOKUP =
    ImmutableMap.of();

// Node state used as both request payload and canned response by the communicator tests.
// The third argument is a typed empty set rather than the raw Collections.EMPTY_SET constant, so the
// constructor call no longer relies on an unchecked conversion.
private static final LookupsState<LookupExtractorFactoryMapContainer> LOOKUPS_STATE = new LookupsState<>(
    SINGLE_LOOKUP_MAP_V0,
    SINGLE_LOOKUP_MAP_V1,
    Collections.<String>emptySet()
);

// Number of events passed to SERVICE_EMITTER.emit(); reset in setUp() and asserted to be zero in tearDown().
private static final AtomicLong EVENT_EMITS = new AtomicLong(0L);
// Assigned once in setUpStatic().
private static ServiceEmitter SERVICE_EMITTER;
/**
 * Installs, once per class, a {@link ServiceEmitter} that bumps {@code EVENT_EMITS} for every emitted event
 * and registers it with {@link EmittingLogger}.
 */
@BeforeClass
public static void setUpStatic()
{
  final LoggingEmitter delegate = EasyMock.createNiceMock(LoggingEmitter.class);
  EasyMock.replay(delegate);

  final ServiceEmitter countingEmitter = new ServiceEmitter("", "", delegate)
  {
    @Override
    public void emit(Event event)
    {
      // Count first, then let the regular implementation handle the event.
      EVENT_EMITS.incrementAndGet();
      super.emit(event);
    }
  };

  SERVICE_EMITTER = countingEmitter;
  EmittingLogger.registerEmitter(countingEmitter);
}
/**
 * Clears the emit counter and re-arms {@code configManager} so that watching either the current or the old
 * lookup config key yields an {@link AtomicReference} holding {@code null}, any number of times.
 */
@Before
public void setUp() throws IOException
{
  SERVICE_EMITTER.flush();
  EVENT_EMITS.set(0L);

  EasyMock.reset(lookupNodeDiscovery);
  EasyMock.reset(configManager);

  // Same expectation for both keys, recorded in the same order as before: current key first, then the old one.
  for (String configKey : ImmutableList.of(
      LookupCoordinatorManager.LOOKUP_CONFIG_KEY,
      LookupCoordinatorManager.OLD_LOOKUP_CONFIG_KEY
  )) {
    EasyMock.expect(
        configManager.watch(
            EasyMock.eq(configKey),
            EasyMock.<TypeReference>anyObject(),
            EasyMock.<AtomicReference>isNull()
        )
    ).andReturn(
        new AtomicReference<>(null)
    ).anyTimes();
  }

  EasyMock.replay(configManager);
}
/**
 * Flushes the emitter and fails the test if anything was emitted through {@code SERVICE_EMITTER} while it ran.
 */
@After
public void tearDown() throws IOException
{
  SERVICE_EMITTER.flush();
  final long emittedDuringTest = EVENT_EMITS.get();
  Assert.assertEquals(0, emittedDuringTest);
}
/**
 * {@code updateNode} should hand back the state carried in the response body when the stubbed response
 * handler reports {@code ACCEPTED} and the body is {@code LOOKUPS_STATE} serialized with the test mapper.
 */
@Test
public void testUpdateNodeWithSuccess() throws Exception
{
  final HttpResponseHandler<InputStream, InputStream> responseHandler =
      EasyMock.createStrictMock(HttpResponseHandler.class);

  // The HTTP client returns an already-completed future whose payload is the serialized LOOKUPS_STATE.
  final SettableFuture<InputStream> future = SettableFuture.create();
  future.set(
      new ByteArrayInputStream(
          StringUtils.toUtf8(
              mapper.writeValueAsString(
                  LOOKUPS_STATE
              )
          )
      )
  );
  EasyMock.expect(
      client.go(
          EasyMock.<Request>anyObject(),
          EasyMock.<SequenceInputStreamResponseHandler>anyObject(),
          EasyMock.<Duration>anyObject()
      )
  ).andReturn(future).once();
  EasyMock.replay(client, responseHandler);

  final LookupCoordinatorManager.LookupsCommunicator lookupsCommunicator =
      new LookupCoordinatorManager.LookupsCommunicator(client, lookupCoordinatorManagerConfig, mapper)
      {
        @Override
        HttpResponseHandler<InputStream, InputStream> makeResponseHandler(
            final AtomicInteger returnCode,
            final AtomicReference<String> reasonString
        )
        {
          // Pretend the response has already been processed with an ACCEPTED status and no reason text.
          returnCode.set(Response.Status.ACCEPTED.getStatusCode());
          reasonString.set("");
          return responseHandler;
        }
      };

  final LookupsState<LookupExtractorFactoryMapContainer> resp = lookupsCommunicator.updateNode(
      HostAndPortWithScheme.fromString("localhost"),
      LOOKUPS_STATE
  );

  EasyMock.verify(client, responseHandler);
  // JUnit's contract is assertEquals(expected, actual); the original call had the two arguments swapped,
  // which produces a misleading failure message.
  Assert.assertEquals(LOOKUPS_STATE, resp);
}
/**
 * {@code updateNode} must throw an {@link IOException} when the stubbed response handler reports
 * {@code INTERNAL_SERVER_ERROR}.
 */
@Test
public void testUpdateNodeRespondedWithNotOkErrorCode() throws Exception
{
  final HttpResponseHandler<InputStream, InputStream> handler =
      EasyMock.createStrictMock(HttpResponseHandler.class);

  final SettableFuture<InputStream> completed = SettableFuture.create();
  completed.set(new ByteArrayInputStream(StringUtils.toUtf8("server failed")));
  EasyMock.expect(
      client.go(
          EasyMock.<Request>anyObject(),
          EasyMock.<SequenceInputStreamResponseHandler>anyObject(),
          EasyMock.<Duration>anyObject()
      )
  ).andReturn(completed).once();
  EasyMock.replay(client, handler);

  final LookupCoordinatorManager.LookupsCommunicator communicator =
      new LookupCoordinatorManager.LookupsCommunicator(client, lookupCoordinatorManagerConfig, mapper)
      {
        @Override
        HttpResponseHandler<InputStream, InputStream> makeResponseHandler(
            final AtomicInteger statusCode,
            final AtomicReference<String> reason
        )
        {
          statusCode.set(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
          reason.set("");
          return handler;
        }
      };

  boolean failedAsExpected = false;
  try {
    communicator.updateNode(
        HostAndPortWithScheme.fromString("localhost"),
        LOOKUPS_STATE
    );
  }
  catch (IOException expected) {
    // This is the outcome under test.
    failedAsExpected = true;
  }
  Assert.assertTrue(failedAsExpected);

  EasyMock.verify(client, handler);
}
/**
 * {@code updateNode} must throw an {@link IOException} when the status is {@code ACCEPTED} but the response
 * body is the literal text {@code weird} instead of a serialized state.
 */
@Test
public void testUpdateNodeReturnsWeird() throws Exception
{
  final HttpResponseHandler<InputStream, InputStream> handler =
      EasyMock.createStrictMock(HttpResponseHandler.class);

  final SettableFuture<InputStream> completed = SettableFuture.create();
  completed.set(new ByteArrayInputStream(StringUtils.toUtf8("weird")));
  EasyMock.expect(
      client.go(
          EasyMock.<Request>anyObject(),
          EasyMock.<SequenceInputStreamResponseHandler>anyObject(),
          EasyMock.<Duration>anyObject()
      )
  ).andReturn(completed).once();
  EasyMock.replay(client, handler);

  final LookupCoordinatorManager.LookupsCommunicator communicator =
      new LookupCoordinatorManager.LookupsCommunicator(client, lookupCoordinatorManagerConfig, mapper)
      {
        @Override
        HttpResponseHandler<InputStream, InputStream> makeResponseHandler(
            final AtomicInteger statusCode,
            final AtomicReference<String> reason
        )
        {
          statusCode.set(Response.Status.ACCEPTED.getStatusCode());
          reason.set("");
          return handler;
        }
      };

  boolean failedAsExpected = false;
  try {
    communicator.updateNode(
        HostAndPortWithScheme.fromString("localhost"),
        LOOKUPS_STATE
    );
  }
  catch (IOException expected) {
    // This is the outcome under test.
    failedAsExpected = true;
  }
  Assert.assertTrue(failedAsExpected);

  EasyMock.verify(client, handler);
}
/**
 * With the calling thread's interrupt flag set and the client's future never completed, {@code updateNode}
 * must surface an {@link InterruptedException}.
 */
@Test
public void testUpdateNodeInterrupted() throws Exception
{
  final HttpResponseHandler<InputStream, InputStream> handler =
      EasyMock.createStrictMock(HttpResponseHandler.class);

  // Deliberately left unset: the future never completes.
  final SettableFuture<InputStream> neverCompleted = SettableFuture.create();
  EasyMock.expect(
      client.go(
          EasyMock.<Request>anyObject(),
          EasyMock.<SequenceInputStreamResponseHandler>anyObject(),
          EasyMock.<Duration>anyObject()
      )
  ).andReturn(neverCompleted).once();
  EasyMock.replay(client, handler);

  final LookupCoordinatorManager.LookupsCommunicator communicator =
      new LookupCoordinatorManager.LookupsCommunicator(client, lookupCoordinatorManagerConfig, mapper)
      {
        @Override
        HttpResponseHandler<InputStream, InputStream> makeResponseHandler(
            final AtomicInteger statusCode,
            final AtomicReference<String> reason
        )
        {
          statusCode.set(Response.Status.ACCEPTED.getStatusCode());
          reason.set("");
          return handler;
        }
      };

  Thread.currentThread().interrupt();
  boolean sawInterrupt = false;
  try {
    communicator.updateNode(
        HostAndPortWithScheme.fromString("localhost"),
        LOOKUPS_STATE
    );
  }
  catch (InterruptedException expected) {
    // This is the outcome under test.
    sawInterrupt = true;
  }
  finally {
    // Clear the interrupt flag so it cannot leak out of this test.
    Thread.interrupted();
  }
  Assert.assertTrue(sawInterrupt);

  EasyMock.verify(client, handler);
}
/**
 * {@code getLookupStateForNode} should return the state carried in the response body when the stubbed
 * response handler reports {@code OK} and the body is {@code LOOKUPS_STATE} serialized with the test mapper.
 */
@Test
public void testGetLookupsStateNodeWithSuccess() throws Exception
{
  final HttpResponseHandler<InputStream, InputStream> responseHandler =
      EasyMock.createStrictMock(HttpResponseHandler.class);

  // The HTTP client returns an already-completed future whose payload is the serialized LOOKUPS_STATE.
  final SettableFuture<InputStream> future = SettableFuture.create();
  future.set(
      new ByteArrayInputStream(
          StringUtils.toUtf8(
              mapper.writeValueAsString(
                  LOOKUPS_STATE
              )
          )
      )
  );
  EasyMock.expect(
      client.go(
          EasyMock.<Request>anyObject(),
          EasyMock.<SequenceInputStreamResponseHandler>anyObject(),
          EasyMock.<Duration>anyObject()
      )
  ).andReturn(future).once();
  EasyMock.replay(client, responseHandler);

  final LookupCoordinatorManager.LookupsCommunicator lookupsCommunicator =
      new LookupCoordinatorManager.LookupsCommunicator(client, lookupCoordinatorManagerConfig, mapper)
      {
        @Override
        HttpResponseHandler<InputStream, InputStream> makeResponseHandler(
            final AtomicInteger returnCode,
            final AtomicReference<String> reasonString
        )
        {
          // Pretend the response has already been processed with an OK status and no reason text.
          returnCode.set(Response.Status.OK.getStatusCode());
          reasonString.set("");
          return responseHandler;
        }
      };

  final LookupsState<LookupExtractorFactoryMapContainer> resp = lookupsCommunicator.getLookupStateForNode(
      HostAndPortWithScheme.fromString("localhost")
  );

  EasyMock.verify(client, responseHandler);
  // JUnit's contract is assertEquals(expected, actual); the original call had the two arguments swapped,
  // which produces a misleading failure message.
  Assert.assertEquals(LOOKUPS_STATE, resp);
}
/**
 * {@code getLookupStateForNode} must throw an {@link IOException} when the stubbed response handler reports
 * {@code INTERNAL_SERVER_ERROR}.
 */
@Test
public void testGetLookupsStateNodeRespondedWithNotOkErrorCode() throws Exception
{
  final HttpResponseHandler<InputStream, InputStream> handler =
      EasyMock.createStrictMock(HttpResponseHandler.class);

  final SettableFuture<InputStream> completed = SettableFuture.create();
  completed.set(new ByteArrayInputStream(StringUtils.toUtf8("server failed")));
  EasyMock.expect(
      client.go(
          EasyMock.<Request>anyObject(),
          EasyMock.<SequenceInputStreamResponseHandler>anyObject(),
          EasyMock.<Duration>anyObject()
      )
  ).andReturn(completed).once();
  EasyMock.replay(client, handler);

  final LookupCoordinatorManager.LookupsCommunicator communicator =
      new LookupCoordinatorManager.LookupsCommunicator(client, lookupCoordinatorManagerConfig, mapper)
      {
        @Override
        HttpResponseHandler<InputStream, InputStream> makeResponseHandler(
            final AtomicInteger statusCode,
            final AtomicReference<String> reason
        )
        {
          statusCode.set(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
          reason.set("");
          return handler;
        }
      };

  boolean failedAsExpected = false;
  try {
    communicator.getLookupStateForNode(
        HostAndPortWithScheme.fromString("localhost")
    );
  }
  catch (IOException expected) {
    // This is the outcome under test.
    failedAsExpected = true;
  }
  Assert.assertTrue(failedAsExpected);

  EasyMock.verify(client, handler);
}
/**
 * {@code getLookupStateForNode} must throw an {@link IOException} when the response body is the literal text
 * {@code weird} instead of a serialized state.
 *
 * The stub reports {@code OK}, the same status the success test for this call uses, so that the failure is
 * attributable to the unreadable body. The previous {@code ACCEPTED} status appears to have been copied from
 * the {@code updateNode} variant and presumably made this test repeat the non-OK-status scenario instead.
 * NOTE(review): confirm against {@code LookupsCommunicator.getLookupStateForNode}.
 */
@Test
public void testGetLookupsStateNodeReturnsWeird() throws Exception
{
  final HttpResponseHandler<InputStream, InputStream> responseHandler =
      EasyMock.createStrictMock(HttpResponseHandler.class);

  final SettableFuture<InputStream> future = SettableFuture.create();
  future.set(new ByteArrayInputStream(StringUtils.toUtf8("weird")));
  EasyMock.expect(
      client.go(
          EasyMock.<Request>anyObject(),
          EasyMock.<SequenceInputStreamResponseHandler>anyObject(),
          EasyMock.<Duration>anyObject()
      )
  ).andReturn(future).once();
  EasyMock.replay(client, responseHandler);

  final LookupCoordinatorManager.LookupsCommunicator lookupsCommunicator =
      new LookupCoordinatorManager.LookupsCommunicator(client, lookupCoordinatorManagerConfig, mapper)
      {
        @Override
        HttpResponseHandler<InputStream, InputStream> makeResponseHandler(
            final AtomicInteger returnCode,
            final AtomicReference<String> reasonString
        )
        {
          returnCode.set(Response.Status.OK.getStatusCode());
          reasonString.set("");
          return responseHandler;
        }
      };

  try {
    lookupsCommunicator.getLookupStateForNode(
        HostAndPortWithScheme.fromString("localhost")
    );
    Assert.fail();
  }
  catch (IOException expected) {
    // Expected: the body cannot be read as a lookups state.
  }

  EasyMock.verify(client, responseHandler);
}
/**
 * With the calling thread's interrupt flag set and the client's future never completed,
 * {@code getLookupStateForNode} must surface an {@link InterruptedException}.
 */
@Test
public void testGetLookupsStateNodeInterrupted() throws Exception
{
  final HttpResponseHandler<InputStream, InputStream> handler =
      EasyMock.createStrictMock(HttpResponseHandler.class);

  // Deliberately left unset: the future never completes.
  final SettableFuture<InputStream> neverCompleted = SettableFuture.create();
  EasyMock.expect(
      client.go(
          EasyMock.<Request>anyObject(),
          EasyMock.<SequenceInputStreamResponseHandler>anyObject(),
          EasyMock.<Duration>anyObject()
      )
  ).andReturn(neverCompleted).once();
  EasyMock.replay(client, handler);

  final LookupCoordinatorManager.LookupsCommunicator communicator =
      new LookupCoordinatorManager.LookupsCommunicator(client, lookupCoordinatorManagerConfig, mapper)
      {
        @Override
        HttpResponseHandler<InputStream, InputStream> makeResponseHandler(
            final AtomicInteger statusCode,
            final AtomicReference<String> reason
        )
        {
          statusCode.set(Response.Status.ACCEPTED.getStatusCode());
          reason.set("");
          return handler;
        }
      };

  Thread.currentThread().interrupt();
  boolean sawInterrupt = false;
  try {
    communicator.getLookupStateForNode(
        HostAndPortWithScheme.fromString("localhost")
    );
  }
  catch (InterruptedException expected) {
    // This is the outcome under test.
    sawInterrupt = true;
  }
  finally {
    // Clear the interrupt flag so it cannot leak out of this test.
    Thread.interrupted();
  }
  Assert.assertTrue(sawInterrupt);

  EasyMock.verify(client, handler);
}
/**
 * When {@code getKnownLookups()} yields {@code null}, updating with a non-empty tiered map is expected to
 * raise an {@link ISE}.
 */
@Test
public void testUpdateLookupsFailsUnitialized() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final LookupCoordinatorManager uninitialized = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      // Simulates a manager that has no lookup configuration available yet.
      return null;
    }
  };
  uninitialized.start();

  expectedException.expect(ISE.class);
  uninitialized.updateLookups(TIERED_LOOKUP_MAP_V0, auditInfo);
}
/**
 * When {@code getKnownLookups()} yields {@code null}, updating with the empty tiered map is expected to store
 * that empty map under {@code LOOKUP_CONFIG_KEY} exactly once.
 */
@Test
public void testUpdateLookupsInitialization() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final LookupCoordinatorManager uninitialized = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return null;
    }
  };
  uninitialized.start();

  // Replace the expectations recorded by setUp() with the single write this test cares about.
  EasyMock.reset(configManager);
  EasyMock.expect(
      configManager.set(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.eq(EMPTY_TIERED_LOOKUP),
          EasyMock.eq(auditInfo)
      )
  ).andReturn(true).once();
  EasyMock.replay(configManager);

  uninitialized.updateLookups(EMPTY_TIERED_LOOKUP, auditInfo);

  EasyMock.verify(configManager);
}
/**
 * Starting from an empty set of known lookups, {@code updateLookup} for a single tier/name/spec is expected
 * to store {@code TIERED_LOOKUP_MAP_V0} under {@code LOOKUP_CONFIG_KEY}.
 */
@Test
public void testUpdateLookupAdds() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final LookupCoordinatorManager emptyManager = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return EMPTY_TIERED_LOOKUP;
    }
  };
  emptyManager.start();

  // Replace the expectations recorded by setUp() with the single write this test cares about.
  EasyMock.reset(configManager);
  EasyMock.expect(
      configManager.set(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.eq(TIERED_LOOKUP_MAP_V0),
          EasyMock.eq(auditInfo)
      )
  ).andReturn(true).once();
  EasyMock.replay(configManager);

  emptyManager.updateLookup(LOOKUP_TIER, SINGLE_LOOKUP_NAME, SINGLE_LOOKUP_SPEC_V0, auditInfo);

  EasyMock.verify(configManager);
}
/**
 * Adding a new lookup ({@code foo2}) to a tier that already has one ({@code foo1}) is expected to store both
 * lookups for that tier and leave the second tier as it was.
 */
@Test
public void testUpdateLookupsAddsNewLookup() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final String firstTier = LOOKUP_TIER + "1";
  final String secondTier = LOOKUP_TIER + "2";

  final LookupExtractorFactoryMapContainer ignore = new LookupExtractorFactoryMapContainer(
      "v0",
      ImmutableMap.<String, Object>of("prop", "old")
  );
  final LookupExtractorFactoryMapContainer newSpec = new LookupExtractorFactoryMapContainer(
      "v1",
      ImmutableMap.<String, Object>of("prop", "new")
  );

  // What the manager believes is configured before the update.
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> known =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          firstTier,
          ImmutableMap.of(
              "foo1",
              new LookupExtractorFactoryMapContainer("v0", ImmutableMap.<String, Object>of("prop", "old"))
          ),
          secondTier,
          ImmutableMap.of("ignore", ignore)
      );
  // What is expected to be written: foo1 kept (equal to `ignore`), foo2 added, second tier untouched.
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> expectedStored =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          firstTier,
          ImmutableMap.of("foo1", ignore, "foo2", newSpec),
          secondTier,
          ImmutableMap.of("ignore", ignore)
      );
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> update =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          firstTier,
          ImmutableMap.of("foo2", newSpec)
      );

  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return known;
    }
  };
  manager.start();

  EasyMock.reset(configManager);
  EasyMock.expect(
      configManager.set(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.eq(expectedStored),
          EasyMock.eq(auditInfo)
      )
  ).andReturn(true).once();
  EasyMock.replay(configManager);

  Assert.assertTrue(manager.updateLookups(update, auditInfo));

  EasyMock.verify(configManager);
}
/**
 * Updating lookup {@code foo} in the first tier is expected to replace only that entry; the second tier must
 * be stored unchanged.
 */
@Test
public void testUpdateLookupsOnlyUpdatesToTier() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final String firstTier = LOOKUP_TIER + "1";
  final String secondTier = LOOKUP_TIER + "2";

  final LookupExtractorFactoryMapContainer ignore = new LookupExtractorFactoryMapContainer(
      "v0",
      ImmutableMap.<String, Object>of("prop", "old")
  );
  final LookupExtractorFactoryMapContainer newSpec = new LookupExtractorFactoryMapContainer(
      "v1",
      ImmutableMap.<String, Object>of("prop", "new")
  );

  // What the manager believes is configured before the update: foo exists at version "v0".
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> known =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          firstTier,
          ImmutableMap.of(
              "foo",
              new LookupExtractorFactoryMapContainer("v0", ImmutableMap.<String, Object>of("prop", "new"))
          ),
          secondTier,
          ImmutableMap.of("ignore", ignore)
      );
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> expectedStored =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          firstTier,
          ImmutableMap.of("foo", newSpec),
          secondTier,
          ImmutableMap.of("ignore", ignore)
      );
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> update =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          firstTier,
          ImmutableMap.of("foo", newSpec)
      );

  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return known;
    }
  };
  manager.start();

  EasyMock.reset(configManager);
  EasyMock.expect(
      configManager.set(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.eq(expectedStored),
          EasyMock.eq(auditInfo)
      )
  ).andReturn(true).once();
  EasyMock.replay(configManager);

  Assert.assertTrue(manager.updateLookups(update, auditInfo));

  EasyMock.verify(configManager);
}
/**
 * With {@code TIERED_LOOKUP_MAP_V0} known, updating to {@code TIERED_LOOKUP_MAP_V1} is expected to store the
 * V1 map under {@code LOOKUP_CONFIG_KEY}.
 */
@Test
public void testUpdateLookupsUpdates() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final LookupCoordinatorManager managerAtV0 = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return TIERED_LOOKUP_MAP_V0;
    }
  };
  managerAtV0.start();

  // Replace the expectations recorded by setUp() with the single write this test cares about.
  EasyMock.reset(configManager);
  EasyMock.expect(
      configManager.set(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.eq(TIERED_LOOKUP_MAP_V1),
          EasyMock.eq(auditInfo)
      )
  ).andReturn(true).once();
  EasyMock.replay(configManager);

  managerAtV0.updateLookups(TIERED_LOOKUP_MAP_V1, auditInfo);

  EasyMock.verify(configManager);
}
/**
 * Re-submitting exactly the lookups that are already known ({@code TIERED_LOOKUP_MAP_V0}) is expected to be
 * rejected with an {@link IAE}.
 */
@Test
public void testUpdateLookupFailsSameVersionUpdates() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final LookupCoordinatorManager managerAtV0 = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return TIERED_LOOKUP_MAP_V0;
    }
  };
  managerAtV0.start();

  boolean rejected = false;
  try {
    managerAtV0.updateLookups(TIERED_LOOKUP_MAP_V0, auditInfo);
  }
  catch (IAE expected) {
    // This is the outcome under test.
    rejected = true;
  }
  Assert.assertTrue(rejected);
}
/**
 * Updating a tier that is not yet known is expected to store the new tier alongside the existing one.
 */
@Test
public void testUpdateLookupsAddsNewTier() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final String newTier = LOOKUP_TIER + "1";
  final String existingTier = LOOKUP_TIER + "2";

  final LookupExtractorFactoryMapContainer ignore = new LookupExtractorFactoryMapContainer(
      "v0",
      ImmutableMap.<String, Object>of("prop", "old")
  );
  final LookupExtractorFactoryMapContainer newSpec = new LookupExtractorFactoryMapContainer(
      "v1",
      ImmutableMap.<String, Object>of("prop", "new")
  );

  // Only the second tier is known before the update.
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> known =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          existingTier,
          ImmutableMap.of("ignore", ignore)
      );
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> expectedStored =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          newTier,
          ImmutableMap.of("foo", newSpec),
          existingTier,
          ImmutableMap.of("ignore", ignore)
      );
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> update =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          newTier,
          ImmutableMap.of("foo", newSpec)
      );

  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return known;
    }
  };
  manager.start();

  EasyMock.reset(configManager);
  EasyMock.expect(
      configManager.set(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.eq(expectedStored),
          EasyMock.eq(auditInfo)
      )
  ).andReturn(true).once();
  EasyMock.replay(configManager);

  Assert.assertTrue(manager.updateLookups(update, auditInfo));

  EasyMock.verify(configManager);
}
/**
 * Deleting lookup {@code foo} from a tier holding {@code foo} and {@code ignore} is expected to store a map
 * containing only {@code ignore} for that tier, and to return {@code true}.
 */
@Test
public void testDeleteLookup() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final LookupExtractorFactoryMapContainer ignore = new LookupExtractorFactoryMapContainer(
      "v0",
      ImmutableMap.<String, Object>of("lookup", "ignore")
  );
  final LookupExtractorFactoryMapContainer lookup = new LookupExtractorFactoryMapContainer(
      "v0",
      ImmutableMap.<String, Object>of("lookup", "foo")
  );

  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> known =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          LOOKUP_TIER,
          ImmutableMap.of(
              "foo", lookup,
              "ignore", ignore
          )
      );
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> expectedAfterDelete =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          LOOKUP_TIER,
          ImmutableMap.of("ignore", ignore)
      );

  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return known;
    }
  };
  manager.start();

  EasyMock.reset(configManager);
  EasyMock.expect(
      configManager.set(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.eq(expectedAfterDelete),
          EasyMock.eq(auditInfo)
      )
  ).andReturn(true).once();
  EasyMock.replay(configManager);

  Assert.assertTrue(manager.deleteLookup(LOOKUP_TIER, "foo", auditInfo));

  EasyMock.verify(configManager);
}
/**
 * Deleting a lookup name that is not present in the tier is expected to return {@code false}.
 */
@Test
public void testDeleteLookupIgnoresMissing() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final LookupExtractorFactoryMapContainer ignore = new LookupExtractorFactoryMapContainer(
      "v0",
      ImmutableMap.<String, Object>of("lookup", "ignore")
  );
  // The tier exists but only contains "ignore"; "foo" is absent.
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> known =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          LOOKUP_TIER,
          ImmutableMap.of("ignore", ignore)
      );

  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return known;
    }
  };
  manager.start();

  final boolean deleted = manager.deleteLookup(LOOKUP_TIER, "foo", auditInfo);
  Assert.assertFalse(deleted);
}
/**
 * When {@code getKnownLookups()} yields {@code null}, {@code deleteLookup} is expected to return
 * {@code false}.
 */
@Test
public void testDeleteLookupIgnoresNotReady() throws Exception
{
  final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
  final LookupCoordinatorManager notReady = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return null;
    }
  };
  notReady.start();

  final boolean deleted = notReady.deleteLookup(LOOKUP_TIER, "foo", auditInfo);
  Assert.assertFalse(deleted);
}
/**
 * {@code getLookup} should return the configured spec for a known tier/name pair and {@code null} when
 * either the name or the tier is unknown.
 */
@Test
public void testGetLookup() throws Exception
{
  final LookupExtractorFactoryMapContainer lookup = new LookupExtractorFactoryMapContainer(
      "v0",
      ImmutableMap.<String, Object>of("lookup", "foo")
  );
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> known =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          LOOKUP_TIER,
          ImmutableMap.of("foo", lookup)
      );

  // Note: the manager is not started in this test.
  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return known;
    }
  };

  Assert.assertEquals(lookup, manager.getLookup(LOOKUP_TIER, "foo"));
  Assert.assertNull(manager.getLookup(LOOKUP_TIER, "does not exit"));
  Assert.assertNull(manager.getLookup("not a tier", "foo"));
}
/**
 * Same lookups as {@code testGetLookup}, but the tier additionally holds a {@code bar} entry whose spec map
 * is empty; the results for {@code foo}, an unknown name and an unknown tier must be unaffected.
 */
@Test
public void testGetLookupIgnoresMalformed() throws Exception
{
  final LookupExtractorFactoryMapContainer lookup = new LookupExtractorFactoryMapContainer(
      "v0",
      ImmutableMap.<String, Object>of("lookup", "foo")
  );
  final Map<String, Map<String, LookupExtractorFactoryMapContainer>> known =
      ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
          LOOKUP_TIER,
          ImmutableMap.of(
              "foo", lookup,
              "bar", new LookupExtractorFactoryMapContainer("v0", ImmutableMap.<String, Object>of())
          )
      );

  // Note: the manager is not started in this test.
  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return known;
    }
  };

  Assert.assertEquals(lookup, manager.getLookup(LOOKUP_TIER, "foo"));
  Assert.assertNull(manager.getLookup(LOOKUP_TIER, "does not exit"));
  Assert.assertNull(manager.getLookup("not a tier", "foo"));
}
/**
 * When {@code getKnownLookups()} yields {@code null}, {@code getLookup} is expected to return {@code null}.
 */
@Test
public void testGetLookupIgnoresNotReady() throws Exception
{
  // Note: the manager is not started in this test.
  final LookupCoordinatorManager notReady = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  )
  {
    @Override
    public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups()
    {
      return null;
    }
  };

  final LookupExtractorFactoryMapContainer found = notReady.getLookup(LOOKUP_TIER, "foo");
  Assert.assertNull(found);
}
/**
 * Starts a manager wired to a mocked communicator and a mocked node discovery, then polls
 * {@code manager.knownOldState} until it equals the per-host states the mocked {@code updateNode} calls return.
 * The JUnit timeout of 2000 ms bounds the polling loop.
 */
@Test(timeout = 2_000)
public void testLookupManagementLoop() throws Exception
{
// Configured lookups: tier "tier1" holds a single lookup "lookup1" at version "v1".
Map<String, LookupExtractorFactoryMapContainer> lookup1 = ImmutableMap.of(
"lookup1", new LookupExtractorFactoryMapContainer(
"v1", ImmutableMap.of("k1", "v1")
)
);
Map<String, Map<String, LookupExtractorFactoryMapContainer>> configuredLookups =
ImmutableMap.of(
"tier1",
lookup1
);
// Replace the expectations from setUp(): the current config key is watched exactly once and yields
// the configured lookups above.
EasyMock.reset(configManager);
EasyMock.expect(configManager.watch(
EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
EasyMock.<TypeReference>anyObject(),
EasyMock.<AtomicReference>isNull()
)).andReturn(
new AtomicReference<>(configuredLookups)).once();
EasyMock.replay(configManager);
// Node discovery reports two hosts in "tier1", any number of times.
HostAndPortWithScheme host1 = HostAndPortWithScheme.fromParts("http", "host1", 1234);
HostAndPortWithScheme host2 = HostAndPortWithScheme.fromParts("http", "host2", 3456);
EasyMock.reset(lookupNodeDiscovery);
EasyMock.expect(
lookupNodeDiscovery.getNodesInTier("tier1")
).andReturn(ImmutableList.of(host1, host2)).anyTimes();
EasyMock.replay(lookupNodeDiscovery);
LookupCoordinatorManager.LookupsCommunicator lookupsCommunicator = EasyMock.createMock(LookupCoordinatorManager.LookupsCommunicator.class);
// host1 reports only "lookup0"; the LookupsState arguments are presumably (current, toLoad, toDrop) --
// TODO confirm against LookupsState.
EasyMock.expect(
lookupsCommunicator.getLookupStateForNode(
host1
)
).andReturn(
new LookupsState<>(
ImmutableMap.of("lookup0", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k0", "v0"))), null, null
)
).once();
LookupsState<LookupExtractorFactoryMapContainer> host1UpdatedState = new LookupsState<>(
lookup1, null, null
);
// Expected update for host1: "lookup1" in the second slot and "lookup0" in the third.
EasyMock.expect(
lookupsCommunicator.updateNode(
host1,
new LookupsState<>(
null,
lookup1,
ImmutableSet.of("lookup0")
)
)
).andReturn(
host1UpdatedState
).once();
// host2 reports "lookup3" plus "lookup1" at version "v0" (the configured version is "v1").
EasyMock.expect(
lookupsCommunicator.getLookupStateForNode(
host2
)
).andReturn(
new LookupsState<>(
ImmutableMap.of("lookup3", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k0", "v0")),
"lookup1", new LookupExtractorFactoryMapContainer("v0", ImmutableMap.of("k0", "v0"))),
null, null
)
).once();
LookupsState<LookupExtractorFactoryMapContainer> host2UpdatedState = new LookupsState<>(
null, lookup1, null
);
// Expected update for host2: "lookup1" in the second slot and "lookup3" in the third.
EasyMock.expect(
lookupsCommunicator.updateNode(
host2,
new LookupsState<>(
null,
lookup1,
ImmutableSet.of("lookup3")
)
)
).andReturn(
host2UpdatedState
).once();
EasyMock.replay(lookupsCommunicator);
// Local config (shadows the field of the same name) overriding the initial delay and thread pool size.
LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig = new LookupCoordinatorManagerConfig()
{
@Override
public long getInitialDelay()
{
return 1;
}
@Override
public int getThreadPoolSize()
{
return 2;
}
};
// This constructor takes the mocked communicator and node discovery directly.
final LookupCoordinatorManager manager = new LookupCoordinatorManager(
druidNodeDiscoveryProvider,
configManager,
lookupCoordinatorManagerConfig,
lookupsCommunicator,
lookupNodeDiscovery
);
// Nothing is known about any node before the manager is started.
Assert.assertTrue(manager.knownOldState.get().isEmpty());
manager.start();
Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> expectedKnownState = ImmutableMap.of(
host1.getHostAndPort(),
host1UpdatedState,
host2.getHostAndPort(),
host2UpdatedState
);
// Poll every 100 ms until the manager's known state matches; the @Test timeout aborts the wait otherwise.
while (!expectedKnownState.equals(manager.knownOldState.get())) {
Thread.sleep(100);
}
EasyMock.verify(lookupNodeDiscovery, configManager, lookupsCommunicator);
}
/**
 * Checks {@code getToBeLoadedOnNode} for a node state and a desired state that overlap: of the three desired
 * lookups, {@code lookup1} (desired at "v2") and {@code lookup2} are expected in the result, while
 * {@code lookup0} is not.
 */
@Test
public void testGetToBeLoadedOnNode()
{
  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  );

  // The three pieces of the node's reported state, built separately for readability.
  final Map<String, LookupExtractorFactoryMapContainer> firstSlot = ImmutableMap.of(
      "lookup0", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k0", "v0")),
      "lookup1", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k1", "v1"))
  );
  final Map<String, LookupExtractorFactoryMapContainer> secondSlot = ImmutableMap.of(
      "lookup2", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k2", "v2")),
      "lookup3", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k3", "v3"))
  );
  final Set<String> thirdSlot = ImmutableSet.of("lookup2", "lookup4");
  final LookupsState<LookupExtractorFactoryMapContainer> nodeState =
      new LookupsState<>(firstSlot, secondSlot, thirdSlot);

  final Map<String, LookupExtractorFactoryMapContainer> desired = ImmutableMap.of(
      "lookup0", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k0", "v0")),
      "lookup1", new LookupExtractorFactoryMapContainer("v2", ImmutableMap.of("k1", "v1")),
      "lookup2", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k2", "v2"))
  );

  final Map<String, LookupExtractorFactoryMapContainer> expectedToLoad = ImmutableMap.of(
      "lookup1", new LookupExtractorFactoryMapContainer("v2", ImmutableMap.of("k1", "v1")),
      "lookup2", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k2", "v2"))
  );

  Assert.assertEquals(expectedToLoad, manager.getToBeLoadedOnNode(nodeState, desired));
}
/**
 * Checks which lookup names {@code getToBeDroppedFromNode} selects when only "lookup0" is
 * desired: the expected answer is exactly "lookup1" and "lookup3".
 */
@Test
public void testToBeDropped()
{
  final LookupCoordinatorManager coordinator = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  );

  // The three pieces handed to the LookupsState constructor, in argument order.
  final Map<String, LookupExtractorFactoryMapContainer> firstNodeMap = ImmutableMap.of(
      "lookup0", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k0", "v0")),
      "lookup1", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k1", "v1"))
  );
  final Map<String, LookupExtractorFactoryMapContainer> secondNodeMap = ImmutableMap.of(
      "lookup2", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k2", "v2")),
      "lookup3", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k3", "v3"))
  );
  final Set<String> nodeNameSet = ImmutableSet.of("lookup2", "lookup4");
  final LookupsState<LookupExtractorFactoryMapContainer> nodeState = new LookupsState<>(
      firstNodeMap,
      secondNodeMap,
      nodeNameSet
  );

  // Only lookup0 should remain on the node.
  final Map<String, LookupExtractorFactoryMapContainer> desiredState = ImmutableMap.of(
      "lookup0", new LookupExtractorFactoryMapContainer("v1", ImmutableMap.of("k0", "v0"))
  );

  final Set<String> expectedToDrop = ImmutableSet.of("lookup1", "lookup3");

  Assert.assertEquals(expectedToDrop, coordinator.getToBeDroppedFromNode(nodeState, desiredState));
}
/**
 * Drives a manager through one start/stop cycle and checks the started flag and the
 * background-manager state on both sides of each transition. Both config keys are stubbed
 * to hand back references holding {@code null}, and each watch is expected exactly once.
 */
@Test
public void testStartStop() throws Exception
{
  // Values returned by the two stubbed watch() calls; both references hold null.
  final AtomicReference<List<LookupExtractorFactoryMapContainer>> currentKeyRef =
      new AtomicReference<List<LookupExtractorFactoryMapContainer>>(null);
  final AtomicReference<List<Map<String, Object>>> oldKeyRef =
      new AtomicReference<List<Map<String, Object>>>(null);

  EasyMock.reset(configManager);
  EasyMock.expect(
      configManager.watch(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.<TypeReference>anyObject(),
          EasyMock.<AtomicReference>isNull()
      )
  ).andReturn(currentKeyRef).once();
  EasyMock.expect(
      configManager.watch(
          EasyMock.eq(LookupCoordinatorManager.OLD_LOOKUP_CONFIG_KEY),
          EasyMock.<TypeReference>anyObject(),
          EasyMock.<AtomicReference>isNull()
      )
  ).andReturn(oldKeyRef).once();
  EasyMock.replay(configManager);

  final LookupCoordinatorManager coordinator = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  );

  // Before start: not started.
  Assert.assertFalse(coordinator.isStarted());

  // After start: started, background manager running and not terminating.
  coordinator.start();
  Assert.assertTrue(coordinator.awaitStarted(1));
  Assert.assertTrue(coordinator.backgroundManagerIsRunning());
  Assert.assertFalse(coordinator.waitForBackgroundTermination(10));

  // After stop: no longer started, background manager terminated.
  coordinator.stop();
  Assert.assertFalse(coordinator.awaitStarted(1));
  Assert.assertTrue(coordinator.waitForBackgroundTermination(10));
  Assert.assertFalse(coordinator.backgroundManagerIsRunning());

  EasyMock.verify(configManager);
}
/**
 * Starts and stops the same manager three times in a row, making the same assertions about
 * the started flag and the background-manager state on every cycle. The config watch is
 * stubbed to return an empty map any number of times.
 */
@Test
public void testMultipleStartStop() throws Exception
{
  EasyMock.reset(configManager);
  EasyMock.expect(
      configManager.watch(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.<TypeReference>anyObject(),
          EasyMock.<AtomicReference>isNull()
      )
  ).andReturn(new AtomicReference<>(Collections.EMPTY_MAP)).anyTimes();
  EasyMock.replay(configManager);

  final LookupCoordinatorManager coordinator = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      lookupCoordinatorManagerConfig
  );

  // A freshly built manager has not been started yet.
  Assert.assertFalse(coordinator.awaitStarted(1));

  for (int cycle = 0; cycle < 3; cycle++) {
    coordinator.start();
    Assert.assertTrue(coordinator.awaitStarted(1));
    Assert.assertTrue(coordinator.backgroundManagerIsRunning());
    Assert.assertFalse(coordinator.waitForBackgroundTermination(10));

    coordinator.stop();
    Assert.assertFalse(coordinator.awaitStarted(1));
    Assert.assertTrue(coordinator.waitForBackgroundTermination(10));
    Assert.assertFalse(coordinator.backgroundManagerIsRunning());
  }

  EasyMock.verify(configManager);
}
/**
 * Verifies that {@code discoverTiers()} returns the tier names reported by the node
 * discovery mock, and that the mock is queried exactly once.
 */
@Test
public void testLookupDiscoverAll() throws Exception
{
  final Set<String> fakeChildren = ImmutableSet.of("tier1", "tier2");
  EasyMock.reset(lookupNodeDiscovery);
  EasyMock.expect(lookupNodeDiscovery.getAllTiers())
          .andReturn(fakeChildren)
          .once();
  EasyMock.replay(lookupNodeDiscovery);

  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      druidNodeDiscoveryProvider,
      configManager,
      lookupCoordinatorManagerConfig,
      EasyMock.createMock(LookupCoordinatorManager.LookupsCommunicator.class),
      lookupNodeDiscovery
  );

  manager.start();
  try {
    Assert.assertEquals(fakeChildren, manager.discoverTiers());
    EasyMock.verify(lookupNodeDiscovery);
  }
  finally {
    // The manager is local to this test, so nothing else can stop it; without this its
    // background work would keep running after the test returns.
    manager.stop();
  }
}
/**
 * Verifies that {@code discoverNodesInTier("tier")} returns the host/port pairs of the
 * scheme-qualified nodes reported by the node discovery mock for that tier.
 */
@Test
public void testDiscoverNodesInTier() throws Exception
{
  EasyMock.reset(lookupNodeDiscovery);
  EasyMock.expect(lookupNodeDiscovery.getNodesInTier("tier"))
          .andReturn(
              ImmutableSet.of(
                  HostAndPortWithScheme.fromParts("http", "h1", 8080),
                  HostAndPortWithScheme.fromParts("http", "h2", 8080)
              )
          )
          .once();
  EasyMock.replay(lookupNodeDiscovery);

  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      druidNodeDiscoveryProvider,
      configManager,
      lookupCoordinatorManagerConfig,
      EasyMock.createMock(LookupCoordinatorManager.LookupsCommunicator.class),
      lookupNodeDiscovery
  );

  manager.start();
  try {
    Assert.assertEquals(
        ImmutableSet.of(
            HostAndPort.fromParts("h1", 8080),
            HostAndPort.fromParts("h2", 8080)
        ),
        ImmutableSet.copyOf(manager.discoverNodesInTier("tier"))
    );
    EasyMock.verify(lookupNodeDiscovery);
  }
  finally {
    // The manager is local to this test, so nothing else can stop it; without this its
    // background work would keep running after the test returns.
    manager.stop();
  }
}
/**
 * Tests that lookups stored in the db by 0.10.0 are converted and restored.
 *
 * The watch on {@code LOOKUP_CONFIG_KEY} yields a reference holding {@code null}, while the
 * watch on {@code OLD_LOOKUP_CONFIG_KEY} yields a tier -> lookup name -> raw spec map. The
 * test expects {@code start()} to write the same entries under {@code LOOKUP_CONFIG_KEY},
 * each raw spec wrapped in a {@code LookupExtractorFactoryMapContainer} with a null version.
 */
@Test
public void testBackwardCompatibilityMigration() throws Exception
{
  EasyMock.reset(configManager);

  // Nothing is stored under the current key.
  EasyMock.expect(
      configManager.watch(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.<TypeReference>anyObject(),
          EasyMock.<AtomicReference>isNull()
      )
  ).andReturn(
      new AtomicReference<Map<String, Map<String, Map<String, Object>>>>(null)
  ).once();

  // The old key still holds one lookup in one tier, in the legacy raw-map format.
  EasyMock.expect(
      configManager.watch(
          EasyMock.eq(LookupCoordinatorManager.OLD_LOOKUP_CONFIG_KEY),
          EasyMock.<TypeReference>anyObject(),
          EasyMock.<AtomicReference>isNull()
      )
  ).andReturn(
      new AtomicReference<Map<String, Map<String, Map<String, Object>>>>(
          ImmutableMap.<String, Map<String, Map<String, Object>>>of(
              "tier1",
              ImmutableMap.<String, Map<String, Object>>of("lookup1", ImmutableMap.<String, Object>of("k", "v"))
          )
      )
  ).once();

  // The converted configuration must be written back under the current key.
  EasyMock.expect(
      configManager.set(
          EasyMock.eq(LookupCoordinatorManager.LOOKUP_CONFIG_KEY),
          EasyMock.eq(
              ImmutableMap.<String, Map<String, LookupExtractorFactoryMapContainer>>of(
                  "tier1",
                  ImmutableMap.of(
                      "lookup1",
                      new LookupExtractorFactoryMapContainer(
                          null,
                          ImmutableMap.<String, Object>of("k", "v")
                      )
                  )
              )
          ),
          EasyMock.anyObject(AuditInfo.class)
      )
  ).andReturn(true).once();
  EasyMock.replay(configManager);

  final LookupCoordinatorManager manager = new LookupCoordinatorManager(
      client,
      druidNodeDiscoveryProvider,
      mapper,
      configManager,
      new LookupCoordinatorManagerConfig()
      {
        @Override
        public long getPeriod()
        {
          return 1;
        }
      }
  );

  manager.start();
  try {
    EasyMock.verify(configManager);
  }
  finally {
    // The manager is local to this test, so nothing else can stop it; without this its
    // background work would keep running after the test returns.
    manager.stop();
  }
}
}