blob: c93f1d0b86e8915d63f29f712c61ab02ab76e5c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.coordinator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.AzureException;
import org.apache.samza.util.LeaseBlobManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class to facilitate leader election in Azure.
* The processor that acquires the lease on the blob becomes the leader.
* The lease ID is null initially. It is generated by Azure when the processor acquires the lease, and updated accordingly.
* Every processor requires a valid active lease ID in order to perform successful write and delete operations on the blob.
* Read operations from the blob are not dependent on the lease ID.
*/
public class AzureLeaderElector implements LeaderElector {
private static final Logger LOG = LoggerFactory.getLogger(AzureLeaderElector.class);
private static final int LEASE_TIME_IN_SEC = 60;
private final LeaseBlobManager leaseBlobManager;
private LeaderElectorListener leaderElectorListener = null;
private final AtomicReference<String> leaseId;
private final AtomicBoolean isLeader;
public AzureLeaderElector(LeaseBlobManager leaseBlobManager) {
this.isLeader = new AtomicBoolean(false);
this.leaseBlobManager = leaseBlobManager;
this.leaseId = new AtomicReference<>(null);
}
@Override
public void setLeaderElectorListener(LeaderElectorListener listener) {
this.leaderElectorListener = listener;
}
/**
* Tries to become the leader by acquiring a lease on the blob.
* The acquireLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
* Invokes the listener on becoming the leader.
* @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist.
*/
@Override
public void tryBecomeLeader() throws AzureException {
leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get()));
if (leaseId.get() != null) {
LOG.info("Became leader with lease ID {}.", leaseId.get());
isLeader.set(true);
if (leaderElectorListener != null) {
leaderElectorListener.onBecomingLeader();
}
} else {
LOG.info("Unable to become the leader. Continuing as a worker.");
}
}
/**
* Releases the lease in order to resign leadership. It also stops all schedulers scheduled by the leader.
* The releaseLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
*/
@Override
public void resignLeadership() {
if (isLeader.get()) {
leaseBlobManager.releaseLease(leaseId.get());
isLeader.set(false);
LOG.info("Resigning leadership with lease ID {}", leaseId.get());
leaseId.getAndSet(null);
} else {
LOG.info("Can't release the lease because it is not the leader and does not hold an active lease.");
}
}
/**
* Checks whether it's a leader
* @return true if it is the leader, false otherwise
*/
@Override
public boolean amILeader() {
return isLeader.get();
}
public AtomicReference<String> getLeaseId() {
return leaseId;
}
public LeaseBlobManager getLeaseBlobManager() {
return this.leaseBlobManager;
}
}