| /* |
| * Copyright 2017 HugeGraph Authors |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package com.baidu.hugegraph.event; |
| |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.annotation.Nullable; |
| |
| import org.slf4j.Logger; |
| |
| import com.baidu.hugegraph.iterator.ExtendableIterator; |
| import com.baidu.hugegraph.util.E; |
| import com.baidu.hugegraph.util.ExecutorUtil; |
| import com.baidu.hugegraph.util.Log; |
| import com.google.common.collect.ImmutableList; |
| |
| public class EventHub { |
| |
| private static final Logger LOG = Log.logger(EventHub.class); |
| |
| public static final String EVENT_WORKER = "event-worker-%d"; |
| public static final String ANY_EVENT = "*"; |
| |
| private static final List<EventListener> EMPTY = ImmutableList.of(); |
| |
| // Event executor |
| private static ExecutorService executor = null; |
| |
| private String name; |
| private Map<String, List<EventListener>> listeners; |
| |
| public EventHub() { |
| this("hub"); |
| } |
| |
| public EventHub(String name) { |
| LOG.debug("Create new EventHub: {}", name); |
| |
| this.name = name; |
| this.listeners = new ConcurrentHashMap<>(); |
| EventHub.init(1); |
| } |
| |
| public static synchronized void init(int poolSize) { |
| if (executor != null) { |
| return; |
| } |
| LOG.debug("Init pool(size {}) for EventHub", poolSize); |
| executor = ExecutorUtil.newFixedThreadPool(poolSize, EVENT_WORKER); |
| } |
| |
| public static synchronized boolean destroy(long timeout) |
| throws InterruptedException { |
| E.checkState(executor != null, "EventHub has not been initialized"); |
| LOG.debug("Destroy pool for EventHub"); |
| executor.shutdown(); |
| return executor.awaitTermination(timeout, TimeUnit.SECONDS); |
| } |
| |
| private static ExecutorService executor() { |
| ExecutorService e = executor; |
| E.checkState(e != null, "The event executor has been destroyed"); |
| return e; |
| } |
| |
| public String name() { |
| return this.name; |
| } |
| |
| public boolean containsListener(String event) { |
| List<EventListener> ls = this.listeners.get(event); |
| return ls != null && ls.size() > 0; |
| } |
| |
| public List<EventListener> listeners(String event) { |
| List<EventListener> ls = this.listeners.get(event); |
| return ls == null ? EMPTY : Collections.unmodifiableList(ls); |
| } |
| |
| public void listen(String event, EventListener listener) { |
| E.checkNotNull(event, "event"); |
| E.checkNotNull(listener, "event listener"); |
| |
| if (!this.listeners.containsKey(event)) { |
| this.listeners.putIfAbsent(event, new CopyOnWriteArrayList<>()); |
| } |
| List<EventListener> ls = this.listeners.get(event); |
| assert ls != null : this.listeners; |
| ls.add(listener); |
| } |
| |
| public List<EventListener> unlisten(String event) { |
| List<EventListener> ls = this.listeners.remove(event); |
| return ls == null ? EMPTY : Collections.unmodifiableList(ls); |
| } |
| |
| public int unlisten(String event, EventListener listener) { |
| List<EventListener> ls = this.listeners.get(event); |
| if (ls == null) { |
| return 0; |
| } |
| |
| int count = 0; |
| while (ls.remove(listener)) { |
| count++; |
| } |
| return count; |
| } |
| |
| public Future<Integer> notify(String event, @Nullable Object... args) { |
| @SuppressWarnings("resource") |
| ExtendableIterator<EventListener> all = new ExtendableIterator<>(); |
| |
| List<EventListener> ls = this.listeners.get(event); |
| if (ls != null && !ls.isEmpty()) { |
| all.extend(ls.iterator()); |
| } |
| List<EventListener> lsAny = this.listeners.get(ANY_EVENT); |
| if (lsAny != null && !lsAny.isEmpty()) { |
| all.extend(lsAny.iterator()); |
| } |
| |
| if (!all.hasNext()) { |
| return CompletableFuture.completedFuture(0); |
| } |
| |
| Event ev = new Event(this, event, args); |
| |
| // The submit will catch params: `all`(Listeners) and `ev`(Event) |
| return executor().submit(() -> { |
| int count = 0; |
| // Notify all listeners, and ignore the results |
| while (all.hasNext()) { |
| try { |
| all.next().event(ev); |
| count++; |
| } catch (Throwable ignored) { |
| LOG.warn("Failed to handle event: {}", ev, ignored); |
| } |
| } |
| return count; |
| }); |
| } |
| |
| public Object call(String event, @Nullable Object... args) { |
| List<EventListener> ls = this.listeners.get(event); |
| if (ls == null) { |
| throw new RuntimeException("Not found listener for: " + event); |
| } else if (ls.size() != 1) { |
| throw new RuntimeException("Too many listeners for: " + event); |
| } |
| EventListener listener = ls.get(0); |
| return listener.event(new Event(this, event, args)); |
| } |
| } |