blob: 9388a8eca682a1ded30d248da6b63cd5c61161eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.odbc;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.OdbcConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.HostAndPortRange;
import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import java.net.InetAddress;
import java.nio.ByteOrder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
/**
* ODBC processor.
*/
public class OdbcProcessor extends GridProcessorAdapter {
/** Default number of selectors. */
private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
/** Default TCP_NODELAY flag. */
private static final boolean DFLT_TCP_NODELAY = true;
/** Default TCP direct buffer flag. */
private static final boolean DFLT_TCP_DIRECT_BUF = false;
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** ODBC TCP Server. */
private GridNioServer<byte[]> srv;
/** ODBC executor service. */
private ExecutorService odbcExecSvc;
/**
* @param ctx Kernal context.
*/
public OdbcProcessor(GridKernalContext ctx) {
super(ctx);
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
IgniteConfiguration cfg = ctx.config();
OdbcConfiguration odbcCfg = cfg.getOdbcConfiguration();
if (odbcCfg != null) {
try {
Marshaller marsh = cfg.getMarshaller();
if (marsh != null && !(marsh instanceof BinaryMarshaller))
throw new IgniteCheckedException("ODBC can only be used with BinaryMarshaller (please set it " +
"through IgniteConfiguration.setMarshaller())");
HostAndPortRange hostPort;
if (F.isEmpty(odbcCfg.getEndpointAddress())) {
hostPort = new HostAndPortRange(OdbcConfiguration.DFLT_TCP_HOST,
OdbcConfiguration.DFLT_TCP_PORT_FROM,
OdbcConfiguration.DFLT_TCP_PORT_TO
);
}
else {
hostPort = HostAndPortRange.parse(odbcCfg.getEndpointAddress(),
OdbcConfiguration.DFLT_TCP_PORT_FROM,
OdbcConfiguration.DFLT_TCP_PORT_TO,
"Failed to parse ODBC endpoint address"
);
}
assertParameter(odbcCfg.getThreadPoolSize() > 0, "threadPoolSize > 0");
odbcExecSvc = new IgniteThreadPoolExecutor(
"odbc",
cfg.getGridName(),
odbcCfg.getThreadPoolSize(),
odbcCfg.getThreadPoolSize(),
0,
new LinkedBlockingQueue<Runnable>());
InetAddress host;
try {
host = InetAddress.getByName(hostPort.host());
}
catch (Exception e) {
throw new IgniteCheckedException("Failed to resolve ODBC host: " + hostPort.host(), e);
}
Exception lastErr = null;
for (int port = hostPort.portFrom(); port <= hostPort.portTo(); port++) {
try {
GridNioFilter[] filters = new GridNioFilter[] {
new GridNioAsyncNotifyFilter(ctx.gridName(), odbcExecSvc, log) {
@Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
proceedSessionOpened(ses);
}
},
new GridNioCodecFilter(new OdbcBufferedParser(), log, false)
};
GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder()
.address(host)
.port(port)
.listener(new OdbcNioListener(ctx, busyLock, odbcCfg.getMaxOpenCursors()))
.logger(log)
.selectorCount(DFLT_SELECTOR_CNT)
.gridName(ctx.gridName())
.tcpNoDelay(DFLT_TCP_NODELAY)
.directBuffer(DFLT_TCP_DIRECT_BUF)
.byteOrder(ByteOrder.nativeOrder())
.socketSendBufferSize(odbcCfg.getSocketSendBufferSize())
.socketReceiveBufferSize(odbcCfg.getSocketReceiveBufferSize())
.filters(filters)
.directMode(false)
.build();
srv0.start();
srv = srv0;
ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());
log.info("ODBC processor has started on TCP port " + port);
lastErr = null;
break;
}
catch (Exception e) {
lastErr = e;
}
}
assert (srv != null && lastErr == null) || (srv == null && lastErr != null);
if (lastErr != null)
throw new IgniteCheckedException("Failed to bind to any [host:port] from the range [" +
"address=" + hostPort + ", lastErr=" + lastErr + ']');
}
catch (Exception e) {
throw new IgniteCheckedException("Failed to start ODBC processor.", e);
}
}
}
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
if (srv != null) {
busyLock.block();
srv.stop();
ctx.ports().deregisterPorts(getClass());
if (odbcExecSvc != null) {
U.shutdownNow(getClass(), odbcExecSvc, log);
odbcExecSvc = null;
}
if (log.isDebugEnabled())
log.debug("ODBC processor stopped.");
}
}
}