blob: 94b72de37c369e65faf9dc13a79fa4f7c4225489 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.igfs;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.internal.util.ipc.IpcEndpointBindException;
import org.apache.ignite.internal.util.ipc.IpcServerEndpoint;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.thread.IgniteThread;
import static org.apache.ignite.igfs.IgfsIpcEndpointType.TCP;
/**
 * IGFS server manager.
 * <p>
 * Starts the IPC endpoint servers configured for an IGFS instance: the regular IPC endpoint (when enabled) and
 * the management endpoint (when a non-negative management port is configured). An endpoint that fails to bind
 * during start is handed over to a background {@code BindWorker}, which retries it every
 * {@code REBIND_INTERVAL} milliseconds after the kernal has started.
 */
public class IgfsServerManager extends IgfsManager {
    /** IPC server rebind interval, in milliseconds. */
    private static final long REBIND_INTERVAL = 3000;

    /** Collection of servers to maintain. Created lazily by the first {@code bind} call, {@code null} before that. */
    private Collection<IgfsServer> srvrs;

    /** Worker that retries endpoints which failed to bind on start, {@code null} if there were no such endpoints. */
    private BindWorker bindWorker;

    /** Kernal start latch. Counted down on kernal start and on stop; the bind worker waits on it. */
    private final CountDownLatch kernalStartLatch = new CountDownLatch(1);

    /** {@inheritDoc} */
    @Override protected void start0() throws IgniteCheckedException {
        FileSystemConfiguration igfsCfg = igfsCtx.configuration();

        if (igfsCfg.isIpcEndpointEnabled()) {
            IgfsIpcEndpointConfiguration ipcCfg = igfsCfg.getIpcEndpointConfiguration();

            // Fall back to a default-constructed endpoint configuration when none is set explicitly.
            if (ipcCfg == null)
                ipcCfg = new IgfsIpcEndpointConfiguration();

            bind(ipcCfg, /*management*/false);
        }

        // Negative management port means that no management endpoint is started.
        if (igfsCfg.getManagementPort() >= 0) {
            IgfsIpcEndpointConfiguration mgmtIpcCfg = new IgfsIpcEndpointConfiguration();

            mgmtIpcCfg.setType(TCP);
            mgmtIpcCfg.setPort(igfsCfg.getManagementPort());

            bind(mgmtIpcCfg, /*management*/true);
        }

        // The worker exists only if at least one endpoint failed to bind above.
        if (bindWorker != null)
            new IgniteThread(bindWorker).start();
    }

    /**
     * Tries to start server endpoint with specified configuration. If the endpoint cannot be bound, prints a
     * warning and registers the configuration with the bind worker (created on demand), which is started by
     * {@code start0()} and periodically retries the endpoint.
     *
     * @param endpointCfg Endpoint configuration to start.
     * @param mgmt {@code True} if endpoint is management.
     * @throws IgniteCheckedException If server start failed for a reason other than a bind failure.
     */
    private void bind(final IgfsIpcEndpointConfiguration endpointCfg, final boolean mgmt) throws IgniteCheckedException {
        if (srvrs == null)
            srvrs = new ConcurrentLinkedQueue<>();

        IgfsServer ipcSrv = new IgfsServer(igfsCtx, endpointCfg, mgmt);

        try {
            ipcSrv.start();

            srvrs.add(ipcSrv);
        }
        catch (IpcEndpointBindException ignored) {
            // Bind failure is not fatal: report it and schedule the endpoint for periodic retries.
            int port = ipcSrv.getIpcServerEndpoint().getPort();

            // Port value -1 is treated as "no port to report".
            String portMsg = port != -1 ? " Failed to bind to port (is port already in use?): " + port : "";

            U.warn(log, "Failed to start IGFS " + (mgmt ? "management " : "") + "endpoint " +
                "(will retry every " + (REBIND_INTERVAL / 1000) + "s)." +
                portMsg);

            if (bindWorker == null)
                bindWorker = new BindWorker();

            bindWorker.addConfiguration(endpointCfg, mgmt);
        }
    }

    /**
     * @return Collection of active endpoints, as a view over the maintained servers obtained through
     *      {@code F.viewReadOnly}.
     */
    public Collection<IpcServerEndpoint> endpoints() {
        return F.viewReadOnly(srvrs, new C1<IgfsServer, IpcServerEndpoint>() {
            @Override public IpcServerEndpoint apply(IgfsServer e) {
                return e.getIpcServerEndpoint();
            }
        });
    }

    /** {@inheritDoc} */
    @Override protected void onKernalStart0() throws IgniteCheckedException {
        if (!F.isEmpty(srvrs)) {
            for (IgfsServer srv : srvrs)
                srv.onKernalStart();
        }

        // Let the bind worker (if any) begin its retry attempts.
        kernalStartLatch.countDown();
    }

    /** {@inheritDoc} */
    @Override protected void stop0(boolean cancel) {
        // Safety: release the latch so that the bind worker is not left waiting for a kernal start
        // that may never have happened.
        kernalStartLatch.countDown();

        // Stop the retry worker first and wait for it, so that it cannot add servers while they are being stopped.
        if (bindWorker != null) {
            bindWorker.cancel();

            U.join(bindWorker, log);
        }

        if (!F.isEmpty(srvrs)) {
            for (IgfsServer srv : srvrs)
                srv.stop(cancel);
        }
    }

    /**
     * Bind worker. Waits for kernal start and then periodically tries to start the endpoints that could not be
     * bound during manager start, until all of them are started or the worker is cancelled.
     */
    @SuppressWarnings("BusyWait")
    private class BindWorker extends GridWorker {
        /** Configurations to bind, as (endpoint configuration, management flag) pairs. */
        private final Collection<IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean>> bindCfgs = new LinkedList<>();

        /**
         * Constructor.
         */
        private BindWorker() {
            super(igfsCtx.kernalContext().igniteInstanceName(), "bind-worker",
                igfsCtx.kernalContext().log(IgfsServerManager.class));
        }

        /**
         * Adds configuration to bind on. Should not be called after thread start.
         *
         * @param cfg Configuration.
         * @param mgmt Management flag.
         */
        public void addConfiguration(IgfsIpcEndpointConfiguration cfg, boolean mgmt) {
            bindCfgs.add(F.t(cfg, mgmt));
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException {
            // Do not attempt to bind before the kernal has started (or the manager is being stopped).
            kernalStartLatch.await();

            while (!isCancelled()) {
                Thread.sleep(REBIND_INTERVAL);

                Iterator<IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean>> it = bindCfgs.iterator();

                // Stop walking the pending configurations as soon as the worker is cancelled, so that
                // no new endpoints are started while the manager is stopping.
                while (!isCancelled() && it.hasNext()) {
                    IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean> cfg = it.next();

                    IgfsServer ipcSrv = new IgfsServer(igfsCtx, cfg.get1(), cfg.get2());

                    try {
                        ipcSrv.start();

                        // Kernal start callback was already delivered to the other servers, so invoke it here.
                        ipcSrv.onKernalStart();

                        srvrs.add(ipcSrv);

                        // Endpoint is up, no more retries needed for this configuration.
                        it.remove();
                    }
                    catch (IgniteCheckedException e) {
                        // Keep the configuration in the list; it will be retried on the next pass.
                        if (log.isDebugEnabled())
                            log.debug("Failed to bind IGFS endpoint [cfg=" + cfg + ", err=" + e.getMessage() + ']');
                    }
                }

                // Nothing left to bind: the worker has done its job.
                if (bindCfgs.isEmpty())
                    break;
            }
        }
    }
}