blob: db98a39334360b7fcb7ed3ab5db95653ba893710 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.redis.state;
import com.google.common.primitives.UnsignedBytes;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.storm.redis.common.commands.RedisCommands;
import org.apache.storm.redis.common.container.RedisCommandsInstanceContainer;
import org.apache.storm.state.DefaultStateSerializer;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import redis.clients.util.SafeEncoder;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
/**
* Unit tests for {@link RedisKeyValueState}
*
* NOTE: The type of key for mockMap is String, which should be byte[],
* since but byte[] doesn't implement equals() so taking workaround to make life happier.
* It shouldn't make issues on Redis side, since raw type of Redis is binary.
*/
public class RedisKeyValueStateTest {
RedisCommandsInstanceContainer mockContainer;
RedisCommands mockCommands;
RedisKeyValueState<String, String> keyValueState;
@Before
public void setUp() {
final NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap =
new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
mockContainer = Mockito.mock(RedisCommandsInstanceContainer.class);
mockCommands = Mockito.mock(RedisCommands.class);
Mockito.when(mockContainer.getInstance()).thenReturn(mockCommands);
ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> stringArgumentCaptor2 = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Map> mapArgumentCaptor = ArgumentCaptor.forClass(Map.class);
Mockito.when(mockCommands.exists(Mockito.any(byte[].class)))
.thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return exists(mockMap, (byte[]) args[0]);
}
});
Mockito.when(mockCommands.del(Mockito.any(byte[].class)))
.thenAnswer(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return del(mockMap, (byte[]) args[0]);
}
});
Mockito.when(mockCommands.hmset(Mockito.any(byte[].class), Mockito.anyMap()))
.thenAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return hmset(mockMap, (byte[]) args[0], (Map<byte[], byte[]>) args[1]);
}
});
Mockito.when(mockCommands.hget(Mockito.any(byte[].class), Mockito.any(byte[].class)))
.thenAnswer(new Answer<byte[]>() {
@Override
public byte[] answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return hget(mockMap, (byte[]) args[0], (byte[]) args[1]);
}
});
Mockito.when(mockCommands.hdel(Mockito.any(byte[].class), Mockito.<byte[]>anyVararg()))
.thenAnswer(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
int argsSize = args.length;
byte[][] fields = Arrays.asList(args).subList(1, argsSize).toArray(new byte[argsSize - 1][]);
return hdel(mockMap, (byte[]) args[0], fields);
}
});
Mockito.when(mockCommands.exists(Mockito.anyString()))
.thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return exists(mockMap, (String) args[0]);
}
});
Mockito.when(mockCommands.hmset(Mockito.anyString(), Mockito.anyMap()))
.thenAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return hmset(mockMap, (String) args[0], (Map<String, String>) args[1]);
}
});
Mockito.when(mockCommands.hgetAll(Mockito.anyString()))
.thenAnswer(new Answer<Map<String, String>>() {
@Override
public Map<String, String> answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return hgetAll(mockMap, (String) args[0]);
}
});
keyValueState = new RedisKeyValueState<String, String>("test", mockContainer, new DefaultStateSerializer<String>(),
new DefaultStateSerializer<String>());
}
@Test
public void testPutAndGet() throws Exception {
keyValueState.put("a", "1");
keyValueState.put("b", "2");
assertEquals("1", keyValueState.get("a"));
assertEquals("2", keyValueState.get("b"));
assertEquals(null, keyValueState.get("c"));
}
@Test
public void testPutAndDelete() throws Exception {
keyValueState.put("a", "1");
keyValueState.put("b", "2");
assertEquals("1", keyValueState.get("a"));
assertEquals("2", keyValueState.get("b"));
assertEquals(null, keyValueState.get("c"));
assertEquals("1", keyValueState.delete("a"));
assertEquals(null, keyValueState.get("a"));
assertEquals("2", keyValueState.get("b"));
assertEquals(null, keyValueState.get("c"));
}
@Test
public void testPrepareCommitRollback() throws Exception {
keyValueState.put("a", "1");
keyValueState.put("b", "2");
keyValueState.prepareCommit(1);
keyValueState.put("c", "3");
assertArrayEquals(new String[]{ "1", "2", "3" }, getValues());
keyValueState.rollback();
assertArrayEquals(new String[]{ null, null, null }, getValues());
keyValueState.put("a", "1");
keyValueState.put("b", "2");
keyValueState.prepareCommit(1);
keyValueState.commit(1);
keyValueState.put("c", "3");
assertArrayEquals(new String[]{ "1", "2", "3" }, getValues());
keyValueState.rollback();
assertArrayEquals(new String[]{ "1", "2", null }, getValues());
keyValueState.put("c", "3");
assertEquals("2", keyValueState.delete("b"));
assertEquals("3", keyValueState.delete("c"));
assertArrayEquals(new String[]{ "1", null, null }, getValues());
keyValueState.prepareCommit(2);
assertArrayEquals(new String[]{ "1", null, null }, getValues());
keyValueState.commit(2);
assertArrayEquals(new String[]{ "1", null, null }, getValues());
keyValueState.put("b", "2");
keyValueState.prepareCommit(3);
keyValueState.put("c", "3");
assertArrayEquals(new String[]{ "1", "2", "3" }, getValues());
keyValueState.rollback();
assertArrayEquals(new String[]{ "1", null, null }, getValues());
}
private String[] getValues() {
return new String[]{
keyValueState.get("a"),
keyValueState.get("b"),
keyValueState.get("c")
};
}
private Boolean exists(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, byte[] key) {
return mockMap.containsKey(key);
}
private String hmset(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, byte[] key, Map<byte[], byte[]> value) {
NavigableMap<byte[], byte[]> currentValue = mockMap.get(key);
if (currentValue == null) {
currentValue = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
}
for (Map.Entry<byte[], byte[]> entry : value.entrySet()) {
currentValue.put(entry.getKey(), entry.getValue());
}
mockMap.put(key, currentValue);
return "";
}
private Long del(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, byte[] key) {
if (mockMap.remove(key) == null) {
return 0L;
} else {
return 1L;
}
}
private byte[] hget(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, byte[] namespace, byte[] key) {
if (mockMap.containsKey(namespace)) {
return mockMap.get(namespace).get(key);
}
return null;
}
private Long hdel(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, byte[] namespace, byte[]... keys) {
Long count = 0L;
for (byte[] key : keys) {
if (mockMap.get(namespace).remove(key) != null) count++;
}
return count;
}
private Boolean exists(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, String key) {
return mockMap.containsKey(SafeEncoder.encode(key));
}
private String hmset(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, String key, Map<String, String> value) {
NavigableMap<byte[], byte[]> currentValue = mockMap.get(SafeEncoder.encode(key));
if (currentValue == null) {
currentValue = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
}
for (Map.Entry<String, String> entry : value.entrySet()) {
currentValue.put(SafeEncoder.encode(entry.getKey()), SafeEncoder.encode(entry.getValue()));
}
mockMap.put(SafeEncoder.encode(key), currentValue);
return "";
}
private Map<String, String> hgetAll(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, String key) {
Map<byte[], byte[]> currentValue = mockMap.get(SafeEncoder.encode(key));
Map<String, String> converted = new HashMap<>(currentValue.size());
for (Map.Entry<byte[], byte[]> entry : currentValue.entrySet()) {
converted.put(SafeEncoder.encode(entry.getKey()), SafeEncoder.encode(entry.getValue()));
}
return converted;
}
}