blob: 9fec15fa6bc0e3a0c5cb62c7b160be9a90b7094b [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.proto;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
public class BookieRequestProcessor implements RequestProcessor {
private final static Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
/**
* The server configuration. We use this for getting the number of add and read
* worker threads.
*/
private final ServerConfiguration serverCfg;
/**
* This is the Bookie instance that is used to handle all read and write requests.
*/
final Bookie bookie;
/**
* The threadpool used to execute all read entry requests issued to this server.
*/
private final ExecutorService readThreadPool;
/**
* The threadpool used to execute all add entry requests issued to this server.
*/
private final ExecutorService writeThreadPool;
// Expose Stats
private final BKStats bkStats = BKStats.getInstance();
private final boolean statsEnabled;
final OpStatsLogger addRequestStats;
final OpStatsLogger addEntryStats;
final OpStatsLogger readRequestStats;
final OpStatsLogger readEntryStats;
public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
StatsLogger statsLogger) {
this.serverCfg = serverCfg;
this.bookie = bookie;
this.readThreadPool =
createExecutor(this.serverCfg.getNumReadWorkerThreads(),
"BookieReadThread-" + serverCfg.getBookiePort() + "-%d");
this.writeThreadPool =
createExecutor(this.serverCfg.getNumAddWorkerThreads(),
"BookieWriteThread-" + serverCfg.getBookiePort() + "-%d");
// Expose Stats
this.statsEnabled = serverCfg.isStatisticsEnabled();
this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
}
@Override
public void close() {
shutdownExecutor(writeThreadPool);
shutdownExecutor(readThreadPool);
}
private ExecutorService createExecutor(int numThreads, String nameFormat) {
if (numThreads <= 0) {
return null;
} else {
return Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat(nameFormat).build());
}
}
private void shutdownExecutor(ExecutorService service) {
if (null != service) {
service.shutdown();
}
}
@Override
public void processRequest(Object msg, Channel c) {
// If we can decode this packet as a Request protobuf packet, process
// it as a version 3 packet. Else, just use the old protocol.
if (msg instanceof BookkeeperProtocol.Request) {
BookkeeperProtocol.Request r = (BookkeeperProtocol.Request) msg;
BookkeeperProtocol.BKPacketHeader header = r.getHeader();
switch (header.getOperation()) {
case ADD_ENTRY:
processAddRequestV3(r, c);
break;
case READ_ENTRY:
processReadRequestV3(r, c);
break;
default:
BookkeeperProtocol.Response.Builder response =
BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
c.write(response.build());
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
break;
}
} else {
BookieProtocol.Request r = (BookieProtocol.Request) msg;
// process packet
switch (r.getOpCode()) {
case BookieProtocol.ADDENTRY:
processAddRequest(r, c);
break;
case BookieProtocol.READENTRY:
processReadRequest(r, c);
break;
default:
LOG.error("Unknown op type {}, sending error", r.getOpCode());
c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
break;
}
}
}
private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
if (null == writeThreadPool) {
write.run();
} else {
writeThreadPool.submit(write);
}
}
private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, this);
if (null == readThreadPool) {
read.run();
} else {
readThreadPool.submit(read);
}
}
private void processAddRequest(final BookieProtocol.Request r, final Channel c) {
WriteEntryProcessor write = new WriteEntryProcessor(r, c, this);
if (null == writeThreadPool) {
write.run();
} else {
writeThreadPool.submit(write);
}
}
private void processReadRequest(final BookieProtocol.Request r, final Channel c) {
ReadEntryProcessor read = new ReadEntryProcessor(r, c, this);
if (null == readThreadPool) {
read.run();
} else {
readThreadPool.submit(read);
}
}
}