blob: 4242ba136ecf8ac7e4e5a7780a142f1493d34e21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.model.history
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.Path
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
import org.apache.slider.server.appmaster.model.mock.MockFactory
import org.apache.slider.server.appmaster.state.NodeEntry
import org.apache.slider.server.appmaster.state.NodeInstance
import org.apache.slider.server.appmaster.state.RoleHistory
import org.apache.slider.server.avro.RoleHistoryWriter
import org.junit.Test
/**
 * Tests that write role history files and read them back: direct reads
 * through {@link RoleHistoryWriter}, reloads through
 * {@link RoleHistory#onStart}, purging of older history files, and the
 * handling of empty or unparseable history files.
 */
@Slf4j
//@CompileStatic
class TestRoleHistoryRW extends BaseMockAppStateTest {
/**
 * Timestamp passed to the save/filename calls. It is seeded from the wall
 * clock and post-incremented on each use; because it is static and mutable,
 * some tests also reset it to a small fixed value before using it.
 */
static long time = System.currentTimeMillis();
/**
 * Returns the test name string for this class.
 * @return the fixed string "TestHistoryRW"
 */
@Override
String getTestName() {
  "TestHistoryRW"
}
/**
 * Starts a new role history, saves it without adding any node entries,
 * checks that a file was written, then reads that file back.
 */
@Test
public void testWriteReadEmpty() throws Throwable {
  RoleHistory emptyHistory = new RoleHistory(MockFactory.ROLES)
  emptyHistory.onStart(fs, historyPath)

  Path savedFile = emptyHistory.saveHistory(time++)
  assert fs.isFile(savedFile)

  // the read only has to complete; its return value is not checked here
  new RoleHistoryWriter().read(fs, savedFile, emptyHistory)
}
/**
 * Saves a history holding one node entry for role 0 on "localhost", reads
 * the file into a second history, and checks that the entry's last-used
 * value matches the one that was saved.
 */
@Test
public void testWriteReadData() throws Throwable {
  RoleHistory source = new RoleHistory(MockFactory.ROLES)
  // onStart is expected to return false at this point
  assert !source.onStart(fs, historyPath)

  String hostname = "localhost"
  NodeEntry savedEntry = source.getOrCreateNodeInstance(hostname).getOrCreate(0)
  savedEntry.lastUsed = 0xf00d

  Path savedFile = source.saveHistory(time++)
  assert fs.isFile(savedFile)

  RoleHistoryWriter reader = new RoleHistoryWriter()
  RoleHistory reloaded = new RoleHistory(MockFactory.ROLES)
  assert reader.read(fs, savedFile, reloaded) > 0

  NodeInstance reloadedNode = reloaded.getExistingNodeInstance(hostname)
  assert reloadedNode != null
  NodeEntry reloadedEntry = reloadedNode.get(0)
  assert reloadedEntry != null
  assert reloadedEntry.lastUsed == savedEntry.lastUsed
}
/**
 * Saves a history that contains idle, active and released node entries
 * across two hosts, reads the file into a second history, and checks both
 * the reloaded entries and the available-node lists built from them.
 */
@Test
public void testWriteReadActiveData() throws Throwable {
  RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES)
  roleHistory.onStart(fs, historyPath)
  String addr = "localhost"
  String addr2 = "rack1server5"

  // localhost, role 0: only a last-used time is set
  NodeInstance localhost = roleHistory.getOrCreateNodeInstance(addr)
  NodeEntry orig1 = localhost.getOrCreate(0)
  orig1.lastUsed = 0x10

  // rack1server5, role 1: a non-zero live count must make it unavailable
  NodeInstance rack1server5 = roleHistory.getOrCreateNodeInstance(addr2)
  NodeEntry orig2 = rack1server5.getOrCreate(1)
  orig2.live = 3
  assert !orig2.available

  // localhost, role 1: live until released, after which it is available
  NodeEntry orig3 = localhost.getOrCreate(1)
  orig3.lastUsed = 0x20
  orig3.live = 1
  assert !orig3.available
  orig3.release()
  assert orig3.available
  roleHistory.dump()

  long savetime = 0x0001000
  Path history = roleHistory.saveHistory(savetime)
  assert fs.isFile(history)

  describe("Loaded")
  log.info("testWriteReadActiveData in $history")
  RoleHistoryWriter historyWriter = new RoleHistoryWriter();
  RoleHistory rh2 = new RoleHistory(MockFactory.ROLES)

  // three node entries were created above, and three are expected back
  assert 3 == historyWriter.read(fs, history, rh2)
  rh2.dump()
  assert rh2.clusterSize == 2;

  NodeInstance ni2 = rh2.getExistingNodeInstance(addr)
  assert ni2 != null
  NodeEntry loadedNE = ni2.get(0)
  // fail with a clear assertion rather than an NPE if the entry is missing
  assert loadedNE != null
  assert loadedNE.lastUsed == orig1.lastUsed

  NodeInstance ni2b = rh2.getExistingNodeInstance(addr2)
  assert ni2b != null
  NodeEntry loadedNE2 = ni2b.get(1)
  assert loadedNE2 != null
  // the entry that was live at save time is expected back with the save time
  assert loadedNE2.lastUsed == savetime
  assert rh2.thawedDataTime == savetime

  // now thaw it
  rh2.buildAvailableNodeLists();
  describe("thawing")
  rh2.dump();
  List<NodeInstance> available0 = rh2.cloneAvailableList(0)
  assert available0.size() == 1

  NodeInstance entry = available0.get(0)
  assert entry.hostname == "localhost"
  assert entry == localhost
  List<NodeInstance> available1 = rh2.cloneAvailableList(1)
  assert available1.size() == 2
  //and verify that even if last used was set, the save time is picked up
  assert entry.get(1).lastUsed == roleHistory.saveTime
}
/**
 * Saves a history with one node entry, then starts a second history over
 * the same history path and checks that it picks up the saved entry and
 * reports the first history's save time as its thawed-data time.
 */
