| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.federation.store.driver; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Method; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Random; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; |
| import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; |
| import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; |
| import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; |
| import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; |
| import org.apache.hadoop.hdfs.server.federation.store.records.DisabledNameservice; |
| import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; |
| import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; |
| import org.apache.hadoop.hdfs.server.federation.store.records.Query; |
| import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; |
| import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; |
| import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Base tests for the driver. The particular implementations will use this to |
| * test their functionality. |
| */ |
| public class TestStateStoreDriverBase { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestStateStoreDriverBase.class); |
| |
| private static StateStoreService stateStore; |
| private static Configuration conf; |
| |
| private static final Random RANDOM = new Random(); |
| |
| |
| /** |
| * Get the State Store driver. |
| * @return State Store driver. |
| */ |
| protected StateStoreDriver getStateStoreDriver() { |
| return stateStore.getDriver(); |
| } |
| |
| @After |
| public void cleanMetrics() { |
| if (stateStore != null) { |
| StateStoreMetrics metrics = stateStore.getMetrics(); |
| metrics.reset(); |
| } |
| } |
| |
| @AfterClass |
| public static void tearDownCluster() { |
| if (stateStore != null) { |
| stateStore.stop(); |
| } |
| } |
| |
| /** |
| * Get a new State Store using this configuration. |
| * |
| * @param config Configuration for the State Store. |
| * @throws Exception If we cannot get the State Store. |
| */ |
| public static void getStateStore(Configuration config) throws Exception { |
| conf = config; |
| stateStore = FederationStateStoreTestUtils.newStateStore(conf); |
| } |
| |
| private String generateRandomString() { |
| String randomString = "randomString-" + RANDOM.nextInt(); |
| return randomString; |
| } |
| |
| private long generateRandomLong() { |
| return RANDOM.nextLong(); |
| } |
| |
| @SuppressWarnings("rawtypes") |
| private <T extends Enum> T generateRandomEnum(Class<T> enumClass) { |
| int x = RANDOM.nextInt(enumClass.getEnumConstants().length); |
| T data = enumClass.getEnumConstants()[x]; |
| return data; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <T extends BaseRecord> T generateFakeRecord(Class<T> recordClass) |
| throws IllegalArgumentException, IllegalAccessException, IOException { |
| |
| if (recordClass == MembershipState.class) { |
| return (T) MembershipState.newInstance(generateRandomString(), |
| generateRandomString(), generateRandomString(), |
| generateRandomString(), generateRandomString(), |
| generateRandomString(), generateRandomString(), |
| generateRandomString(), generateRandomString(), |
| generateRandomEnum(FederationNamenodeServiceState.class), false); |
| } else if (recordClass == MountTable.class) { |
| String src = "/" + generateRandomString(); |
| Map<String, String> destMap = Collections.singletonMap( |
| generateRandomString(), "/" + generateRandomString()); |
| return (T) MountTable.newInstance(src, destMap); |
| } else if (recordClass == RouterState.class) { |
| RouterState routerState = RouterState.newInstance(generateRandomString(), |
| generateRandomLong(), generateRandomEnum(RouterServiceState.class)); |
| StateStoreVersion version = generateFakeRecord(StateStoreVersion.class); |
| routerState.setStateStoreVersion(version); |
| return (T) routerState; |
| } else if (recordClass == DisabledNameservice.class) { |
| return (T) DisabledNameservice.newInstance(generateRandomString()); |
| } else if (recordClass == StateStoreVersion.class) { |
| return (T) StateStoreVersion.newInstance( |
| generateRandomLong(), generateRandomLong()); |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Validate if a record is the same. |
| * |
| * @param original Original record. |
| * @param committed Committed record. |
| * @param assertEquals Assert if the records are equal or just return. |
| * @return If the record is successfully validated. |
| */ |
| private boolean validateRecord( |
| BaseRecord original, BaseRecord committed, boolean assertEquals) { |
| |
| boolean ret = true; |
| |
| Map<String, Class<?>> fields = getFields(original); |
| for (String key : fields.keySet()) { |
| if (key.equals("dateModified") || |
| key.equals("dateCreated") || |
| key.equals("proto")) { |
| // Fields are updated/set on commit and fetch and may not match |
| // the fields that are initialized in a non-committed object. |
| continue; |
| } |
| Object data1 = getField(original, key); |
| Object data2 = getField(committed, key); |
| if (assertEquals) { |
| assertEquals("Field " + key + " does not match", data1, data2); |
| } else if (!data1.equals(data2)) { |
| ret = false; |
| } |
| } |
| |
| long now = stateStore.getDriver().getTime(); |
| assertTrue( |
| committed.getDateCreated() <= now && committed.getDateCreated() > 0); |
| assertTrue(committed.getDateModified() >= committed.getDateCreated()); |
| |
| return ret; |
| } |
| |
| public static void removeAll(StateStoreDriver driver) throws IOException { |
| driver.removeAll(MembershipState.class); |
| driver.removeAll(MountTable.class); |
| driver.removeAll(RouterState.class); |
| driver.removeAll(DisabledNameservice.class); |
| } |
| |
| public <T extends BaseRecord> void testInsert( |
| StateStoreDriver driver, Class<T> recordClass) |
| throws IllegalArgumentException, IllegalAccessException, IOException { |
| |
| assertTrue(driver.removeAll(recordClass)); |
| QueryResult<T> queryResult0 = driver.get(recordClass); |
| List<T> records0 = queryResult0.getRecords(); |
| assertTrue(records0.isEmpty()); |
| |
| // Insert single |
| BaseRecord record = generateFakeRecord(recordClass); |
| driver.put(record, true, false); |
| |
| // Verify |
| QueryResult<T> queryResult1 = driver.get(recordClass); |
| List<T> records1 = queryResult1.getRecords(); |
| assertEquals(1, records1.size()); |
| T record0 = records1.get(0); |
| validateRecord(record, record0, true); |
| |
| // Insert multiple |
| List<T> insertList = new ArrayList<>(); |
| for (int i = 0; i < 10; i++) { |
| T newRecord = generateFakeRecord(recordClass); |
| insertList.add(newRecord); |
| } |
| driver.putAll(insertList, true, false); |
| |
| // Verify |
| QueryResult<T> queryResult2 = driver.get(recordClass); |
| List<T> records2 = queryResult2.getRecords(); |
| assertEquals(11, records2.size()); |
| } |
| |
| public <T extends BaseRecord> void testFetchErrors(StateStoreDriver driver, |
| Class<T> clazz) throws IllegalAccessException, IOException { |
| |
| // Fetch empty list |
| driver.removeAll(clazz); |
| QueryResult<T> result0 = driver.get(clazz); |
| assertNotNull(result0); |
| List<T> records0 = result0.getRecords(); |
| assertEquals(records0.size(), 0); |
| |
| // Insert single |
| BaseRecord record = generateFakeRecord(clazz); |
| assertTrue(driver.put(record, true, false)); |
| |
| // Verify |
| QueryResult<T> result1 = driver.get(clazz); |
| List<T> records1 = result1.getRecords(); |
| assertEquals(1, records1.size()); |
| validateRecord(record, records1.get(0), true); |
| |
| // Test fetch single object with a bad query |
| final T fakeRecord = generateFakeRecord(clazz); |
| final Query<T> query = new Query<T>(fakeRecord); |
| T getRecord = driver.get(clazz, query); |
| assertNull(getRecord); |
| |
| // Test fetch multiple objects does not exist returns empty list |
| assertEquals(driver.getMultiple(clazz, query).size(), 0); |
| } |
| |
| public <T extends BaseRecord> void testPut( |
| StateStoreDriver driver, Class<T> clazz) |
| throws IllegalArgumentException, ReflectiveOperationException, |
| IOException, SecurityException { |
| |
| driver.removeAll(clazz); |
| QueryResult<T> records = driver.get(clazz); |
| assertTrue(records.getRecords().isEmpty()); |
| |
| // Insert multiple |
| List<T> insertList = new ArrayList<>(); |
| for (int i = 0; i < 10; i++) { |
| T newRecord = generateFakeRecord(clazz); |
| insertList.add(newRecord); |
| } |
| |
| // Verify |
| assertTrue(driver.putAll(insertList, false, true)); |
| records = driver.get(clazz); |
| assertEquals(records.getRecords().size(), 10); |
| |
| // Generate a new record with the same PK fields as an existing record |
| BaseRecord updatedRecord = generateFakeRecord(clazz); |
| BaseRecord existingRecord = records.getRecords().get(0); |
| Map<String, String> primaryKeys = existingRecord.getPrimaryKeys(); |
| for (Entry<String, String> entry : primaryKeys.entrySet()) { |
| String key = entry.getKey(); |
| String value = entry.getValue(); |
| Class<?> fieldType = getFieldType(existingRecord, key); |
| Object field = fromString(value, fieldType); |
| assertTrue(setField(updatedRecord, key, field)); |
| } |
| |
| // Attempt an update of an existing entry, but it is not allowed. |
| assertFalse(driver.put(updatedRecord, false, true)); |
| |
| // Verify no update occurred, all original records are unchanged |
| QueryResult<T> newRecords = driver.get(clazz); |
| assertEquals(10, newRecords.getRecords().size()); |
| assertEquals("A single entry was improperly updated in the store", 10, |
| countMatchingEntries(records.getRecords(), newRecords.getRecords())); |
| |
| // Update the entry (allowing updates) |
| assertTrue(driver.put(updatedRecord, true, false)); |
| |
| // Verify that one entry no longer matches the original set |
| newRecords = driver.get(clazz); |
| assertEquals(10, newRecords.getRecords().size()); |
| T record = records.getRecords().get(0); |
| if (record.hasOtherFields()) { |
| assertEquals( |
| "Record of type " + clazz + " not updated in the store", 9, |
| countMatchingEntries(records.getRecords(), newRecords.getRecords())); |
| } |
| } |
| |
| private int countMatchingEntries( |
| Collection<? extends BaseRecord> committedList, |
| Collection<? extends BaseRecord> matchList) { |
| |
| int matchingCount = 0; |
| for (BaseRecord committed : committedList) { |
| for (BaseRecord match : matchList) { |
| try { |
| if (match.getPrimaryKey().equals(committed.getPrimaryKey())) { |
| if (validateRecord(match, committed, false)) { |
| matchingCount++; |
| } |
| break; |
| } |
| } catch (Exception ex) { |
| } |
| } |
| } |
| return matchingCount; |
| } |
| |
| public <T extends BaseRecord> void testRemove( |
| StateStoreDriver driver, Class<T> clazz) |
| throws IllegalArgumentException, IllegalAccessException, IOException { |
| |
| // Remove all |
| assertTrue(driver.removeAll(clazz)); |
| QueryResult<T> records = driver.get(clazz); |
| assertTrue(records.getRecords().isEmpty()); |
| |
| // Insert multiple |
| List<T> insertList = new ArrayList<>(); |
| for (int i = 0; i < 10; i++) { |
| T newRecord = generateFakeRecord(clazz); |
| insertList.add(newRecord); |
| } |
| |
| // Verify |
| assertTrue(driver.putAll(insertList, false, true)); |
| records = driver.get(clazz); |
| assertEquals(records.getRecords().size(), 10); |
| |
| // Remove Single |
| assertTrue(driver.remove(records.getRecords().get(0))); |
| |
| // Verify |
| records = driver.get(clazz); |
| assertEquals(records.getRecords().size(), 9); |
| |
| // Remove with filter |
| final T firstRecord = records.getRecords().get(0); |
| final Query<T> query0 = new Query<T>(firstRecord); |
| assertTrue(driver.remove(clazz, query0) > 0); |
| |
| final T secondRecord = records.getRecords().get(1); |
| final Query<T> query1 = new Query<T>(secondRecord); |
| assertTrue(driver.remove(clazz, query1) > 0); |
| |
| // Verify |
| records = driver.get(clazz); |
| assertEquals(records.getRecords().size(), 7); |
| |
| // Remove all |
| assertTrue(driver.removeAll(clazz)); |
| |
| // Verify |
| records = driver.get(clazz); |
| assertTrue(records.getRecords().isEmpty()); |
| } |
| |
| public void testInsert(StateStoreDriver driver) |
| throws IllegalArgumentException, IllegalAccessException, IOException { |
| testInsert(driver, MembershipState.class); |
| testInsert(driver, MountTable.class); |
| testInsert(driver, RouterState.class); |
| testInsert(driver, DisabledNameservice.class); |
| } |
| |
| public void testPut(StateStoreDriver driver) |
| throws IllegalArgumentException, ReflectiveOperationException, |
| IOException, SecurityException { |
| testPut(driver, MembershipState.class); |
| testPut(driver, MountTable.class); |
| testPut(driver, RouterState.class); |
| testPut(driver, DisabledNameservice.class); |
| } |
| |
| public void testRemove(StateStoreDriver driver) |
| throws IllegalArgumentException, IllegalAccessException, IOException { |
| testRemove(driver, MembershipState.class); |
| testRemove(driver, MountTable.class); |
| testRemove(driver, RouterState.class); |
| testRemove(driver, DisabledNameservice.class); |
| } |
| |
| public void testFetchErrors(StateStoreDriver driver) |
| throws IllegalArgumentException, IllegalAccessException, IOException { |
| testFetchErrors(driver, MembershipState.class); |
| testFetchErrors(driver, MountTable.class); |
| testFetchErrors(driver, RouterState.class); |
| testFetchErrors(driver, DisabledNameservice.class); |
| } |
| |
| public void testMetrics(StateStoreDriver driver) |
| throws IOException, IllegalArgumentException, IllegalAccessException { |
| |
| MountTable insertRecord = |
| this.generateFakeRecord(MountTable.class); |
| |
| // Put single |
| StateStoreMetrics metrics = stateStore.getMetrics(); |
| assertEquals(0, metrics.getWriteOps()); |
| driver.put(insertRecord, true, false); |
| assertEquals(1, metrics.getWriteOps()); |
| |
| // Put multiple |
| metrics.reset(); |
| assertEquals(0, metrics.getWriteOps()); |
| driver.put(insertRecord, true, false); |
| assertEquals(1, metrics.getWriteOps()); |
| |
| // Get Single |
| metrics.reset(); |
| assertEquals(0, metrics.getReadOps()); |
| |
| final String querySourcePath = insertRecord.getSourcePath(); |
| MountTable partial = MountTable.newInstance(); |
| partial.setSourcePath(querySourcePath); |
| final Query<MountTable> query = new Query<>(partial); |
| driver.get(MountTable.class, query); |
| assertEquals(1, metrics.getReadOps()); |
| |
| // GetAll |
| metrics.reset(); |
| assertEquals(0, metrics.getReadOps()); |
| driver.get(MountTable.class); |
| assertEquals(1, metrics.getReadOps()); |
| |
| // GetMultiple |
| metrics.reset(); |
| assertEquals(0, metrics.getReadOps()); |
| driver.getMultiple(MountTable.class, query); |
| assertEquals(1, metrics.getReadOps()); |
| |
| // Insert fails |
| metrics.reset(); |
| assertEquals(0, metrics.getFailureOps()); |
| driver.put(insertRecord, false, true); |
| assertEquals(1, metrics.getFailureOps()); |
| |
| // Remove single |
| metrics.reset(); |
| assertEquals(0, metrics.getRemoveOps()); |
| driver.remove(insertRecord); |
| assertEquals(1, metrics.getRemoveOps()); |
| |
| // Remove multiple |
| metrics.reset(); |
| driver.put(insertRecord, true, false); |
| assertEquals(0, metrics.getRemoveOps()); |
| driver.remove(MountTable.class, query); |
| assertEquals(1, metrics.getRemoveOps()); |
| |
| // Remove all |
| metrics.reset(); |
| driver.put(insertRecord, true, false); |
| assertEquals(0, metrics.getRemoveOps()); |
| driver.removeAll(MountTable.class); |
| assertEquals(1, metrics.getRemoveOps()); |
| } |
| |
| /** |
| * Sets the value of a field on the object. |
| * |
| * @param fieldName The string name of the field. |
| * @param data The data to pass to the field's setter. |
| * |
| * @return True if successful, fails if failed. |
| */ |
| private static boolean setField( |
| BaseRecord record, String fieldName, Object data) { |
| |
| Method m = locateSetter(record, fieldName); |
| if (m != null) { |
| try { |
| m.invoke(record, data); |
| } catch (Exception e) { |
| LOG.error("Cannot set field " + fieldName + " on object " |
| + record.getClass().getName() + " to data " + data + " of type " |
| + data.getClass(), e); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Finds the appropriate setter for a field name. |
| * |
| * @param fieldName The legacy name of the field. |
| * @return The matching setter or null if not found. |
| */ |
| private static Method locateSetter(BaseRecord record, String fieldName) { |
| for (Method m : record.getClass().getMethods()) { |
| if (m.getName().equalsIgnoreCase("set" + fieldName)) { |
| return m; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Returns all serializable fields in the object. |
| * |
| * @return Map with the fields. |
| */ |
| private static Map<String, Class<?>> getFields(BaseRecord record) { |
| Map<String, Class<?>> getters = new HashMap<>(); |
| for (Method m : record.getClass().getDeclaredMethods()) { |
| if (m.getName().startsWith("get")) { |
| try { |
| Class<?> type = m.getReturnType(); |
| char[] c = m.getName().substring(3).toCharArray(); |
| c[0] = Character.toLowerCase(c[0]); |
| String key = new String(c); |
| getters.put(key, type); |
| } catch (Exception e) { |
| LOG.error("Cannot execute getter " + m.getName() |
| + " on object " + record); |
| } |
| } |
| } |
| return getters; |
| } |
| |
| /** |
| * Get the type of a field. |
| * |
| * @param fieldName |
| * @return Field type |
| */ |
| private static Class<?> getFieldType(BaseRecord record, String fieldName) { |
| Method m = locateGetter(record, fieldName); |
| return m.getReturnType(); |
| } |
| |
| /** |
| * Fetches the value for a field name. |
| * |
| * @param fieldName the legacy name of the field. |
| * @return The field data or null if not found. |
| */ |
| private static Object getField(BaseRecord record, String fieldName) { |
| Object result = null; |
| Method m = locateGetter(record, fieldName); |
| if (m != null) { |
| try { |
| result = m.invoke(record); |
| } catch (Exception e) { |
| LOG.error("Cannot get field " + fieldName + " on object " + record); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Finds the appropriate getter for a field name. |
| * |
| * @param fieldName The legacy name of the field. |
| * @return The matching getter or null if not found. |
| */ |
| private static Method locateGetter(BaseRecord record, String fieldName) { |
| for (Method m : record.getClass().getMethods()) { |
| if (m.getName().equalsIgnoreCase("get" + fieldName)) { |
| return m; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Expands a data object from the store into an record object. Default store |
| * data type is a String. Override if additional serialization is required. |
| * |
| * @param data Object containing the serialized data. Only string is |
| * supported. |
| * @param clazz Target object class to hold the deserialized data. |
| * @return An instance of the target data object initialized with the |
| * deserialized data. |
| */ |
| @Deprecated |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| private static <T> T fromString(String data, Class<T> clazz) { |
| |
| if (data.equals("null")) { |
| return null; |
| } else if (clazz == String.class) { |
| return (T) data; |
| } else if (clazz == Long.class || clazz == long.class) { |
| return (T) Long.valueOf(data); |
| } else if (clazz == Integer.class || clazz == int.class) { |
| return (T) Integer.valueOf(data); |
| } else if (clazz == Double.class || clazz == double.class) { |
| return (T) Double.valueOf(data); |
| } else if (clazz == Float.class || clazz == float.class) { |
| return (T) Float.valueOf(data); |
| } else if (clazz == Boolean.class || clazz == boolean.class) { |
| return (T) Boolean.valueOf(data); |
| } else if (clazz.isEnum()) { |
| return (T) Enum.valueOf((Class<Enum>) clazz, data); |
| } |
| return null; |
| } |
| } |