blob: 2802f0938bcb804053a4cc17b8dc9efd6275f52c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.namenode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.rpc.RpcErrors;
import org.apache.crail.rpc.RpcNameNodeService;
import org.apache.crail.rpc.RpcProtocol;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
public class LogService {
public static final Logger LOG = CrailUtils.getLogger();
private ConcurrentHashMap<Long, Long> tokens;
private FileOutputStream outStream;
private FileChannel outChannel;
private ByteBuffer header;
private ByteBuffer payload;
public LogService() throws IOException {
File file = new File(CrailConstants.NAMENODE_LOG);
if (!file.exists()){
file.createNewFile();
}
outStream = new FileOutputStream(CrailConstants.NAMENODE_LOG, true);
outChannel = outStream.getChannel();
header = ByteBuffer.allocate(4);
payload = ByteBuffer.allocate(512);
tokens = new ConcurrentHashMap<Long, Long>();
}
public void writeRecord(LogRecord record) throws IOException{
payload.clear();
record.write(payload);
payload.flip();
header.clear();
header.putInt(payload.remaining());
header.flip();
outChannel.write(header);
while(payload.hasRemaining()){
outChannel.write(payload);
}
}
public void replay(RpcNameNodeService service) throws Exception {
File file = new File(CrailConstants.NAMENODE_LOG);
if (!file.exists()){
return;
}
FileInputStream inStream = new FileInputStream(CrailConstants.NAMENODE_LOG);
FileChannel inChannel = inStream.getChannel();
LogRecord record = new LogRecord();
LogResponse response = new LogResponse();
header.clear();
int ret = inChannel.read(header);
while(ret > 0){
header.flip();
int size = header.getInt();
payload.clear().limit(size);
while(payload.hasRemaining()){
inChannel.read(payload);
}
payload.flip();
record.update(payload);
processServerEvent(service, record, response);
header.clear();
ret = inChannel.read(header);
}
inChannel.close();
inStream.close();
}
public void close() throws IOException{
outChannel.close();
outStream.close();
}
private void processServerEvent(RpcNameNodeService service, LogRecord record, LogResponse response) {
short error = RpcErrors.ERR_OK;
try {
switch(record.getCmd()) {
case RpcProtocol.CMD_CREATE_FILE:
error = service.createFile(record.createFile(), response.createFile(), response);
long fd = response.createFile().getFile().getFd();
long token = response.createFile().getFile().getToken();
tokens.put(response.createFile().getFile().getFd(), response.createFile().getFile().getToken());
tokens.put(response.createFile().getParent().getFd(), response.createFile().getParent().getToken());
break;
case RpcProtocol.CMD_SET_FILE:
record.setFile().getFileInfo().setToken(tokens.get(record.setFile().getFileInfo().getFd()));
error = service.setFile(record.setFile(), response.getVoid(), response);
break;
case RpcProtocol.CMD_REMOVE_FILE:
error = service.removeFile(record.removeFile(), response.delFile(), response);
break;
case RpcProtocol.CMD_RENAME_FILE:
error = service.renameFile(record.renameFile(), response.getRename(), response);
break;
case RpcProtocol.CMD_GET_BLOCK:
record.getBlock().setToken(tokens.get(record.getBlock().getFd()));
error = service.getBlock(record.getBlock(), response.getBlock(), response);
break;
case RpcProtocol.CMD_SET_BLOCK:
error = service.setBlock(record.setBlock(), response.getVoid(), response);
break;
default:
error = RpcErrors.ERR_INVALID_RPC_CMD;
LOG.info("Rpc command not valid, opcode " + record.getCmd());
}
} catch(Exception e){
error = RpcErrors.ERR_UNKNOWN;
LOG.info(RpcErrors.messages[RpcErrors.ERR_UNKNOWN] + e.getMessage());
e.printStackTrace();
}
}
}