blob: 50f0920a5b894e7015e4100d6d7a393fa7b6ba16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage.kv;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.table.TableSpec;
import org.junit.Test;
import junit.framework.Assert;
public class TestRocksDbTableDescriptor {
@Test
public void testMinimal() {
new RocksDbTableDescriptor<Integer, String>("1")
.validate();
}
@Test
public void testSerde() {
TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
.withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
.getTableSpec();
Assert.assertNotNull(tableSpec.getSerde());
Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), IntegerSerde.class);
Assert.assertEquals(tableSpec.getSerde().getValueSerde().getClass(), StringSerde.class);
}
@Test
public void testTableSpec() {
TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
.withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
.withBlockSize(1)
.withCacheSize(2)
.withCompactionStyle("fifo")
.withCompressionType("snappy")
.withMaxLogFileSize(3)
.withNumLogFilesToKeep(4)
.withNumWriteBuffers(5)
.withObjectCacheSize(6)
.withTtl(7)
.withWriteBatchSize(8)
.withWriteBufferSize(9)
.withConfig("rocksdb.abc", "xyz")
.getTableSpec();
Assert.assertNotNull(tableSpec.getSerde());
Assert.assertNotNull(tableSpec.getSerde().getKeySerde());
Assert.assertNotNull(tableSpec.getSerde().getValueSerde());
Assert.assertEquals("1", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES));
Assert.assertEquals("2", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES));
Assert.assertEquals("3", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES));
Assert.assertEquals("4", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_KEEP_LOG_FILE_NUM));
Assert.assertEquals("5", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_NUM_WRITE_BUFFERS));
Assert.assertEquals("6", getConfig(tableSpec, RocksDbTableDescriptor.OBJECT_CACHE_SIZE));
Assert.assertEquals("7", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_TTL_MS));
Assert.assertEquals("8", getConfig(tableSpec, RocksDbTableDescriptor.WRITE_BATCH_SIZE));
Assert.assertEquals("9", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES));
Assert.assertEquals("snappy", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPRESSION));
Assert.assertEquals("fifo", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE));
Assert.assertEquals("xyz", getConfig(tableSpec, "abc"));
Assert.assertEquals("false", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
}
@Test
public void testTableSpecWithChangelogEnabled() {
TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
.withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
.withChangelogStream("changelog-$tream")
.withChangelogReplicationFactor(10)
.getTableSpec();
Assert.assertEquals("10", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR));
Assert.assertEquals("changelog-$tream", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM));
Assert.assertEquals("true", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
}
private String getConfig(TableSpec tableSpec, String key) {
return tableSpec.getConfig().get("rocksdb." + key);
}
}