blob: f75377e992158b0846ecf8870fd6b8ecaffa7df5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.core.observer.v1;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import com.google.common.collect.Iterables;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.observer.Observers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("deprecation")
class ObserversV1 implements Observers {
private static final Logger log = LoggerFactory.getLogger(ObserversV1.class);
private Environment env;
Map<Column, List<Observer>> observers = new HashMap<>();
Map<Column, ObserverSpecification> strongObservers;
Map<Column, ObserverSpecification> weakObservers;
Map<Column, String> aliases;
private List<Observer> getObserverList(Column col) {
List<Observer> observerList;
synchronized (observers) {
observerList = observers.get(col);
if (observerList == null) {
observerList = new ArrayList<>();
observers.put(col, observerList);
}
}
return observerList;
}
public ObserversV1(Environment env, Map<Column, ObserverSpecification> strongObservers,
Map<Column, ObserverSpecification> weakObservers) {
this.env = env;
this.strongObservers = strongObservers;
this.weakObservers = weakObservers;
this.aliases = new HashMap<>();
for (Entry<Column, ObserverSpecification> e : Iterables.concat(strongObservers.entrySet(),
weakObservers.entrySet())) {
ObserverSpecification observerConfig = e.getValue();
try {
String alias =
Class.forName(observerConfig.getClassName()).asSubclass(Observer.class).getSimpleName();
aliases.put(e.getKey(), alias);
} catch (ClassNotFoundException e1) {
throw new RuntimeException(e1);
}
}
}
public Observer getObserver(Column col) {
List<Observer> observerList;
observerList = getObserverList(col);
synchronized (observerList) {
if (observerList.size() > 0) {
return observerList.remove(observerList.size() - 1);
}
}
Observer observer = null;
ObserverSpecification observerConfig = strongObservers.get(col);
if (observerConfig == null) {
observerConfig = weakObservers.get(col);
}
if (observerConfig != null) {
try {
observer =
Class.forName(observerConfig.getClassName()).asSubclass(Observer.class).newInstance();
observer.init(new ObserverContext(env, observerConfig.getConfiguration()));
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
if (!observer.getObservedColumn().getColumn().equals(col)) {
throw new IllegalStateException("Mismatch between configured column and class column "
+ observerConfig.getClassName() + " " + col + " "
+ observer.getObservedColumn().getColumn());
}
}
return observer;
}
public void returnObserver(Observer observer) {
List<Observer> olist = getObserverList(observer.getObservedColumn().getColumn());
synchronized (olist) {
olist.add(observer);
}
}
@Override
public void close() {
if (observers == null) {
return;
}
synchronized (observers) {
for (List<Observer> olist : observers.values()) {
synchronized (olist) {
for (Observer observer : olist) {
try {
observer.close();
} catch (Exception e) {
log.error("Failed to close observer", e);
}
}
olist.clear();
}
}
}
observers = null;
}
@Override
public String getObserverId(Column col) {
return aliases.get(col);
}
}