blob: 3cd5cba0f2b758f14bca4c82adab25734a08697d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events.impl;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cluster.events.ClusterEventProducerBase;
import org.apache.solr.cluster.events.ClusterPropertiesChangedEvent;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.cluster.events.CollectionsAddedEvent;
import org.apache.solr.cluster.events.CollectionsRemovedEvent;
import org.apache.solr.cluster.events.NodesDownEvent;
import org.apache.solr.cluster.events.NodesUpEvent;
import org.apache.solr.common.cloud.CloudCollectionsListener;
import org.apache.solr.common.cloud.ClusterPropertiesListener;
import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of {@link ClusterEventProducer}.
* <h2>Implementation notes</h2>
* <p>For each cluster event relevant listeners are always invoked sequentially
* (not in parallel) and in arbitrary order. This means that if any listener blocks the
* processing other listeners may be invoked much later or not at all.</p>
*/
public class DefaultClusterEventProducer extends ClusterEventProducerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private LiveNodesListener liveNodesListener;
private CloudCollectionsListener cloudCollectionsListener;
private ClusterPropertiesListener clusterPropertiesListener;
private ZkController zkController;
private final Set<ClusterEvent.EventType> supportedEvents =
new HashSet<>(Arrays.asList(
ClusterEvent.EventType.NODES_DOWN,
ClusterEvent.EventType.NODES_UP,
ClusterEvent.EventType.COLLECTIONS_ADDED,
ClusterEvent.EventType.COLLECTIONS_REMOVED,
ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED
));
public DefaultClusterEventProducer(CoreContainer cc) {
super(cc);
}
// ClusterSingleton lifecycle methods
@Override
public synchronized void start() {
if (cc == null) {
liveNodesListener = null;
cloudCollectionsListener = null;
clusterPropertiesListener = null;
state = State.STOPPED;
return;
}
if (state == State.RUNNING) {
log.warn("Double start() invoked on {}, ignoring", this);
return;
}
state = State.STARTING;
this.zkController = this.cc.getZkController();
// clean up any previous instances
doStop();
// register liveNodesListener
liveNodesListener = (oldNodes, newNodes) -> {
// already closed but still registered
if (state == State.STOPPING || state == State.STOPPED) {
// remove the listener
return true;
}
// spurious event, ignore but keep listening
if (oldNodes.equals(newNodes)) {
return false;
}
final Instant now = Instant.now();
final Set<String> downNodes = new HashSet<>(oldNodes);
downNodes.removeAll(newNodes);
if (!downNodes.isEmpty()) {
log.debug("creating nodes down event for nodes: {}", downNodes);
fireEvent(new NodesDownEvent() {
@Override
public Iterator<String> getNodeNames() {
return downNodes.iterator();
}
@Override
public Instant getTimestamp() {
return now;
}
});
}
final Set<String> upNodes = new HashSet<>(newNodes);
upNodes.removeAll(oldNodes);
if (!upNodes.isEmpty()) {
log.debug("creating nodes up event for nodes: {}", upNodes);
fireEvent(new NodesUpEvent() {
@Override
public Iterator<String> getNodeNames() {
return upNodes.iterator();
}
@Override
public Instant getTimestamp() {
return now;
}
});
}
return false;
};
zkController.zkStateReader.registerLiveNodesListener(liveNodesListener);
cloudCollectionsListener = ((oldCollections, newCollections) -> {
if (oldCollections.equals(newCollections)) {
return;
}
final Instant now = Instant.now();
final Set<String> removed = new HashSet<>(oldCollections);
removed.removeAll(newCollections);
if (!removed.isEmpty()) {
log.debug("creating collections removed event for collections: {}", removed);
fireEvent(new CollectionsRemovedEvent() {
@Override
public Iterator<String> getCollectionNames() {
return removed.iterator();
}
@Override
public Instant getTimestamp() {
return now;
}
});
}
final Set<String> added = new HashSet<>(newCollections);
added.removeAll(oldCollections);
if (!added.isEmpty()) {
log.debug("creating collections added event for collections: {}", added);
fireEvent(new CollectionsAddedEvent() {
@Override
public Iterator<String> getCollectionNames() {
return added.iterator();
}
@Override
public Instant getTimestamp() {
return now;
}
});
}
});
zkController.zkStateReader.registerCloudCollectionsListener(cloudCollectionsListener);
clusterPropertiesListener = (newProperties) -> {
fireEvent(new ClusterPropertiesChangedEvent() {
final Instant now = Instant.now();
@Override
public Map<String, Object> getNewClusterProperties() {
return newProperties;
}
@Override
public Instant getTimestamp() {
return now;
}
});
return false;
};
zkController.zkStateReader.registerClusterPropertiesListener(clusterPropertiesListener);
// XXX register collection state listener?
// XXX not sure how to efficiently monitor for REPLICA_DOWN events
state = State.RUNNING;
}
@Override
public Set<ClusterEvent.EventType> getSupportedEventTypes() {
return supportedEvents;
}
@Override
public synchronized void stop() {
state = State.STOPPING;
doStop();
state = State.STOPPED;
}
private void doStop() {
if (liveNodesListener != null) {
zkController.zkStateReader.removeLiveNodesListener(liveNodesListener);
}
if (cloudCollectionsListener != null) {
zkController.zkStateReader.removeCloudCollectionsListener(cloudCollectionsListener);
}
if (clusterPropertiesListener != null) {
zkController.zkStateReader.removeClusterPropertiesListener(clusterPropertiesListener);
}
liveNodesListener = null;
cloudCollectionsListener = null;
clusterPropertiesListener = null;
}
@Override
public void close() throws IOException {
stop();
super.close();
}
}