blob: 93acb45754e9111bd758fbb86cd9eea551767594 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.NullInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
import java.io.IOException;
/**
* The {@link io.netty.channel.ChannelHandler} responsible for sending client requests and receiving server responses
* in the context of a distributed cache server.
*/
public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter {
/**
* The object used to buffer and interpret the service response byte stream.
*/
private InboundAdapter inboundAdapter = new NullInboundAdapter();
/**
* The synchronization construct used to signal the client application that the server response has been received.
*/
private ChannelPromise channelPromise;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
final ByteBuf byteBuf = (ByteBuf) msg;
try {
final byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
inboundAdapter.queue(bytes);
} finally {
byteBuf.release();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
inboundAdapter.dequeue();
if (inboundAdapter.isComplete() && !channelPromise.isSuccess()) {
channelPromise.setSuccess();
}
}
/**
* Perform a synchronous method call to the server. The server is expected to write
* a byte stream response to the channel, which may be deserialized into a Java object
* by the caller.
*
* The receipt of data outside the context of a call to this method is unexpected; the data is dropped.
*
* @param channel the network channel used to make the request
* @param outboundAdapter the request payload, which might be a method name, and [0..n] concatenated arguments
* @param inboundAdapter the business logic to deserialize the server response
*/
public void invoke(final Channel channel, final OutboundAdapter outboundAdapter, final InboundAdapter inboundAdapter) throws IOException {
final CacheClientHandshakeHandler handshakeHandler = channel.pipeline().get(CacheClientHandshakeHandler.class);
handshakeHandler.waitHandshakeComplete();
if (handshakeHandler.getVersionNegotiator().getVersion() < outboundAdapter.getMinimumVersion()) {
throw new UnsupportedOperationException("Remote cache server doesn't support protocol version " + outboundAdapter.getMinimumVersion());
}
this.inboundAdapter = inboundAdapter;
channelPromise = channel.newPromise();
channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes()));
channelPromise.awaitUninterruptibly();
this.inboundAdapter = new NullInboundAdapter();
}
}