blob: b0924d3d6d6ffbaf5e65366583dfd34e702692bb [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.storage.tcp;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Paths;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.crail.conf.CrailConfiguration;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.storage.StorageResource;
import org.apache.crail.storage.StorageServer;
import org.apache.crail.storage.StorageUtils;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
import com.ibm.narpc.NaRPCServerChannel;
import com.ibm.narpc.NaRPCServerEndpoint;
import com.ibm.narpc.NaRPCServerGroup;
import com.ibm.narpc.NaRPCService;
public class TcpStorageServer implements Runnable, StorageServer, NaRPCService<TcpStorageRequest, TcpStorageResponse> {
private static final Logger LOG = CrailUtils.getLogger();
private NaRPCServerGroup<TcpStorageRequest, TcpStorageResponse> serverGroup;
private NaRPCServerEndpoint<TcpStorageRequest, TcpStorageResponse> serverEndpoint;
private InetSocketAddress address;
private boolean alive;
private long regions;
private long keys;
private ConcurrentHashMap<Integer, ByteBuffer> dataBuffers;
private String dataDirPath;
@Override
public void init(CrailConfiguration conf, String[] args) throws Exception {
TcpStorageConstants.init(conf, args);
this.serverGroup = new NaRPCServerGroup<TcpStorageRequest, TcpStorageResponse>(this, TcpStorageConstants.STORAGE_TCP_QUEUE_DEPTH, (int) CrailConstants.BLOCK_SIZE*2, false, TcpStorageConstants.STORAGE_TCP_CORES);
this.serverEndpoint = serverGroup.createServerEndpoint();
this.address = StorageUtils.getDataNodeAddress(TcpStorageConstants.STORAGE_TCP_INTERFACE, TcpStorageConstants.STORAGE_TCP_PORT);
serverEndpoint.bind(address);
this.alive = false;
this.regions = TcpStorageConstants.STORAGE_TCP_STORAGE_LIMIT/TcpStorageConstants.STORAGE_TCP_ALLOCATION_SIZE;
this.keys = 0;
this.dataBuffers = new ConcurrentHashMap<Integer, ByteBuffer>();
this.dataDirPath = StorageUtils.getDatanodeDirectory(TcpStorageConstants.STORAGE_TCP_DATA_PATH, address);
StorageUtils.clean(TcpStorageConstants.STORAGE_TCP_DATA_PATH, dataDirPath);
}
@Override
public void printConf(Logger logger) {
TcpStorageConstants.printConf(logger);
}
@Override
public StorageResource allocateResource() throws Exception {
StorageResource resource = null;
if (keys < regions){
int fileId = (int) keys++;
String dataFilePath = Paths.get(dataDirPath, Integer.toString(fileId)).toString();
RandomAccessFile dataFile = new RandomAccessFile(dataFilePath, "rw");
FileChannel dataChannel = dataFile.getChannel();
ByteBuffer buffer = dataChannel.map(MapMode.READ_WRITE, 0, TcpStorageConstants.STORAGE_TCP_ALLOCATION_SIZE);
dataBuffers.put(fileId, buffer);
dataFile.close();
dataChannel.close();
long address = CrailUtils.getAddress(buffer);
resource = StorageResource.createResource(address, buffer.capacity(), fileId);
}
return resource;
}
@Override
public InetSocketAddress getAddress() {
return address;
}
@Override
public boolean isAlive() {
return alive;
}
@Override
public void run() {
try {
LOG.info("running TCP storage server, address " + address);
this.alive = true;
while(true){
NaRPCServerChannel endpoint = serverEndpoint.accept();
LOG.info("new connection " + endpoint.address());
}
} catch(Exception e){
e.printStackTrace();
}
}
@Override
public TcpStorageRequest createRequest() {
return new TcpStorageRequest();
}
@Override
public TcpStorageResponse processRequest(TcpStorageRequest request) {
if (request.type() == TcpStorageProtocol.REQ_WRITE){
TcpStorageRequest.WriteRequest writeRequest = request.getWriteRequest();
ByteBuffer buffer = dataBuffers.get(writeRequest.getKey()).duplicate();
long offset = writeRequest.getAddress() - CrailUtils.getAddress(buffer);
// LOG.info("processing write request, key " + writeRequest.getKey() + ", address " + writeRequest.getAddress() + ", length " + writeRequest.length() + ", remaining " + writeRequest.getBuffer().remaining() + ", offset " + offset);
buffer.clear().position((int) offset);
buffer.put(writeRequest.getBuffer());
TcpStorageResponse.WriteResponse writeResponse = new TcpStorageResponse.WriteResponse(writeRequest.length());
return new TcpStorageResponse(writeResponse);
} else if (request.type() == TcpStorageProtocol.REQ_READ){
TcpStorageRequest.ReadRequest readRequest = request.getReadRequest();
ByteBuffer buffer = dataBuffers.get(readRequest.getKey()).duplicate();
long offset = readRequest.getAddress() - CrailUtils.getAddress(buffer);
// LOG.info("processing read request, address " + readRequest.getAddress() + ", length " + readRequest.length() + ", offset " + offset);
long limit = offset + readRequest.length();
buffer.clear().position((int) offset).limit((int) limit);
TcpStorageResponse.ReadResponse readResponse = new TcpStorageResponse.ReadResponse(buffer);
return new TcpStorageResponse(readResponse);
} else {
LOG.info("processing unknown request");
return new TcpStorageResponse(TcpStorageProtocol.RET_RPC_UNKNOWN);
}
}
@Override
public void addEndpoint(NaRPCServerChannel channel){
}
@Override
public void removeEndpoint(NaRPCServerChannel channel){
}
}