| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.commons.service; |
| |
| import org.apache.iotdb.commons.conf.IoTDBConstant; |
| import org.apache.iotdb.commons.exception.StartupException; |
| |
| import org.apache.thrift.TProcessor; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.lang.reflect.InvocationTargetException; |
| import java.util.concurrent.CountDownLatch; |
| |
| public abstract class ThriftService implements IService { |
| |
| private static final Logger logger = LoggerFactory.getLogger(ThriftService.class); |
| |
| public static final String STATUS_UP = "UP"; |
| public static final String STATUS_DOWN = "DOWN"; |
| |
| protected String mbeanName = |
| String.format( |
| "%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, getID().getJmxName()); |
| protected AbstractThriftServiceThread thriftServiceThread; |
| protected TProcessor processor; |
| |
| private CountDownLatch stopLatch; |
| |
| public String getRPCServiceStatus() { |
| if (thriftServiceThread == null) { |
| logger.debug("Start latch is null when getting status"); |
| } else { |
| logger.debug("Start status is {} when getting status", thriftServiceThread.isServing()); |
| } |
| if (stopLatch == null) { |
| logger.debug("Stop latch is null when getting status"); |
| } else { |
| logger.debug("Stop latch is {} when getting status", stopLatch.getCount()); |
| } |
| |
| if (thriftServiceThread != null && thriftServiceThread.isServing()) { |
| return STATUS_UP; |
| } else { |
| return STATUS_DOWN; |
| } |
| } |
| |
| @Override |
| public void start() throws StartupException { |
| JMXService.registerMBean(this, mbeanName); |
| startService(); |
| } |
| |
| @Override |
| public void stop() { |
| stopService(); |
| JMXService.deregisterMBean(mbeanName); |
| } |
| |
| boolean setSyncedImpl = false; |
| boolean setAsyncedImpl = false; |
| |
| public void initSyncedServiceImpl(Object serviceImpl) { |
| setSyncedImpl = true; |
| } |
| |
| public void initAsyncedServiceImpl(Object serviceImpl) { |
| setAsyncedImpl = true; |
| } |
| |
| public abstract void initTProcessor() |
| throws ClassNotFoundException, IllegalAccessException, InstantiationException, |
| NoSuchMethodException, InvocationTargetException; |
| |
| public abstract void initThriftServiceThread() |
| throws IllegalAccessException, InstantiationException, ClassNotFoundException; |
| |
| public abstract String getBindIP(); |
| |
| public abstract int getBindPort(); |
| |
| @SuppressWarnings("squid:S2276") |
| public void startService() throws StartupException { |
| if (STATUS_UP.equals(getRPCServiceStatus())) { |
| logger.info( |
| "{}: {} has been already running now", |
| IoTDBConstant.GLOBAL_DB_NAME, |
| this.getID().getName()); |
| return; |
| } |
| logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName()); |
| try { |
| reset(); |
| initTProcessor(); |
| if (!setSyncedImpl && !setAsyncedImpl) { |
| throw new StartupException( |
| getID().getName(), "At least one service implementataion should be set."); |
| } |
| initThriftServiceThread(); |
| thriftServiceThread.setThreadStopLatch(stopLatch); |
| thriftServiceThread.start(); |
| |
| while (!thriftServiceThread.isServing()) { |
| // sleep 100ms for waiting the rpc server start. |
| Thread.sleep(100); |
| } |
| } catch (InterruptedException |
| | ClassNotFoundException |
| | IllegalAccessException |
| | InstantiationException |
| | NoSuchMethodException |
| | InvocationTargetException e) { |
| Thread.currentThread().interrupt(); |
| throw new StartupException(this.getID().getName(), e.getMessage()); |
| } |
| |
| logger.info( |
| "{}: start {} successfully, listening on ip {} port {}", |
| IoTDBConstant.GLOBAL_DB_NAME, |
| this.getID().getName(), |
| getBindIP(), |
| getBindPort()); |
| } |
| |
| private void reset() { |
| thriftServiceThread = null; |
| stopLatch = new CountDownLatch(1); |
| } |
| |
| public void restartService() throws StartupException { |
| stopService(); |
| startService(); |
| } |
| |
| public void stopService() { |
| if (STATUS_DOWN.equals(getRPCServiceStatus())) { |
| logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName()); |
| return; |
| } |
| logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName()); |
| if (thriftServiceThread != null) { |
| thriftServiceThread.close(); |
| } |
| try { |
| stopLatch.await(); |
| reset(); |
| logger.info( |
| "{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName()); |
| } catch (InterruptedException e) { |
| logger.error( |
| "{}: close {} failed because: ", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |