blob: 49d8fb2d6d3170d75df505e131e3b4c833ec936f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.avro;
import com.google.common.annotations.VisibleForTesting;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobFilter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.server.appmaster.state.NodeEntry;
import org.apache.slider.server.appmaster.state.NodeInstance;
import org.apache.slider.server.appmaster.state.RoleHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Map;
/**
* Write out the role history to an output stream or file, read saved
* histories back in, and list or purge the history files in a directory.
*/
public class RoleHistoryWriter {
/** Logger for this class; declared protected, so it is visible to subclasses. */
protected static final Logger log =
LoggerFactory.getLogger(RoleHistoryWriter.class);
/**
* Although Avro is designed to handle some changes, we still keep a version
* marker in the file to catch changes that are fundamentally incompatible
* at the semantic level: changes that require either a different
* parser or get rejected outright.
* The value is stored in the header record on write, and a file whose
* header carries any other value is rejected on read.
*/
public static final int ROLE_HISTORY_VERSION = 0x01;
/**
 * Serialize a role history to a stream as a sequence of JSON-encoded
 * records: a header, the role map, one record per non-null node entry,
 * and finally a footer holding the number of node entry records written.
 * The history's own dirty/savetime fields are not updated.
 * The stream is closed before this method returns, on success or failure.
 *
 * @param out stream to write to; always closed by this method
 * @param history history to save
 * @param savetime time in millis for the save time to go in as a record
 * @return number of node entry records written
 * @throws IOException IO failures
 */
public long write(OutputStream out, RoleHistory history, long savetime)
    throws IOException {
  try {
    DatumWriter<RoleHistoryRecord> datumWriter =
        new SpecificDatumWriter<>(RoleHistoryRecord.class);
    RoleHistoryRecord headerRecord = createHeaderRecord(savetime, history);
    int roleCount = history.getRoleSize();
    Encoder jsonEncoder =
        EncoderFactory.get().jsonEncoder(headerRecord.getSchema(), out);

    // the file starts with the header, directly followed by the role map
    datumWriter.write(headerRecord, jsonEncoder);
    datumWriter.write(createRolemapRecord(history), jsonEncoder);

    // one record for every (node, role) pair that has an entry
    long written = 0;
    for (NodeInstance node : history.cloneNodemap().values()) {
      for (int roleId = 0; roleId < roleCount; roleId++) {
        NodeEntry nodeEntry = node.get(roleId);
        if (nodeEntry == null) {
          continue;
        }
        NodeEntryRecord entryRecord = build(nodeEntry, roleId, node.hostname);
        datumWriter.write(new RoleHistoryRecord(entryRecord), jsonEncoder);
        written++;
      }
    }

    // the footer carries the record count so that readers can cross-check
    RoleHistoryFooter footer = new RoleHistoryFooter();
    footer.setCount(written);
    datumWriter.write(new RoleHistoryRecord(footer), jsonEncoder);
    jsonEncoder.flush();
    return written;
  } finally {
    out.close();
  }
}
/**
 * Build the header record: the file format version, the save time (as a
 * number, as a hex string and as a GMT date string) and the number of roles.
 * @param savetime time of save
 * @param history history, queried for its role count
 * @return a record to place at the head of the file
 */
private RoleHistoryRecord createHeaderRecord(long savetime, RoleHistory history) {
  String savedHex = Long.toHexString(savetime);
  int roleCount = history.getRoleSize();

  RoleHistoryHeader fileHeader = new RoleHistoryHeader();
  fileHeader.setVersion(ROLE_HISTORY_VERSION);
  fileHeader.setSaved(savetime);
  fileHeader.setSavedx(savedHex);
  fileHeader.setSavedate(SliderUtils.toGMTString(savetime));
  fileHeader.setRoles(roleCount);
  return new RoleHistoryRecord(fileHeader);
}
/**
 * Build the role map record from the mapping that the history
 * generates for its history file.
 * @param history history supplying the mapping
 * @return a record to insert into the file
 */
private RoleHistoryRecord createRolemapRecord(RoleHistory history) {
  RoleHistoryMapping roleMapping = new RoleHistoryMapping();
  roleMapping.setRolemap(history.buildMappingForHistoryFile());
  return new RoleHistoryRecord(roleMapping);
}
/**
 * Save the history to a file: the file is created, then the work is
 * handed to {@link #write(OutputStream, RoleHistory, long)}, which also
 * closes the newly created stream.
 *
 * @param fs filesystem
 * @param path path of the file to create
 * @param overwrite overwrite flag passed to the filesystem on creation
 * @param history history to save
 * @param savetime time in millis for the save time to go in as a record
 * @return number of node entry records written
 * @throws IOException IO failures
 */
public long write(FileSystem fs,
    Path path,
    boolean overwrite,
    RoleHistory history,
    long savetime)
    throws IOException {
  return write(fs.create(path, overwrite), history, savetime);
}
/**
 * Create the path of a history file for a given time, by formatting the
 * time with {@link SliderKeys#HISTORY_FILENAME_CREATION_PATTERN}.
 * @param historyPath directory in which the history files live
 * @param time time value
 * @return a path under <code>historyPath</code>, named such that later
 * filenames sort later in the directory
 */
public Path createHistoryFilename(Path historyPath, long time) {
  return new Path(
      historyPath,
      String.format(Locale.ENGLISH,
          SliderKeys.HISTORY_FILENAME_CREATION_PATTERN,
          time));
}
/**
 * Convert a node entry into its {@link NodeEntryRecord} form. The record
 * holds the hostname, the role index, whether the entry is in use
 * (its live count is above zero) and when it was last used.
 * @param entry node entry to convert
 * @param role role index
 * @param hostname name of the host the entry belongs to
 * @return the record
 */
private NodeEntryRecord build(NodeEntry entry, int role, String hostname) {
  boolean inUse = entry.getLive() > 0;
  return new NodeEntryRecord(hostname, role, inUse, entry.getLastUsed());
}
/**
 * Read a history, returning one that is ready to have its onThaw()
 * method called.
 * <p>
 * The stream must start with a header record whose version is
 * {@link #ROLE_HISTORY_VERSION}, followed by role map and node entry
 * records and then a footer; no data may follow the footer.
 * A second role map, an unknown record type, or a footer count that does
 * not match the number of node entries read is logged as a warning
 * rather than treated as a failure.
 * The stream is closed before this method returns, on success or failure.
 *
 * @param in input source; always closed by this method
 * @return the loaded history
 * @throws EOFException if the stream ends before the footer is reached, or
 * if there is still data in the stream after the footer
 * @throws IOException if the header is missing, duplicated or of the wrong
 * version, or on any other IO problem
 * @throws BadConfigException propagated from the loaded history while it
 * is being built (presumably on an invalid role map -confirm)
 */
public LoadedRoleHistory read(InputStream in) throws
    IOException,
    BadConfigException {
  try {
    LoadedRoleHistory loadedRoleHistory = new LoadedRoleHistory();
    DatumReader<RoleHistoryRecord> reader =
        new SpecificDatumReader<>(RoleHistoryRecord.class);
    Decoder decoder =
        DecoderFactory.get().jsonDecoder(RoleHistoryRecord.getClassSchema(),
            in);

    //read header : no entry -> EOF
    RoleHistoryRecord record = reader.read(null, decoder);
    if (record == null) {
      throw new IOException("Role History Header not found at start of file.");
    }
    Object entry = record.getEntry();
    if (!(entry instanceof RoleHistoryHeader)) {
      throw new IOException("Role History Header not found at start of file");
    }
    RoleHistoryHeader header = (RoleHistoryHeader) entry;
    if (header.getVersion() != ROLE_HISTORY_VERSION) {
      throw new IOException(
          String.format("Can't read role file version %04x -need %04x",
              header.getVersion(),
              ROLE_HISTORY_VERSION));
    }
    loadedRoleHistory.setHeader(header);
    RoleHistoryFooter footer = null;
    int records = 0;
    //go through reading data until the footer turns up
    try {
      while (footer == null) {
        record = reader.read(null, decoder);
        if (record == null) {
          throw new IOException("Null record after " + records + " records");
        }
        entry = record.getEntry();

        if (entry instanceof RoleHistoryHeader) {
          throw new IOException("Duplicate Role History Header found");
        } else if (entry instanceof RoleHistoryMapping) {
          // role history mapping entry
          if (!loadedRoleHistory.roleMap.isEmpty()) {
            // duplicate role maps are viewed as something to warn over, rather than fail
            log.warn("Duplicate role map; ignoring");
          } else {
            RoleHistoryMapping historyMapping = (RoleHistoryMapping) entry;
            loadedRoleHistory.buildMapping(historyMapping.getRolemap());
          }
        } else if (entry instanceof NodeEntryRecord) {
          // normal record
          records++;
          NodeEntryRecord nodeEntryRecord = (NodeEntryRecord) entry;
          loadedRoleHistory.add(nodeEntryRecord);
        } else if (entry instanceof RoleHistoryFooter) {
          //tail end of the file
          footer = (RoleHistoryFooter) entry;
        } else {
          // this is to handle future versions, such as when rolling back
          // from a later version of slider
          log.warn("Discarding unknown record {}", entry);
        }
      }
    } catch (EOFException e) {
      // rethrow with the number of records read so far, keeping the cause
      EOFException ex = new EOFException(
          "End of file reached after " + records + " records");
      ex.initCause(e);
      throw ex;
    }
    // at this point there should be no data left.
    // check by reading and expecting a -1. Any other value, including a
    // zero byte, is data: InputStream.read() only signals EOF with -1.
    if (in.read() != -1) {
      // footer is in stream before the last record
      throw new EOFException(
          "File footer reached before end of file -after " + records +
          " records");
    }
    if (records != footer.getCount()) {
      log.warn("mismatch between no of records saved {} and number read {}",
          footer.getCount(), records);
    }
    return loadedRoleHistory;
  } finally {
    in.close();
  }
}
/**
 * Open a file in a filesystem and read a role history from it; the
 * opened stream is handed to {@link #read(InputStream)}, which closes it.
 * @param fs filesystem
 * @param path path to the file
 * @return the loaded history
 * @throws IOException any problem opening or reading the file
 * @throws BadConfigException propagated from {@link #read(InputStream)}
 */
public LoadedRoleHistory read(FileSystem fs, Path path)
    throws IOException, BadConfigException {
  return read(fs.open(path));
}
/**
 * Read from a resource in the classpath -used for testing.
 * The resource is looked up through the classloader of this object's class.
 * @param resource name of the resource
 * @return the loaded history
 * @throws FileNotFoundException if there is no such resource on the classpath
 * @throws IOException any problem reading the resource
 * @throws BadConfigException propagated from {@link #read(InputStream)}
 */
public LoadedRoleHistory read(String resource)
    throws IOException, BadConfigException {
  InputStream in = getClass().getClassLoader().getResourceAsStream(resource);
  if (in == null) {
    // getResourceAsStream() reports a missing resource by returning null;
    // fail with a meaningful error instead of a NullPointerException later on
    throw new FileNotFoundException("Resource not found: " + resource);
  }
  return read(in);
}
/**
 * Find all history entries in a dir. The dir is created if it does
 * not already exist.
 *
 * The scan uses the glob pattern {@link SliderKeys#HISTORY_FILENAME_GLOB_PATTERN},
 * dropping directories which match the pattern and, unless
 * <code>includeEmptyFiles</code> is set, empty files too.
 * The list is then sorted via {@link #sortHistoryPaths(List)},
 * relying on the filename of newer created files being later than the old ones.
 *
 * @param fs filesystem
 * @param dir dir to scan
 * @param includeEmptyFiles should empty files be included in the result?
 * @return a possibly empty list
 * @throws IOException IO problems
 * @throws FileNotFoundException if the target dir exists but is not a directory
 */
public List<Path> findAllHistoryEntries(FileSystem fs,
    Path dir,
    boolean includeEmptyFiles) throws IOException {
  assert fs != null;
  assert dir != null;
  if (!fs.exists(dir)) {
    fs.mkdirs(dir);
  } else if (!fs.isDirectory(dir)) {
    throw new FileNotFoundException("Not a directory " + dir.toString());
  }

  PathFilter filter = new GlobFilter(SliderKeys.HISTORY_FILENAME_GLOB_PATTERN);
  FileStatus[] stats = fs.listStatus(dir, filter);
  List<Path> paths = new ArrayList<>(stats.length);
  for (FileStatus stat : stats) {
    // pass the object itself: it is only converted to a string if debug is on
    log.debug("Possible entry: {}", stat);
    if (stat.isFile() && (includeEmptyFiles || stat.getLen() > 0)) {
      paths.add(stat.getPath());
    }
  }
  sortHistoryPaths(paths);
  return paths;
}
/**
 * Sort a list of history paths in place, using the ordering defined by
 * {@link NewerFilesFirst}.
 * @param paths paths to sort; the list itself is reordered
 */
@VisibleForTesting
public static void sortHistoryPaths(List<Path> paths) {
  NewerFilesFirst ordering = new NewerFilesFirst();
  Collections.sort(paths, ordering);
}
/**
 * Walk through the paths in list order, stopping at the first one from
 * which a history can be read. A path that fails to load is logged and
 * skipped.
 * @param fileSystem filesystem to read from
 * @param paths paths to load
 * @return the loaded history including the path -or null if all failed to load
 */
public LoadedRoleHistory attemptToReadHistory(FileSystem fileSystem,
    List<Path> paths)
    throws BadConfigException {
  LoadedRoleHistory loaded = null;
  for (Path candidate : paths) {
    boolean readCompleted = false;
    try {
      loaded = read(fileSystem, candidate);
      // the read worked: the scan ends here, whatever happens afterwards
      readCompleted = true;
      loaded.setPath(candidate);
    } catch (IOException e) {
      log.info("Failed to read {}", candidate, e);
    } catch (AvroTypeException e) {
      log.warn("Failed to parse {}", candidate, e);
    } catch (Exception e) {
      // low level event logged @ warn level
      log.warn("Exception while reading {}", candidate, e);
    }
    if (readCompleted) {
      break;
    }
  }
  return loaded;
}
/**
 * Load the history from a directory: the non-empty history files are
 * listed, then tried one by one in the order of that listing. A failure
 * to load a specific file is downgraded to a log and the next path is
 * attempted instead.
 * @param fs filesystem
 * @param dir dir to load from
 * @return the history loaded, including the path; null if nothing could
 * be loaded
 * @throws IOException if indexing the history directory fails.
 */
public LoadedRoleHistory loadFromHistoryDir(FileSystem fs, Path dir)
    throws IOException, BadConfigException {
  assert fs != null: "null filesystem";
  return attemptToReadHistory(fs, findAllHistoryEntries(fs, dir, false));
}
/**
 * Delete all old history entries older than the one we want to keep. This
 * uses the filename ordering to determine age, not timestamps: the
 * entries of the parent directory of <code>keep</code> (empty files
 * included) are sorted with {@link OlderFilesFirst} and deleted in that
 * order until <code>keep</code> is reached.
 * @param fileSystem filesystem
 * @param keep path to keep -used in thresholding the files
 * @return the number of files actually deleted; a file which the
 * filesystem reports as not deleted is logged and not counted
 * @throws FileNotFoundException if the path to keep is not present (safety
 * check to stop the entire dir being purged)
 * @throws IOException IO problems
 */
public int purgeOlderHistoryEntries(FileSystem fileSystem, Path keep)
    throws IOException {
  assert fileSystem != null : "null filesystem";
  if (!fileSystem.exists(keep)) {
    throw new FileNotFoundException(keep.toString());
  }
  Path dir = keep.getParent();
  log.debug("Purging entries in {} up to {}", dir, keep);
  List<Path> paths = findAllHistoryEntries(fileSystem, dir, true);
  Collections.sort(paths, new OlderFilesFirst());

  // Directory listings normally contain fully qualified paths, so a keep
  // path passed in without scheme/authority would never equal a listed
  // entry, and the file to keep would be purged with everything else.
  // Match against the qualified form as well as the path as supplied.
  Path qualifiedKeep = fileSystem.makeQualified(keep);
  int deleteCount = 0;
  for (Path path : paths) {
    if (path.equals(keep) || path.equals(qualifiedKeep)) {
      break;
    }
    log.debug("Deleting {}", path);
    // delete() reports a failure to delete by returning false:
    // only count the files which really went away
    if (fileSystem.delete(path, false)) {
      deleteCount++;
    } else {
      log.warn("Failed to delete {}", path);
    }
  }
  return deleteCount;
}
}