| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.karaf.decanter.collector.socket; |
| |
| import java.io.*; |
| import java.net.InetAddress; |
| import java.net.ServerSocket; |
| import java.net.Socket; |
| import java.util.Dictionary; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.karaf.decanter.api.marshaller.Unmarshaller; |
| import org.osgi.service.component.ComponentContext; |
| import org.osgi.service.component.annotations.Activate; |
| import org.osgi.service.component.annotations.Component; |
| import org.osgi.service.component.annotations.ConfigurationPolicy; |
| import org.osgi.service.component.annotations.Deactivate; |
| import org.osgi.service.component.annotations.Reference; |
| import org.osgi.service.event.Event; |
| import org.osgi.service.event.EventAdmin; |
| import org.osgi.service.event.EventConstants; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @Component ( |
| name = "org.apache.karaf.decanter.collector.socket", |
| configurationPolicy = ConfigurationPolicy.REQUIRE, |
| immediate = true) |
| public class SocketCollector implements Closeable, Runnable { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(SocketCollector.class); |
| |
| private ServerSocket serverSocket; |
| private EventAdmin eventAdmin; |
| private boolean open; |
| private ExecutorService executor; |
| private Dictionary<String, Object> properties; |
| private String eventAdminTopic; |
| private EventAdmin dispatcher; |
| private Unmarshaller unmarshaller; |
| |
| @SuppressWarnings("unchecked") |
| @Activate |
| public void activate(ComponentContext context) throws IOException { |
| this.properties = context.getProperties(); |
| int port = Integer.parseInt(getProperty(this.properties, "port", "34343")); |
| int workers = Integer.parseInt(getProperty(this.properties, "workers", "10")); |
| eventAdminTopic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket"); |
| this.serverSocket = new ServerSocket(port); |
| // adding 1 for serverSocket handling |
| this.executor = Executors.newFixedThreadPool(workers + 1); |
| this.executor.execute(this); |
| this.open = true; |
| } |
| |
| private String getProperty(Dictionary<String, Object> properties, String key, String defaultValue) { |
| return (properties.get(key) != null) ? (String)properties.get(key) : defaultValue; |
| } |
| |
| @Override |
| public void run() { |
| while (open) { |
| try { |
| Socket socket = serverSocket.accept(); |
| LOGGER.debug("Connected to client at {}", socket.getInetAddress()); |
| this.executor.execute(new SocketRunnable(socket)); |
| } catch (IOException e) { |
| LOGGER.warn("Exception receiving log.", e); |
| } |
| } |
| } |
| |
| @Deactivate |
| @Override |
| public void close() throws IOException { |
| this.open = false; |
| try { |
| this.executor.shutdown(); |
| try { |
| this.executor.awaitTermination(5, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| // nothing to do |
| } |
| this.executor.shutdownNow(); |
| } catch (Exception e) { |
| // nothing to do |
| } |
| serverSocket.close(); |
| } |
| |
| @Reference |
| public void setEventAdmin(EventAdmin eventAdmin) { |
| this.eventAdmin = eventAdmin; |
| } |
| |
| private class SocketRunnable implements Runnable { |
| |
| private Socket clientSocket; |
| |
| public SocketRunnable(Socket clientSocket) { |
| this.clientSocket = clientSocket; |
| } |
| |
| public void run() { |
| try (BufferedInputStream bis = new BufferedInputStream(clientSocket.getInputStream())) { |
| Map<String, Object> data = new HashMap<>(); |
| data.put("hostAddress", InetAddress.getLocalHost().getHostAddress()); |
| data.put("hostName", InetAddress.getLocalHost().getHostName()); |
| data.put("type", "socket"); |
| String karafName = System.getProperty("karaf.name"); |
| if (karafName != null) { |
| data.put("karafName", karafName); |
| } |
| try { |
| data.putAll(unmarshaller.unmarshal(bis)); |
| } catch (Exception e) { |
| // nothing to do |
| } |
| Event event = new Event(eventAdminTopic, data); |
| dispatcher.postEvent(event); |
| } catch (EOFException e) { |
| LOGGER.warn("Client closed the connection", e); |
| } catch (IOException e) { |
| LOGGER.warn("Exception receiving data", e); |
| } |
| try { |
| clientSocket.close(); |
| } catch (IOException e) { |
| LOGGER.info("Error closing socket", e); |
| } |
| } |
| } |
| |
| @Reference |
| public void setDispatcher(EventAdmin dispatcher) { |
| this.dispatcher = dispatcher; |
| } |
| |
| @Reference |
| public void setUnmarshaller(Unmarshaller unmarshaller) { |
| this.unmarshaller = unmarshaller; |
| } |
| |
| } |