blob: 2e606a0df23767bb94c411eabcd229816ed0e9ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.modules.util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.geode.DataSerializable;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.management.internal.security.ResourcePermissions;
import org.apache.geode.security.ResourcePermission;
/**
 * Function that prepares a member for use: it makes sure a {@link Cache} exists, registers itself
 * as a {@link MembershipListener}, and (on members that are not locators) registers the helper
 * functions {@link CreateRegionFunction}, {@link TouchPartitionedRegionEntriesFunction},
 * {@link TouchReplicatedRegionEntriesFunction} and {@link RegionSizeFunction}.
 *
 * <p>
 * Because the instance is also a membership listener, every member that subsequently joins has
 * this same function executed on it (see {@link #memberJoined}).
 *
 * <p>
 * The class carries no instance state, so all instances are considered equal and the
 * {@link DataSerializable} methods write and read nothing.
 */
public class BootstrappingFunction implements Function, MembershipListener, DataSerializable {

  private static final long serialVersionUID = 1856043174458190605L;

  /** Identifier returned by {@link #getId()}. */
  public static final String ID = "bootstrapping-function";

  /** Guards {@link #registerFunctions()} so the helper functions are not registered twice. */
  private static final ReentrantLock registerFunctionLock = new ReentrantLock();

  /**
   * Upper bound, in milliseconds of accumulated sleep time, that {@link #verifyCacheExists()}
   * polls for an existing cache. Read from the system property
   * {@code gemfiremodules.timeToWaitForCache}; defaults to 30000.
   */
  private static final int TIME_TO_WAIT_FOR_CACHE =
      Integer.getInteger("gemfiremodules.timeToWaitForCache", 30000);

  /** Public no-argument constructor; there is no state to initialize. */
  public BootstrappingFunction() {}

  /**
   * Bootstraps the local member and reports {@link Boolean#TRUE} as the last (and only) result.
   *
   * @param context the function context whose result sender receives the status
   */
  @Override
  public void execute(FunctionContext context) {
    // Verify that the cache exists before continuing.
    // When this function is executed by a remote membership listener, it is
    // being invoked before the cache is started.
    Cache cache = verifyCacheExists();

    // Register as membership listener
    registerAsMembershipListener(cache);

    if (!isLocator(cache)) {
      // Register functions
      registerFunctions();
    }

    // Return status
    context.getResultSender().lastResult(Boolean.TRUE);
  }

  /**
   * Tells whether the local member is a locator.
   *
   * @param cache the cache whose distributed system identifies the local member
   * @return {@code true} if the member's VM kind equals
   *         {@link ClusterDistributionManager#LOCATOR_DM_TYPE}
   */
  protected boolean isLocator(Cache cache) {
    DistributedSystem system = cache.getDistributedSystem();
    InternalDistributedMember member = (InternalDistributedMember) system.getDistributedMember();
    return member.getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE;
  }

  /**
   * Returns an existing cache, waiting for one to appear if necessary, or creates one.
   *
   * <p>
   * {@link CacheFactory#getAnyInstance()} is retried every 250 ms until it succeeds or the
   * accumulated sleep time reaches {@link #TIME_TO_WAIT_FOR_CACHE}. If the thread is interrupted
   * while sleeping, the interrupt flag is restored and polling stops. When no cache was obtained,
   * a new one is created with {@code new CacheFactory().create()}.
   *
   * @return the cache that was found or created
   */
  protected Cache verifyCacheExists() {
    // Only the time spent sleeping is counted, not the time spent in getAnyInstance().
    int timeWaited = 0;
    Cache cache = null;
    while (timeWaited < TIME_TO_WAIT_FOR_CACHE) {
      try {
        cache = CacheFactory.getAnyInstance();
        break;
      } catch (Exception ignored) {
        // keep trying and hope for the best
      }
      try {
        Thread.sleep(250);
      } catch (InterruptedException ie) {
        // Preserve the interrupt status for callers and give up polling.
        Thread.currentThread().interrupt();
        break;
      }
      timeWaited += 250;
    }

    if (cache == null) {
      cache = new CacheFactory().create();
    }

    return cache;
  }

  /**
   * @param regionName not used by this implementation
   * @return a single-element collection containing {@link ResourcePermissions#CLUSTER_MANAGE}
   */
  @Override
  public Collection<ResourcePermission> getRequiredPermissions(String regionName) {
    return Collections.singletonList(ResourcePermissions.CLUSTER_MANAGE);
  }

