blob: 3551aa20c6140a1f34e53eca57c3dd2d382674b0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.segment.standby.server;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.compression.SnappyFrameEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobResponseEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadResponseEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesResponseEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentResponseEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.RequestDecoder;
import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class StandbyServer implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(StandbyServer.class);
/**
* If a persisted head state cannot be acquired in less than this timeout,
* the 'get head' request from the client will be discarded.
*/
static final long READ_HEAD_TIMEOUT =
Long.getLong("standby.server.timeout", 10_000L);
static Builder builder(int port, StoreProvider provider, int blobChunkSize) {
return new Builder(port, provider, blobChunkSize);
}
private final int port;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final ServerBootstrap b;
private SslContext sslContext;
private ChannelFuture channelFuture;
static class Builder {
private final int port;
private final StoreProvider storeProvider;
private final int blobChunkSize;
private boolean secure;
private String[] allowedClientIPRanges;
private StateConsumer stateConsumer;
private CommunicationObserver observer;
private Builder(final int port, final StoreProvider storeProvider, final int blobChunkSize) {
this.port = port;
this.storeProvider = storeProvider;
this.blobChunkSize = blobChunkSize;
}
Builder secure(boolean secure) {
this.secure = secure;
return this;
}
Builder allowIPRanges(String[] allowedClientIPRanges) {
this.allowedClientIPRanges = allowedClientIPRanges;
return this;
}
Builder withStateConsumer(StateConsumer stateConsumer) {
this.stateConsumer = stateConsumer;
return this;
}
Builder withObserver(CommunicationObserver observer) {
this.observer = observer;
return this;
}
StandbyServer build() throws CertificateException, SSLException {
return new StandbyServer(this);
}
}
private StandbyServer(final Builder builder) throws CertificateException, SSLException {
this.port = builder.port;
if (builder.secure) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
}
bossGroup = new NioEventLoopGroup(1, new NamedThreadFactory("primary-run"));
workerGroup = new NioEventLoopGroup(0, new NamedThreadFactory("primary"));
b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.option(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.TCP_NODELAY, true);
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ClientFilterHandler(new ClientIpFilter(builder.allowedClientIPRanges)));
if (sslContext != null) {
p.addLast("ssl", sslContext.newHandler(ch.alloc()));
}
// Decoders
p.addLast(new LineBasedFrameDecoder(8192));
p.addLast(new StringDecoder(CharsetUtil.UTF_8));
p.addLast(new RequestDecoder());
p.addLast(new StateHandler(builder.stateConsumer));
p.addLast(new RequestObserverHandler(builder.observer));
// Snappy Encoder
p.addLast(new SnappyFrameEncoder());
// Use chunking transparently
p.addLast(new ChunkedWriteHandler());
// Other Encoders
p.addLast(new GetHeadResponseEncoder());
p.addLast(new GetSegmentResponseEncoder());
p.addLast(new GetBlobResponseEncoder(builder.blobChunkSize));
p.addLast(new GetReferencesResponseEncoder());
p.addLast(new ResponseObserverHandler(builder.observer));
// Handlers
FileStore store = builder.storeProvider.provideStore();
p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store, READ_HEAD_TIMEOUT)));
p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(store)));
p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(store.getBlobStore())));
p.addLast(new GetReferencesRequestHandler(new DefaultStandbyReferencesReader(store)));
// Exception handler
p.addLast(new ExceptionHandler());
}
});
}
public void start() {
channelFuture = b.bind(port);
if (channelFuture.awaitUninterruptibly(1, TimeUnit.SECONDS)) {
onTimelyConnect();
} else {
onConnectTimeOut();
}
}
public void stop() {
if (channelFuture == null) {
return;
}
if (channelFuture.channel().disconnect().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
log.debug("Channel disconnected");
} else {
log.debug("Channel disconnect timed out");
}
}
@Override
public void close() {
stop();
if (shutDown(bossGroup)) {
log.debug("Boss group shut down");
} else {
log.debug("Boss group shutdown timed out");
}
if (shutDown(workerGroup)) {
log.debug("Worker group shut down");
} else {
log.debug("Worker group shutdown timed out");
}
}
private static boolean shutDown(EventLoopGroup group) {
return group.shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(10, TimeUnit.SECONDS);
}
private void onTimelyConnect() {
if (channelFuture.isSuccess()) {
log.debug("Binding was successful");
}
if (channelFuture.cause() != null) {
throw new RuntimeException(channelFuture.cause());
}
}
private void onConnectTimeOut() {
log.debug("Binding timed out, canceling");
channelFuture.cancel(true);
}
}