blob: d9725dcc475f267054221ee0e75deaeabd0da953 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.igfs;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsIpcEndpointType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.igfs.common.IgfsControlResponse;
import org.apache.ignite.internal.igfs.common.IgfsDataInputStream;
import org.apache.ignite.internal.igfs.common.IgfsDataOutputStream;
import org.apache.ignite.internal.igfs.common.IgfsIpcCommand;
import org.apache.ignite.internal.igfs.common.IgfsMarshaller;
import org.apache.ignite.internal.igfs.common.IgfsMessage;
import org.apache.ignite.internal.util.ipc.IpcEndpoint;
import org.apache.ignite.internal.util.ipc.IpcServerEndpoint;
import org.apache.ignite.internal.util.ipc.loopback.IpcServerTcpEndpoint;
import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.spi.IgnitePortProtocol.TCP;
/**
* IGFS server. Handles requests passed from IGFS clients.
*/
public class IgfsServer {
/** IGFS context. */
private final IgfsContext igfsCtx;
/** Logger. */
private final IgniteLogger log;
/** IGFS marshaller. */
private final IgfsMarshaller marsh;
/** Endpoint configuration. */
private final IgfsIpcEndpointConfiguration endpointCfg;
/** Server endpoint. */
private IpcServerEndpoint srvEndpoint;
/** Server message handler. */
private IgfsServerHandler hnd;
/** Accept worker. */
private AcceptWorker acceptWorker;
/** Started client workers. */
private ConcurrentLinkedDeque8<ClientWorker> clientWorkers = new ConcurrentLinkedDeque8<>();
/** Flag indicating if this a management endpoint. */
private final boolean mgmt;
/**
* Constructs igfs server manager.
* @param igfsCtx IGFS context.
* @param endpointCfg Endpoint configuration to start.
* @param mgmt Management flag - if true, server is intended to be started for Visor.
*/
public IgfsServer(IgfsContext igfsCtx, IgfsIpcEndpointConfiguration endpointCfg, boolean mgmt) {
assert igfsCtx != null;
assert endpointCfg != null;
this.endpointCfg = endpointCfg;
this.igfsCtx = igfsCtx;
this.mgmt = mgmt;
log = igfsCtx.kernalContext().log(IgfsServer.class);
marsh = new IgfsMarshaller();
}
/**
* Starts this server.
*
* @throws IgniteCheckedException If failed.
*/
public void start() throws IgniteCheckedException {
srvEndpoint = createEndpoint(endpointCfg, mgmt);
if (U.isWindows() && srvEndpoint instanceof IpcSharedMemoryServerEndpoint)
throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.class.getSimpleName() +
" should not be configured on Windows (configure " +
IpcServerTcpEndpoint.class.getSimpleName() + ")");
if (srvEndpoint instanceof IpcServerTcpEndpoint) {
IpcServerTcpEndpoint srvEndpoint0 = (IpcServerTcpEndpoint)srvEndpoint;
srvEndpoint0.setManagement(mgmt);
if (srvEndpoint0.getHost() == null) {
if (mgmt) {
String locHostName = igfsCtx.kernalContext().config().getLocalHost();
try {
srvEndpoint0.setHost(U.resolveLocalHost(locHostName).getHostAddress());
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to resolve local host: " + locHostName, e);
}
}
else
// Bind non-management endpoint to 127.0.0.1 by default.
srvEndpoint0.setHost("127.0.0.1");
}
}
igfsCtx.kernalContext().resource().injectGeneric(srvEndpoint);
srvEndpoint.start();
// IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered.
if (srvEndpoint.getPort() >= 0)
igfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());
hnd = new IgfsIpcHandler(igfsCtx, endpointCfg, mgmt);
// Start client accept worker.
acceptWorker = new AcceptWorker();
}
/**
* Create server IPC endpoint.
*
* @param endpointCfg Endpoint configuration.
* @param mgmt Management flag.
* @return Server endpoint.
* @throws IgniteCheckedException If failed.
*/
private IpcServerEndpoint createEndpoint(IgfsIpcEndpointConfiguration endpointCfg, boolean mgmt)
throws IgniteCheckedException {
A.notNull(endpointCfg, "endpointCfg");
IgfsIpcEndpointType typ = endpointCfg.getType();
if (typ == null)
throw new IgniteCheckedException("Failed to create server endpoint (type is not specified)");
switch (typ) {
case SHMEM: {
IpcSharedMemoryServerEndpoint endpoint =
new IpcSharedMemoryServerEndpoint(igfsCtx.kernalContext().config().getWorkDirectory());
endpoint.setPort(endpointCfg.getPort());
endpoint.setSize(endpointCfg.getMemorySize());
endpoint.setTokenDirectoryPath(endpointCfg.getTokenDirectoryPath());
return endpoint;
}
case TCP: {
IpcServerTcpEndpoint endpoint = new IpcServerTcpEndpoint();
endpoint.setHost(endpointCfg.getHost());
endpoint.setPort(endpointCfg.getPort());
endpoint.setManagement(mgmt);
return endpoint;
}
default:
throw new IgniteCheckedException("Failed to create server endpoint (type is unknown): " + typ);
}
}
/**
* Callback that is invoked when kernal is ready.
*/
public void onKernalStart() {
// Accept connections only when grid is ready.
if (srvEndpoint != null)
new IgniteThread(acceptWorker).start();
}
/**
* Stops this server.
*
* @param cancel Cancel flag.
*/
public void stop(boolean cancel) {
// Skip if did not start.
if (srvEndpoint == null)
return;
// Stop accepting new client connections.
U.cancel(acceptWorker);
U.join(acceptWorker, log);
// Stop server handler, no more requests on existing connections will be processed.
try {
hnd.stop();
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to stop IGFS server handler (will close client connections anyway).", e);
}
// Stop existing client connections.
for (ClientWorker worker : clientWorkers)
U.cancel(worker);
U.join(clientWorkers, log);
// IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered.
if (srvEndpoint.getPort() >= 0)
igfsCtx.kernalContext().ports().deregisterPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());
try {
igfsCtx.kernalContext().resource().cleanupGeneric(srvEndpoint);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to cleanup server endpoint.", e);
}
}
/**
* Gets IPC server endpoint.
*
* @return IPC server endpoint.
*/
public IpcServerEndpoint getIpcServerEndpoint() {
return srvEndpoint;
}
/**
* Client reader thread.
*/
private class ClientWorker extends GridWorker {
/** Connected client endpoint. */
private final IpcEndpoint endpoint;
/** Data output stream. */
private final IgfsDataOutputStream out;
/** Client session object. */
private final IgfsClientSession ses;
/** Queue node for fast unlink. */
private ConcurrentLinkedDeque8.Node<ClientWorker> node;
/**
* Creates client worker.
*
* @param idx Worker index for worker thread naming.
* @param endpoint Connected client endpoint.
* @throws IgniteCheckedException If endpoint output stream cannot be obtained.
*/
protected ClientWorker(IpcEndpoint endpoint, int idx) throws IgniteCheckedException {
super(igfsCtx.kernalContext().igniteInstanceName(), "igfs-client-worker-" + idx, IgfsServer.this.log);
this.endpoint = endpoint;
ses = new IgfsClientSession();
out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream()));
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
try {
IgfsDataInputStream dis = new IgfsDataInputStream(endpoint.inputStream());
byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE];
boolean first = true;
while (!Thread.currentThread().isInterrupted()) {
dis.readFully(hdr);
final long reqId = U.bytesToLong(hdr, 0);
int ordinal = U.bytesToInt(hdr, 8);
if (first) { // First message must be HANDSHAKE.
if (reqId != 0 || ordinal != IgfsIpcCommand.HANDSHAKE.ordinal()) {
if (log.isDebugEnabled())
log.debug("IGFS IPC handshake failed [reqId=" + reqId + ", ordinal=" + ordinal + ']');
return;
}
first = false;
}
final IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(ordinal);
IgfsMessage msg = marsh.unmarshall(cmd, hdr, dis);
IgniteInternalFuture<IgfsMessage> fut = hnd.handleAsync(ses, msg, dis);
// If fut is null, no response is required.
if (fut != null) {
if (fut.isDone()) {
IgfsMessage res;
try {
res = fut.get();
}
catch (IgniteCheckedException e) {
res = new IgfsControlResponse();
((IgfsControlResponse)res).error(e);
}
try {
synchronized (out) {
// Reuse header.
IgfsMarshaller.fillHeader(hdr, reqId, res.command());
marsh.marshall(res, hdr, out);
out.flush();
}
}
catch (IOException | IgniteCheckedException e) {
shutdown0(e);
}
}
else {
fut.listen(new CIX1<IgniteInternalFuture<IgfsMessage>>() {
@Override public void applyx(IgniteInternalFuture<IgfsMessage> fut) {
IgfsMessage res;
try {
res = fut.get();
}
catch (IgniteCheckedException e) {
res = new IgfsControlResponse();
((IgfsControlResponse)res).error(e);
}
try {
synchronized (out) {
byte[] hdr = IgfsMarshaller.createHeader(reqId, res.command());
marsh.marshall(res, hdr, out);
out.flush();
}
}
catch (IOException | IgniteCheckedException e) {
shutdown0(e);
}
}
});
}
}
}
}
catch (EOFException ignored) {
// Client closed connection.
}
catch (IgniteCheckedException | IOException e) {
if (!isCancelled())
U.error(log, "Failed to read data from client (will close connection)", e);
}
finally {
onFinished();
}
}
/**
* @param node Node in queue for this worker.
*/
public void node(ConcurrentLinkedDeque8.Node<ClientWorker> node) {
this.node = node;
}
/** {@inheritDoc} */
@Override public void cancel() {
super.cancel();
shutdown0(null);
}
/**
* @param e Optional exception occurred while stopping this
*/
private void shutdown0(@Nullable Throwable e) {
if (!isCancelled()) {
if (e != null)
U.error(log, "Stopping client reader due to exception: " + endpoint, e);
}
U.closeQuiet(out);
endpoint.close();
}
/**
* Final resource cleanup.
*/
private void onFinished() {
// Second close is no-op, if closed manually.
U.closeQuiet(out);
endpoint.close();
// Finally, remove from queue.
if (clientWorkers.unlinkx(node))
hnd.onClosed(ses);
}
}
/**
* Accept worker.
*/
private class AcceptWorker extends GridWorker {
/** Accept index. */
private int acceptCnt;
/**
* Creates accept worker.
*/
protected AcceptWorker() {
super(igfsCtx.kernalContext().igniteInstanceName(), "igfs-accept-worker", IgfsServer.this.log);
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
try {
while (!Thread.currentThread().isInterrupted()) {
IpcEndpoint client = srvEndpoint.accept();
if (log.isDebugEnabled())
log.debug("IGFS client connected [igfsName=" + igfsCtx.kernalContext().igniteInstanceName() +
", client=" + client + ']');
ClientWorker worker = new ClientWorker(client, acceptCnt++);
IgniteThread workerThread = new IgniteThread(worker);
ConcurrentLinkedDeque8.Node<ClientWorker> node = clientWorkers.addx(worker);
worker.node(node);
workerThread.start();
}
}
catch (IgniteCheckedException e) {
if (!isCancelled())
U.error(log, "Failed to accept client IPC connection (will shutdown accept thread).", e);
}
finally {
srvEndpoint.close();
}
}
/** {@inheritDoc} */
@Override public void cancel() {
super.cancel();
srvEndpoint.close();
}
}
}