blob: 77c183abe1a3596800e99361eb10cddbff291c6c [file] [log] [blame]
/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je.rep.util.ldiff;
import java.io.IOException;
import java.util.Iterator;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.util.ldiff.Protocol.DbBlocks;
import com.sleepycat.je.rep.util.ldiff.Protocol.EnvDiff;
import com.sleepycat.je.rep.util.ldiff.Protocol.RemoteDiffRequest;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.rep.utilint.BinaryProtocol.Message;
import com.sleepycat.je.rep.utilint.BinaryProtocol.ProtocolException;
import com.sleepycat.je.rep.utilint.ServiceDispatcher.ExecutingService;
/**
* Implementation of the LDiff service that process requests from the LDiff
* client. It's the network level interface to the remote processing done as
* part of the ldiff implementation.
*
* Note that the service only processes one request at a time, so as not to
* overload the node.
*/
public class LDiffService extends ExecutingService {
/* The service name. */
public static final String NAME = "LDiff";
/*
* Determines whether the service is busy and will accept a new
* request.
*/
private final AtomicBoolean busy = new AtomicBoolean(false);
private final RepImpl repImpl;
private final ServiceDispatcher dispatcher;
public LDiffService(ServiceDispatcher dispatcher, RepImpl repImpl) {
super(NAME, dispatcher);
this.repImpl = repImpl;
this.dispatcher = dispatcher;
dispatcher.register(this);
}
public void shutdown() {
dispatcher.cancel(NAME);
}
/*
* Returns busy if we are already processing a request.
*/
@Override
public boolean isBusy() {
return busy.get();
}
@Override
public Runnable getRunnable(DataChannel dataChannel) {
if (!busy.compareAndSet(false, true)) {
throw EnvironmentFailureException.unexpectedState
("Service is already busy");
}
return new LDiffServiceRunnable(dataChannel);
}
class LDiffServiceRunnable implements Runnable {
final DataChannel channel;
private ReplicatedEnvironment env;
private DatabaseConfig dbConfig = new DatabaseConfig();
LDiffServiceRunnable(DataChannel dataChannel) {
this.channel = dataChannel;
dbConfig.setReadOnly(true);
dbConfig.setAllowCreate(false);
}
public void runLDiff(DbBlocks request, Protocol protocol)
throws IOException {
Database db = null;
Cursor cursor = null;
try{
db = env.openDatabase
(null, request.getDbName(), dbConfig);
protocol.write(protocol.new BlockListStart(), channel);
LDiffConfig cfg = new LDiffConfig();
cfg.setBlockSize(request.getBlockSize());
LDiff ldf = new LDiff(cfg);
/* Use the Iterator to stream the blocks across the wire. */
Iterator<Block> blocks = ldf.iterator(db);
while (blocks.hasNext()) {
protocol.write(protocol.new BlockInfo(blocks.next()),
channel);
}
protocol.write(protocol.new BlockListEnd(), channel);
/* Start to do the record difference analysis. */
Message msg = protocol.read(channel);
if (msg.getOp() == Protocol.REMOTE_DIFF_REQUEST) {
cursor = db.openCursor(null, null);
sendDiffArea(cursor, (RemoteDiffRequest) msg, protocol);
runDiffAnalysis(cursor, protocol);
} else if (msg.getOp() != Protocol.DONE) {
protocol.write(protocol.new ProtocolError
("Invalid message: " + msg), channel);
}
} catch (DatabaseNotFoundException e) {
protocol.write(protocol.new DbMismatch(e.getMessage()),
channel);
} finally {
if (cursor != null) {
cursor.close();
}
if (db != null) {
db.close();
}
}
}
/* Get records for all different areas and send out. */
private void runDiffAnalysis(Cursor cursor,
Protocol protocol)
throws IOException {
while (true) {
Message msg = protocol.read(channel);
if (msg.getOp() == Protocol.REMOTE_DIFF_REQUEST) {
sendDiffArea(cursor, (RemoteDiffRequest) msg, protocol);
} else {
if (msg.getOp() != Protocol.DONE) {
protocol.write(protocol.new ProtocolError
("Invalid message: " + msg), channel);
}
break;
}
}
}
/* Send the different records of an area to the requested machine. */
private void sendDiffArea(Cursor cursor,
RemoteDiffRequest request,
Protocol protocol)
throws IOException {
/* Get the records in the different area. */
HashSet<Record> records = null;
try {
records = DiffRecordAnalyzer.getDiffArea(cursor, request);
} catch (Exception e) {
protocol.write(protocol.new Error(e.getMessage()), channel);
throw new LDiffRecordRequestException(e.getMessage());
}
/* Write them out to the requested machine. */
protocol.write(protocol.new DiffAreaStart(), channel);
for (Record record: records) {
protocol.write(protocol.new RemoteRecord(record), channel);
}
protocol.write(protocol.new DiffAreaEnd(), channel);
}
public void runEnvDiff(EnvDiff request, Protocol protocol)
throws IOException {
protocol.write(protocol.new EnvInfo
(env.getDatabaseNames().size()), channel);
}
@Override
public void run() {
final Protocol protocol;
ensureChannelBlocking(channel);
try {
env = repImpl.makeEnvironment();
protocol = new Protocol(new NameIdPair("Ldiff", -1), repImpl);
try {
Message msg = protocol.read(channel);
if (msg.getOp() == Protocol.DB_BLOCKS) {
runLDiff((DbBlocks)msg, protocol);
} else if (msg.getOp() == Protocol.ENV_DIFF) {
runEnvDiff((EnvDiff)msg, protocol);
}
} catch (ProtocolException e) {
/* Unexpected message. */
protocol.write
(protocol.new ProtocolError(e.getMessage()),
channel);
} finally {
if (channel.isOpen()) {
channel.close();
}
}
} catch (IOException e) {
/*
* Channel has already been closed, or the close itself
* failed.
*/
} finally {
if (env != null) {
env.close();
}
if (!busy.compareAndSet(true, false)) {
throw EnvironmentFailureException.unexpectedState
("Service is not busy");
}
}
}
}
}