  /**
   * Adds this instance as a membership listener on the distribution manager of the cache's
   * distributed system.
   */
  private void registerAsMembershipListener(Cache cache) {
    DistributionManager dm =
        ((InternalDistributedSystem) cache.getDistributedSystem()).getDistributionManager();
    dm.addMembershipListener(this);
  }

  /**
   * Registers each helper function with the {@link FunctionService} unless a function with the
   * same id is already registered.
   */
  protected void registerFunctions() {
    // Synchronize so that these functions aren't registered twice. The
    // constructor for the CreateRegionFunction creates a meta region.
    registerFunctionLock.lock();
    try {
      // Register the create region function if it is not already registered
      if (!FunctionService.isRegistered(CreateRegionFunction.ID)) {
        FunctionService.registerFunction(new CreateRegionFunction());
      }

      // Register the touch partitioned region entries function if it is not already registered
      if (!FunctionService.isRegistered(TouchPartitionedRegionEntriesFunction.ID)) {
        FunctionService.registerFunction(new TouchPartitionedRegionEntriesFunction());
      }

      // Register the touch replicated region entries function if it is not already registered
      if (!FunctionService.isRegistered(TouchReplicatedRegionEntriesFunction.ID)) {
        FunctionService.registerFunction(new TouchReplicatedRegionEntriesFunction());
      }

      // Register the region size function if it is not already registered
      if (!FunctionService.isRegistered(RegionSizeFunction.ID)) {
        FunctionService.registerFunction(new RegionSizeFunction());
      }
    } finally {
      registerFunctionLock.unlock();
    }
  }

  /**
   * Executes this function on the given member and waits for its result.
   *
   * <p>
   * Any exception raised while dispatching the function or while collecting its result is logged
   * as a warning rather than propagated, so a failure to bootstrap one member does not escape the
   * membership-listener callback.
   *
   * @param member the member on which to run the function
   */
  private void bootstrapMember(InternalDistributedMember member) {
    // Obtained up front because its logger is needed to report failures below.
    Cache cache = CacheFactory.getAnyInstance();

    try {
      // Create and execute the function
      Execution execution = FunctionService.onMember(member);
      ResultCollector collector = execution.execute(this);

      // Get the result. Nothing is being done with it.
      collector.getResult();
    } catch (Exception e) {
      // If an exception occurs in the function, log it.
      cache.getLogger().warning("Caught unexpected exception:", e);
    }
  }

  /** @return {@link #ID} */
  @Override
  public String getId() {
    return ID;
  }

  /** @return {@code true}; {@link #execute(FunctionContext)} always sends a result */
  @Override
  public boolean hasResult() {
    return true;
  }

  /** @return {@code false} */
  @Override
  public boolean isHA() {
    return false;
  }

  /** @return {@code false} */
  @Override
  public boolean optimizeForWrite() {
    return false;
  }

  /**
   * Returns the same hash for every instance, consistent with {@link #equals(Object)}.
   */
  @Override
  public int hashCode() {
    // This method is only implemented so that multiple instances of this class
    // don't get added as membership listeners.
    return ID.hashCode();
  }

  /**
   * Treats every {@code BootstrappingFunction} as equal to every other.
   *
   * @return {@code true} if {@code obj} is a non-null {@code BootstrappingFunction}
   */
  @Override
  public boolean equals(Object obj) {
    // This method is only implemented so that multiple instances of this class
    // don't get added as membership listeners.
    // instanceof is false for null and true for this, so no separate checks are needed.
    return obj instanceof BootstrappingFunction;
  }

  /** No action is taken when a member departs. */
  @Override
  public void memberDeparted(DistributionManager distributionManager, InternalDistributedMember id,
      boolean crashed) {}

  /** Runs this function on the member that just joined. */
  @Override
  public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) {
    bootstrapMember(id);
  }

  /** No action is taken when a member is suspected. */
  @Override
  public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id,
      InternalDistributedMember whoSuspected, String reason) {}

  /** No action is taken when quorum is lost. */
  @Override
  public void quorumLost(DistributionManager distributionManager,
      Set<InternalDistributedMember> internalDistributedMembers,
      List<InternalDistributedMember> internalDistributedMembers2) {}

  /** Writes nothing; the class has no instance state. */
  @Override
  public void toData(DataOutput out) throws IOException {}

  /** Reads nothing; the class has no instance state. */
  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {}
}