blob: d24fe2f57db2b2ceab789605d4f5be36d0721b69 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.core;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.crail.CrailNode;
import org.apache.crail.CrailNodeType;
import org.apache.crail.Upcoming;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.rpc.RpcCreateFile;
import org.apache.crail.rpc.RpcDeleteFile;
import org.apache.crail.rpc.RpcErrors;
import org.apache.crail.rpc.RpcGetFile;
import org.apache.crail.rpc.RpcRenameFile;
import org.apache.crail.rpc.RpcVoid;
public abstract class CoreMetaDataOperation<R,T> implements Upcoming<T> {
protected static int RPC_PENDING = 0;
protected static int RPC_DONE = 1;
protected static int RPC_ERROR = 2;
private AtomicInteger status;
protected Future<R> rpcResult;
private T finalResult;
private Exception exception;
abstract T process(R tmp) throws Exception;
public CoreMetaDataOperation(Future<R> result){
this.rpcResult = result;
this.finalResult = null;
this.status = new AtomicInteger(RPC_PENDING);
this.exception = null;
}
@Override
public boolean isDone() {
if (status.get() == RPC_PENDING){
try {
if (rpcResult.isDone()){
R tmp = rpcResult.get();
finalResult = process(tmp);
status.set(RPC_DONE);
}
} catch (Exception e) {
status.set(RPC_ERROR);
this.exception = e;
}
}
return status.get() > 0;
}
@Override
public T get() throws InterruptedException, ExecutionException {
if (this.exception != null){
throw new ExecutionException(exception);
}
if (status.get() == RPC_PENDING){
try {
R tmp = rpcResult.get();
finalResult = process(tmp);
status.set(RPC_DONE);
} catch (Exception e) {
status.set(RPC_ERROR);
this.exception = e;
}
}
if (status.get() == RPC_DONE){
return finalResult;
} else if (status.get() == RPC_PENDING){
throw new InterruptedException("RPC timeout");
} else if (exception != null) {
throw new ExecutionException(exception);
} else {
throw new InterruptedException("RPC error");
}
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
if (this.exception != null){
throw new ExecutionException(exception);
}
if (status.get() == RPC_PENDING){
try {
R tmp = rpcResult.get(CrailConstants.DATA_TIMEOUT, TimeUnit.MILLISECONDS);
finalResult = process(tmp);
status.set(RPC_DONE);
} catch (Exception e) {
status.set(RPC_ERROR);
this.exception = e;
}
}
if (status.get() == RPC_DONE){
return finalResult;
} else if (status.get() == RPC_PENDING){
throw new InterruptedException("RPC timeout");
} else if (exception != null) {
throw new ExecutionException(exception);
} else {
throw new InterruptedException("RPC error");
}
}
public T early() throws Exception {
return this.get();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
}
class CreateNodeFuture extends CoreMetaDataOperation<RpcCreateFile, CrailNode> {
private CoreDataStore fs;
private String path;
private CrailNodeType type;
public CreateNodeFuture(CoreDataStore fs, String path, CrailNodeType type, Future<RpcCreateFile> fileRes) {
super(fileRes);
this.fs = fs;
this.path = path;
this.type = type;
}
@Override
CrailNode process(RpcCreateFile response) throws Exception {
return fs._createNode(path, type, response);
}
@Override
public CrailNode early() throws Exception {
switch(type){
case DATAFILE:
return new CoreEarlyFile(fs, path, type, this);
case DIRECTORY:
case MULTIFILE:
return null;
default:
return super.early();
}
}
}
class LookupNodeFuture extends CoreMetaDataOperation<RpcGetFile, CrailNode> {
private String path;
private CoreDataStore fs;
public LookupNodeFuture(CoreDataStore fs, String path, Future<RpcGetFile> fileRes) {
super(fileRes);
this.fs = fs;
this.path = path;
}
@Override
CrailNode process(RpcGetFile tmp) throws Exception {
return fs._lookupNode(tmp, path);
}
}
class DeleteNodeFuture extends CoreMetaDataOperation<RpcDeleteFile, CrailNode> {
private String path;
private boolean recursive;
private CoreDataStore fs;
public DeleteNodeFuture(CoreDataStore fs, String path, boolean recursive, Future<RpcDeleteFile> fileRes) {
super(fileRes);
this.fs = fs;
this.path = path;
this.recursive = recursive;
}
@Override
CrailNode process(RpcDeleteFile tmp) throws Exception {
return fs._delete(tmp, path, recursive);
}
}
class RenameNodeFuture extends CoreMetaDataOperation<RpcRenameFile, CrailNode> {
private String src;
private String dst;
private CoreDataStore fs;
public RenameNodeFuture(CoreDataStore fs, String src, String dst, Future<RpcRenameFile> fileRes) {
super(fileRes);
this.fs = fs;
this.src = src;
this.dst = dst;
}
@Override
CrailNode process(RpcRenameFile tmp) throws Exception {
return fs._rename(tmp, src, dst);
}
}
class SyncNodeFuture extends CoreMetaDataOperation<RpcVoid, Void> {
public SyncNodeFuture(Future<RpcVoid> fileRes) {
super(fileRes);
}
@Override
Void process(RpcVoid tmp) throws Exception {
if (tmp.getError() != RpcErrors.ERR_OK){
throw new Exception("sync: " + RpcErrors.messages[tmp.getError()]);
}
return null;
}
}
class NoOperation implements Future<Void> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
return null;
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return null;
}
}