blob: 4e53c72bec8a8b2aabb782562175f8db0cb8af83 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.CREATE3Request;
import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.oncrpc.RegistrationClient;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcReply;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.SimpleTcpClient;
import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
public class TestOutOfOrderWrite {
public final static Logger LOG =
LoggerFactory.getLogger(TestOutOfOrderWrite.class);
static FileHandle handle = null;
static Channel channel;
static byte[] data1 = new byte[1000];
static byte[] data2 = new byte[1000];
static byte[] data3 = new byte[1000];
static XDR create() {
XDR request = new XDR();
RpcCall.getInstance(0x8000004c, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION,
Nfs3Constant.NFSPROC3.CREATE.getValue(), new CredentialsNone(),
new VerifierNone()).write(request);
SetAttr3 objAttr = new SetAttr3();
CREATE3Request createReq = new CREATE3Request(new FileHandle("/"),
"out-of-order-write" + System.currentTimeMillis(), 0, objAttr, 0);
createReq.serialize(request);
return request;
}
static XDR write(FileHandle handle, int xid, long offset, int count,
byte[] data) {
XDR request = new XDR();
RpcCall.getInstance(xid, Nfs3Constant.PROGRAM, Nfs3Constant.VERSION,
Nfs3Constant.NFSPROC3.CREATE.getValue(), new CredentialsNone(),
new VerifierNone()).write(request);
WRITE3Request write1 = new WRITE3Request(handle, offset, count,
WriteStableHow.UNSTABLE, ByteBuffer.wrap(data));
write1.serialize(request);
return request;
}
static void testRequest(XDR request) {
RegistrationClient registrationClient = new RegistrationClient("localhost",
Nfs3Constant.SUN_RPCBIND, request);
registrationClient.run();
}
static class WriteHandler extends SimpleTcpClientHandler {
public WriteHandler(XDR request) {
super(request);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Get handle from create response
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
XDR rsp = new XDR(buf.array());
if (rsp.getBytes().length == 0) {
LOG.info("rsp length is zero, why?");
return;
}
LOG.info("rsp length=" + rsp.getBytes().length);
RpcReply reply = RpcReply.read(rsp);
int xid = reply.getXid();
// Only process the create response
if (xid != 0x8000004c) {
return;
}
int status = rsp.readInt();
if (status != Nfs3Status.NFS3_OK) {
LOG.error("Create failed, status =" + status);
return;
}
LOG.info("Create succeeded");
rsp.readBoolean(); // value follow
handle = new FileHandle();
handle.deserialize(rsp);
channel = e.getChannel();
}
}
static class WriteClient extends SimpleTcpClient {
public WriteClient(String host, int port, XDR request, Boolean oneShot) {
super(host, port, request, oneShot);
}
@Override
protected ChannelPipelineFactory setPipelineFactory() {
this.pipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
new WriteHandler(request));
}
};
return this.pipelineFactory;
}
}
public static void main(String[] args) throws InterruptedException {
Arrays.fill(data1, (byte) 7);
Arrays.fill(data2, (byte) 8);
Arrays.fill(data3, (byte) 9);
// NFS3 Create request
NfsConfiguration conf = new NfsConfiguration();
WriteClient client = new WriteClient("localhost", conf.getInt(
NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), create(), false);
client.run();
while (handle == null) {
Thread.sleep(1000);
System.out.println("handle is still null...");
}
LOG.info("Send write1 request");
XDR writeReq;
writeReq = write(handle, 0x8000005c, 2000, 1000, data3);
Nfs3Utils.writeChannel(channel, writeReq, 1);
writeReq = write(handle, 0x8000005d, 1000, 1000, data2);
Nfs3Utils.writeChannel(channel, writeReq, 2);
writeReq = write(handle, 0x8000005e, 0, 1000, data1);
Nfs3Utils.writeChannel(channel, writeReq, 3);
// TODO: convert to Junit test, and validate result automatically
}
}