blob: 6b726b283a96542235ac82302fac75d4414d1cfb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tso;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.EnsurePath;
import org.apache.omid.tso.TSOStateManager.TSOState;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Encompasses all the required elements to control the leases required for
 * identifying the master instance when running multiple TSO instances for HA.
 * It delegates the initialization of the TSO state and the publication of
 * the instance information (after getting the lease) to an asynchronous task,
 * so that lease management can continue without interruptions.
 */
class LeaseManager extends AbstractScheduledService implements LeaseManagement {

    private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class);

    private final CuratorFramework zkClient;

    private final Panicker panicker;

    private final String tsoHostAndPort;

    private final TSOStateManager stateManager;

    // Single worker thread on which TSO state initialization / connection handling runs, keeping the
    // lease-management iterations free of that work. Tasks route their failures to this handler explicitly
    // (see tryToGetInitialLeasePeriod), because submit() would otherwise capture them in a Future.
    private final ExecutorService tsoStateInitializer = Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder()
                    .setNameFormat("tso-state-initializer")
                    .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                        @Override
                        public void uncaughtException(Thread t, Throwable e) {
                            panicker.panic(t + " threw exception", e);
                        }
                    })
                    .build());

    private final long leasePeriodInMs;

    private final TSOChannelHandler tsoChannelHandler;

    // Last known version of the lease ZNode; used as the expected version of the conditional lease write
    private int leaseNodeVersion;

    // Wall-clock time (ms) at which the currently held lease ends; 0 when no lease is held
    private final AtomicLong endLeaseInMs = new AtomicLong(0L);

    // Wall-clock time (ms) captured just before the last lease acquisition/renewal attempt
    private final AtomicLong baseTimeInMs = new AtomicLong(0L);

    private final String leasePath;

    private final String currentTSOPath;

    /**
     * Creates a lease manager. No ZooKeeper interaction happens until {@link #startService()} is called.
     *
     * @param tsoHostAndPort    identifier of this TSO instance, written to the lease and current-TSO ZNodes
     * @param tsoChannelHandler channel handler reconnected when becoming master and closed otherwise
     * @param stateManager      used to initialize the TSO state after acquiring the lease
     * @param leasePeriodInMs   duration of each lease period in milliseconds
     * @param leasePath         ZooKeeper path of the lease ZNode
     * @param currentTSOPath    ZooKeeper path where the master advertises itself as {@code host:port#epoch}
     * @param zkClient          Curator client used for all ZooKeeper operations
     * @param panicker          invoked when the lease is lost/expired or asynchronous initialization fails
     */
    LeaseManager(String tsoHostAndPort,
                 TSOChannelHandler tsoChannelHandler,
                 TSOStateManager stateManager,
                 long leasePeriodInMs,
                 String leasePath,
                 String currentTSOPath,
                 CuratorFramework zkClient,
                 Panicker panicker) {

        this.tsoHostAndPort = tsoHostAndPort;
        this.tsoChannelHandler = tsoChannelHandler;
        this.stateManager = stateManager;
        this.leasePeriodInMs = leasePeriodInMs;
        this.leasePath = leasePath;
        this.currentTSOPath = currentTSOPath;
        this.zkClient = zkClient;
        this.panicker = panicker;
        // Use the field directly rather than the overridable toString() while still constructing
        LOG.info("LeaseManager {} initialized. Lease period {}ms", tsoHostAndPort, leasePeriodInMs);

    }

    // ----------------------------------------------------------------------------------------------------------------
    // LeaseManagement implementation
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Ensures the lease and current-TSO ZNodes exist and then starts the scheduled lease management,
     * blocking until the service is running.
     *
     * @throws LeaseManagementException if either ZNode cannot be ensured
     */
    @Override
    public void startService() throws LeaseManagementException {
        createLeaseManagementZNode();
        createCurrentTSOZNode();
        startAndWait();
    }

    /**
     * Stops the scheduled lease management, blocking until the service has terminated.
     */
    @Override
    public void stopService() throws LeaseManagementException {
        stopAndWait();
    }

    /**
     * @return {@code true} while the current wall-clock time has not passed the end of the held lease
     */
    @Override
    public boolean stillInLeasePeriod() {
        return System.currentTimeMillis() <= getEndLeaseInMs();
    }

    // ----------------------------------------------------------------------------------------------------------------
    // End LeaseManagement implementation
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Tries to become master by conditionally writing the lease ZNode.
     *
     * <p>On success the end of lease is set to the base time plus the lease period, and TSO state
     * initialization, advertisement through ZooKeeper and channel reconnection are run asynchronously
     * (failures are forwarded to the panicker). Otherwise the channel connection is closed asynchronously.
     *
     * @throws Exception on ZooKeeper errors other than a version mismatch
     */
    void tryToGetInitialLeasePeriod() throws Exception {
        baseTimeInMs.set(System.currentTimeMillis());
        if (canAcquireLease()) {
            endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
            LOG.info("{} got the lease (Master) Ver. {}/End of lease: {}ms", tsoHostAndPort,
                     leaseNodeVersion, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(endLeaseInMs.get()));
            tsoStateInitializer.submit(new Runnable() {
                // TSO State initialization
                @Override
                public void run() {
                    try {
                        TSOState newTSOState = stateManager.initialize();
                        advertiseTSOServerInfoThroughZK(newTSOState.getEpoch());
                        tsoChannelHandler.reconnect();
                    } catch (Exception e) {
                        // submit() would swallow the exception in the Future; forward it to the panicker
                        Thread t = Thread.currentThread();
                        t.getUncaughtExceptionHandler().uncaughtException(t, e);
                    }
                }
            });
        } else {
            tsoStateInitializer.submit(new Runnable() {
                @Override
                public void run() {
                    // In case the TSO was paused, close the connection
                    try {
                        tsoChannelHandler.closeConnection();
                    } catch (RuntimeException e) {
                        // Otherwise this failure would be silently captured in the discarded Future
                        LOG.error("{} failed closing connection after not getting the lease", tsoHostAndPort, e);
                    }
                }
            });
        }
    }

    /**
     * Renews the lease held by this instance by conditionally writing the lease ZNode.
     *
     * <p>Invokes the panicker (and resets the end of lease to 0) if another instance has written the lease
     * ZNode in the meantime, or if the write succeeded but the current lease had already expired.
     *
     * @throws Exception on ZooKeeper errors other than a version mismatch
     */
    void tryToRenewLeasePeriod() throws Exception {
        baseTimeInMs.set(System.currentTimeMillis());
        if (canAcquireLease()) {
            if (System.currentTimeMillis() > getEndLeaseInMs()) {
                endLeaseInMs.set(0L);
                panicker.panic(tsoHostAndPort + " expired lease! Master is committing suicide");
            } else {
                endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
                LOG.trace("{} renewed lease: Version {}/End of lease at {}ms",
                          tsoHostAndPort, leaseNodeVersion, endLeaseInMs.get());
            }
        } else {
            endLeaseInMs.set(0L);
            panicker.panic(tsoHostAndPort + " lease lost (Ver. " + leaseNodeVersion + ")! Other instance is Master. Committing suicide...");
        }
    }

    private boolean haveLease() {
        return stillInLeasePeriod();
    }

    private long getEndLeaseInMs() {
        return endLeaseInMs.get();
    }

    /**
     * Writes this instance's identifier to the lease ZNode, expecting its version to be {@code leaseNodeVersion}.
     * On success the new ZNode version is remembered for the next attempt.
     *
     * @return {@code true} if the write succeeded, {@code false} on a version mismatch
     * @throws Exception on any other ZooKeeper error
     */
    private boolean canAcquireLease() throws Exception {
        final byte[] instanceInfo = tsoHostAndPort.getBytes(Charsets.UTF_8);
        try {
            // Conditional write: rejected with BadVersionException if the ZNode version changed since last read
            Stat stat = zkClient.setData().withVersion(leaseNodeVersion).forPath(leasePath, instanceInfo);
            leaseNodeVersion = stat.getVersion();
            LOG.trace("{} got new lease version {}", tsoHostAndPort, leaseNodeVersion);
            return true;
        } catch (KeeperException.BadVersionException e) {
            return false;
        }
    }

    // ----------------------------------------------------------------------------------------------------------------
    // AbstractScheduledService implementation
    // ----------------------------------------------------------------------------------------------------------------

    @Override
    protected void startUp() {
    }

    /**
     * Closes the channel handler and releases the TSO state initializer thread. Already submitted
     * initializer tasks are still allowed to complete.
     */
    @Override
    protected void shutDown() {
        try {
            tsoChannelHandler.close();
            LOG.info("Channel handler closed");
        } catch (IOException e) {
            LOG.error("Error closing TSOChannelHandler", e);
        } finally {
            // Without this the initializer thread would outlive the stopped service
            tsoStateInitializer.shutdown();
        }
    }

    @Override
    protected void runOneIteration() throws Exception {

        if (!haveLease()) {
            tryToGetInitialLeasePeriod();
        } else {
            tryToRenewLeasePeriod();
        }

    }

    /**
     * Without the lease: re-reads the lease ZNode version and waits a full lease period before the next attempt.
     * With the lease: schedules the renewal a guard period (a quarter of the lease period) before the lease ends.
     */
    @Override
    protected Scheduler scheduler() {

        final long guardLeasePeriodInMs = leasePeriodInMs / 4;

        return new AbstractScheduledService.CustomScheduler() {

            @Override
            protected Schedule getNextSchedule() throws Exception {
                if (!haveLease()) {
                    // Get the current node version...
                    Stat stat = zkClient.checkExists().forPath(leasePath);
                    Preconditions.checkState(stat != null, "Lease ZNode %s does not exist", leasePath);
                    leaseNodeVersion = stat.getVersion();
                    LOG.trace("{} will try to get lease (with Ver. {}) in {}ms", tsoHostAndPort, leaseNodeVersion,
                              leasePeriodInMs);
                    // ...and wait the lease period
                    return new Schedule(leasePeriodInMs, TimeUnit.MILLISECONDS);
                } else {
                    long waitTimeInMs = getEndLeaseInMs() - System.currentTimeMillis() - guardLeasePeriodInMs;
                    LOG.trace("{} will try to renew lease (with Ver. {}) in {}ms", tsoHostAndPort,
                              leaseNodeVersion, waitTimeInMs);
                    return new Schedule(waitTimeInMs, TimeUnit.MILLISECONDS);
                }
            }
        };

    }

    // ----------------------------------------------------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------------------------------------------------

    @Override
    public String toString() {
        return tsoHostAndPort;
    }

    private void createLeaseManagementZNode() throws LeaseManagementException {
        try {
            validateZKPath(leasePath);
        } catch (Exception e) {
            throw new LeaseManagementException("Error creating Lease Management ZNode", e);
        }
    }

    private void createCurrentTSOZNode() throws LeaseManagementException {
        try {
            validateZKPath(currentTSOPath);
        } catch (Exception e) {
            throw new LeaseManagementException("Error creating TSO ZNode", e);
        }
    }

    /**
     * Ensures {@code zkPath} exists (creating it if needed) and checks that it is now present.
     *
     * @throws Exception on ZooKeeper errors, or NullPointerException if the path is still missing
     */
    private void validateZKPath(String zkPath) throws Exception {
        EnsurePath path = zkClient.newNamespaceAwareEnsurePath(zkPath);
        path.ensure(zkClient.getZookeeperClient());
        Stat stat = zkClient.checkExists().forPath(zkPath);
        Preconditions.checkNotNull(stat, "ZNode %s does not exist after ensuring it", zkPath);
        LOG.info("Path {} ensured", path.getPath());
    }

    /**
     * Publishes {@code host:port#epoch} in the current-TSO ZNode.
     *
     * <p>If the ZNode already holds an entry, it must have the {@code host#epoch} shape and an epoch not
     * greater than {@code epoch}. The write is conditional on the ZNode version read here, so it fails if
     * another writer changed the ZNode in between.
     *
     * @throws LeaseManagementException if a previous entry has a greater epoch
     * @throws IllegalArgumentException if a previous entry is malformed
     * @throws Exception                on ZooKeeper errors (including a version mismatch on the write)
     */
    private void advertiseTSOServerInfoThroughZK(long epoch) throws Exception {

        Stat previousTSOZNodeStat = new Stat();
        byte[] previousTSOInfoAsBytes = zkClient.getData().storingStatIn(previousTSOZNodeStat).forPath(currentTSOPath);
        String previousTSOInfo = (previousTSOInfoAsBytes == null)
                ? ""
                : new String(previousTSOInfoAsBytes, Charsets.UTF_8);
        if (!previousTSOInfo.isEmpty()) {
            String[] previousTSOAndEpochArray = previousTSOInfo.split("#");
            Preconditions.checkArgument(previousTSOAndEpochArray.length == 2,
                                        "Incorrect TSO Info found: %s", previousTSOInfo);
            long oldEpoch = Long.parseLong(previousTSOAndEpochArray[1]);
            if (oldEpoch > epoch) {
                throw new LeaseManagementException("Another TSO replica was found " + previousTSOInfo);
            }
        }
        String tsoInfoAsString = tsoHostAndPort + "#" + Long.toString(epoch);
        byte[] tsoInfoAsBytes = tsoInfoAsString.getBytes(Charsets.UTF_8);
        zkClient.setData().withVersion(previousTSOZNodeStat.getVersion()).forPath(currentTSOPath, tsoInfoAsBytes);
        LOG.info("TSO instance {} (Epoch {}) advertised through ZK", tsoHostAndPort, epoch);

    }

}