| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.storage.kv |
| |
| import java.util.Arrays |
| |
| import org.apache.samza.storage.StoreProperties |
| import org.junit.Assert._ |
| import org.junit.{After, Before, Test} |
| import org.mockito.Mockito._ |
| |
| class TestKeyValueStorageEngine { |
| var engine: KeyValueStorageEngine[String, String] = null |
| var metrics: KeyValueStorageEngineMetrics = null |
| var now: Long = 0L |
| |
| @Before |
| def setup() { |
| val wrapperKv = new MockKeyValueStore() |
| val rawKv = mock(classOf[KeyValueStore[Array[Byte], Array[Byte]]]) |
| val properties = mock(classOf[StoreProperties]) |
| metrics = new KeyValueStorageEngineMetrics |
| engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() }) |
| } |
| |
| @After |
| def teardown() { |
| engine.close() |
| } |
| |
| @Test |
| def testGetAndPut(): Unit = { |
| var prevGets = metrics.gets.getCount |
| var prevGetNsSnapshotSize = metrics.getNs.getSnapshot.getSize |
| var valueForK1 = engine.get("k1") |
| assertNull("k1 is not existing before put", valueForK1) |
| assertEquals("get counter increments by 1", 1, metrics.gets.getCount - prevGets) |
| assertEquals("get timer has 1 additional data point" , 1, metrics.getNs.getSnapshot.getSize - prevGetNsSnapshotSize) |
| |
| var prevPuts = metrics.puts.getCount |
| var prevPutNsSnapshotSize = metrics.putNs.getSnapshot.getSize |
| engine.put("k1", "v1") |
| assertEquals("put counter increments by 1", 1, metrics.puts.getCount - prevPuts) |
| assertEquals("put timer has 1 additional data point", 1, metrics.putNs.getSnapshot.getSize - prevPutNsSnapshotSize) |
| |
| assertEquals("k1 is existing after put and the value for k1 is v1", "v1", engine.get("k1")) |
| } |
| |
| @Test |
| def testDelete(): Unit = { |
| engine.put("k1", "v1") |
| engine.put("k2", "v2") |
| assertNotNull("k1 is existing before being deleted", engine.get("k1")) |
| var prevDeletes = metrics.deletes.getCount |
| var prevDeleteNsSnapshotSize = metrics.deleteNs.getSnapshot.getSize |
| engine.delete("k1") |
| assertNull("k1 is not existing since it has been deleted", engine.get("k1")) |
| assertEquals("k2 is still existing after deleting k1", "v2", engine.get("k2")) |
| assertEquals("delete counter increments by 1", 1, metrics.deletes.getCount - prevDeletes) |
| assertEquals("delete timer has 1 additional data point", 1, metrics.deleteNs.getSnapshot.getSize - prevDeleteNsSnapshotSize) |
| } |
| |
| @Test |
| def testFlush(): Unit = { |
| var prevFlushes = metrics.flushes.getCount |
| var prevFlushNsSnapshotSize = metrics.flushNs.getSnapshot.getSize |
| engine.flush() |
| assertEquals("flush counter increments by 1", 1, metrics.flushes.getCount - prevFlushes) |
| assertEquals("flush timer has 1 additional data point", 1, metrics.flushNs.getSnapshot.getSize - prevFlushNsSnapshotSize) |
| } |
| |
| |
| @Test |
| def testIterator() { |
| val keys = Arrays.asList("k1", |
| "k2", |
| "k3") |
| val values = Arrays.asList("v1", |
| "v2", |
| "v3") |
| |
| for (i <- 0 until 3) { |
| engine.put(keys.get(i), values.get(i)) |
| } |
| |
| // test all iterator |
| var prevAlls = metrics.alls.getCount |
| var prevAllNsSnapshotSize = metrics.allNs.getSnapshot.getSize |
| var iter = engine.all() |
| assertEquals("all counter increments by 1", 1, metrics.alls.getCount - prevAlls) |
| assertEquals("all timer has 1 additional data point", 1, metrics.allNs.getSnapshot.getSize - prevAllNsSnapshotSize) |
| for (i <- 0 until 3) { |
| assertTrue("iterator has next for 3 times", iter.hasNext) |
| val entry = iter.next() |
| assertEquals(entry.getKey, keys.get(i)) |
| assertEquals(entry.getValue, values.get(i)) |
| } |
| assertFalse("no next after iterating all 3 keys", iter.hasNext) |
| |
| // test range iterator |
| var prevRanges = metrics.ranges.getCount |
| var prevRangeNsSnapshotSize = metrics.rangeNs.getSnapshot.getSize |
| iter = engine.range(keys.get(0), keys.get(2)) |
| assertEquals("range counter increments by 1", 1, metrics.ranges.getCount - prevRanges) |
| assertEquals("range timer has 1 additional data point", 1, metrics.rangeNs.getSnapshot.getSize - prevRangeNsSnapshotSize) |
| for (i <- 0 until 2) { //only iterate the first 2 keys in the range |
| assertTrue("iterator has next for twice", iter.hasNext) |
| val entry = iter.next() |
| assertEquals(entry.getKey, keys.get(i)) |
| assertEquals(entry.getValue, values.get(i)) |
| } |
| assertFalse("no next after iterating 2 keys in the range", iter.hasNext) |
| } |
| |
| def getNextTimestamp(): Long = { |
| now += 1 |
| now |
| } |
| } |