@Test
public void testWriteThaw() throws Throwable {
  RoleHistory original = new RoleHistory(MockFactory.ROLES)
  // onStart is expected to return false for the first history
  assert !original.onStart(fs, historyPath)

  String hostname = "localhost"
  NodeEntry savedEntry = original.getOrCreateNodeInstance(hostname).getOrCreate(0)
  savedEntry.lastUsed = 0xf00d

  Path savedFile = original.saveHistory(time++)
  long expectedThawTime = original.saveTime
  assert fs.isFile(savedFile)

  // with a saved file in place, onStart is expected to return true
  RoleHistory thawed = new RoleHistory(MockFactory.ROLES)
  assert thawed.onStart(fs, historyPath)

  NodeInstance thawedNode = thawed.getExistingNodeInstance(hostname)
  assert thawedNode != null
  NodeEntry thawedEntry = thawedNode.get(0)
  assert thawedEntry != null
  assert thawedEntry.lastUsed == savedEntry.lastUsed
  assert thawed.thawedDataTime == expectedThawTime
}
/**
 * Creates six empty history files with timestamps 1 to 6 and checks how
 * many files each purge call reports, ending with a purge relative to a
 * file that has already been removed.
 */
@Test
public void testPurgeOlderEntries() throws Throwable {
  RoleHistoryWriter purger = new RoleHistoryWriter()
  time = 1

  // six files, stamped with consecutive values of the shared counter
  List<Path> created = []
  for (int i = 0; i < 6; i++) {
    created << touch(purger, time++)
  }
  Path first = created[0]
  Path second = created[1]
  Path fifth = created[4]
  Path sixth = created[5]

  assert purger.purgeOlderHistoryEntries(fs, first) == 0
  assert purger.purgeOlderHistoryEntries(fs, second) == 1
  assert purger.purgeOlderHistoryEntries(fs, second) == 0
  assert purger.purgeOlderHistoryEntries(fs, fifth) == 3
  assert purger.purgeOlderHistoryEntries(fs, sixth) == 1

  try {
    // make an impossible assertion that will fail if the method
    // actually completes
    assert -1 == purger.purgeOlderHistoryEntries(fs, first)
  } catch (FileNotFoundException ignored) {
    // expected
  }
}
/**
 * Creates an empty history file for the given timestamp.
 * @param historyWriter writer used to build the history filename
 * @param time timestamp passed to the filename builder
 * @return the path of the file that was created
 */
public Path touch(RoleHistoryWriter historyWriter, long time) {
  Path target = historyWriter.createHistoryFilename(historyPath, time)
  // create and immediately close, leaving a file with nothing written to it
  fs.create(target).close()
  return target
}
/**
 * Writes two history files followed by a newer, empty one, then starts a
 * second history and checks that it still loads the node data, that the
 * oldest file has been removed, and that the other two files remain.
 */
@Test
public void testSkipEmptyFileOnRead() throws Throwable {
  describe("verify that empty histories are skipped on read; old histories purged")
  RoleHistory source = new RoleHistory(MockFactory.ROLES)
  source.onStart(fs, historyPath)

  time = 0
  Path staleFile = source.saveHistory(time++)

  String hostname = "localhost"
  source.getOrCreateNodeInstance(hostname).getOrCreate(0).lastUsed = 0xf00d
  Path validFile = source.saveHistory(time++)

  // newest file on disk is an empty one
  Path emptyFile = touch(new RoleHistoryWriter(), time++)

  RoleHistory reloaded = new RoleHistory(MockFactory.ROLES)
  assert reloaded.onStart(fs, historyPath)
  assert reloaded.getExistingNodeInstance(hostname) != null

  //and assert the older file got purged
  assert !fs.exists(staleFile)
  assert fs.exists(validFile)
  assert fs.exists(emptyFile)
}
/**
 * Writes two history files followed by a newer file whose content is not
 * a valid history, then starts a second history and checks that it still
 * loads the node data, that the oldest file has been removed, and that the
 * valid and the broken files both remain.
 */
@Test
public void testSkipBrokenFileOnRead() throws Throwable {
  describe "verify that broken histories are skipped on read; old histories purged"
  RoleHistory roleHistory = new RoleHistory(MockFactory.ROLES)
  roleHistory.onStart(fs, historyPath)
  time = 0
  Path oldhistory = roleHistory.saveHistory(time++)
  String addr = "localhost"
  NodeInstance instance = roleHistory.getOrCreateNodeInstance(addr)
  NodeEntry ne1 = instance.getOrCreate(0)
  ne1.lastUsed = 0xf00d
  Path goodhistory = roleHistory.saveHistory(time++)

  // newest file on disk holds text that is not a saved history
  RoleHistoryWriter historyWriter = new RoleHistoryWriter();
  Path badfile = historyWriter.createHistoryFilename(historyPath, time++)
  FSDataOutputStream out = fs.create(badfile)
  try {
    out.writeBytes("{broken:true}")
  } finally {
    // release the stream even if the write fails
    out.close()
  }

  RoleHistory rh2 = new RoleHistory(MockFactory.ROLES)
  describe("IGNORE STACK TRACE BELOW")
  assert rh2.onStart(fs, historyPath)
  describe( "IGNORE STACK TRACE ABOVE")
  NodeInstance ni2 = rh2.getExistingNodeInstance(addr)
  assert ni2 != null
  //and assert the older file got purged
  assert !fs.exists(oldhistory)
  assert fs.exists(goodhistory)
  assert fs.exists(badfile )
}
}