blob: fdedf0c6c6ac04f3c98168218e7ca4ae94c26b27 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.spi.connection;
import io.netty.channel.*;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcIoException;
import org.apache.plc4x.java.spi.configuration.Configuration;
import org.apache.plc4x.java.spi.configuration.ConfigurationFactory;
import org.apache.plc4x.java.spi.events.*;
import org.apache.plc4x.java.spi.optimizer.BaseOptimizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.plc4x.java.api.value.PlcValueHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class DefaultNettyPlcConnection extends AbstractPlcConnection implements ChannelExposingConnection {
/**
* a {@link HashedWheelTimer} shall be only instantiated once.
*/
// TODO: maybe find a way to make this configurable per jvm
protected final static Timer timer = new HashedWheelTimer();
protected final static long DEFAULT_DISCONNECT_WAIT_TIME = 10000L;
private static final Logger logger = LoggerFactory.getLogger(DefaultNettyPlcConnection.class);
protected Configuration configuration;
protected final ChannelFactory channelFactory;
protected final boolean awaitSessionSetupComplete;
protected final boolean awaitSessionDisconnectComplete;
protected final boolean awaitSessionDiscoverComplete;
protected final ProtocolStackConfigurer stackConfigurer;
protected final CompletableFuture<Void> sessionDisconnectCompleteFuture = new CompletableFuture<>();
protected Channel channel;
protected boolean connected;
public DefaultNettyPlcConnection(boolean canRead, boolean canWrite, boolean canSubscribe,
PlcFieldHandler fieldHandler, PlcValueHandler valueHandler, Configuration configuration,
ChannelFactory channelFactory, boolean awaitSessionSetupComplete,
boolean awaitSessionDisconnectComplete, boolean awaitSessionDiscoverComplete, ProtocolStackConfigurer stackConfigurer, BaseOptimizer optimizer) {
super(canRead, canWrite, canSubscribe, fieldHandler, valueHandler, optimizer);
this.configuration = configuration;
this.channelFactory = channelFactory;
this.awaitSessionSetupComplete = awaitSessionSetupComplete;
//Used to signal that a disconnect has completed while closing a connection.
this.awaitSessionDisconnectComplete = awaitSessionDisconnectComplete;
//Used to signal that discovery has been completed
this.awaitSessionDiscoverComplete = awaitSessionDiscoverComplete;
this.stackConfigurer = stackConfigurer;
this.connected = false;
}
@Override
public void connect() throws PlcConnectionException {
try {
// As we don't just want to wait till the connection is established,
// define a future we can use to signal back that the s7 session is
// finished initializing.
CompletableFuture<Void> sessionSetupCompleteFuture = new CompletableFuture<>();
CompletableFuture<Configuration> sessionDiscoveredCompleteFuture = new CompletableFuture<>();
if(channelFactory == null) {
throw new PlcConnectionException("No channel factory provided");
}
// Inject the configuration
ConfigurationFactory.configure(configuration, channelFactory);
// Have the channel factory create a new channel instance.
if (awaitSessionDiscoverComplete) {
channel = channelFactory.createChannel(getChannelHandler(sessionSetupCompleteFuture, sessionDisconnectCompleteFuture, sessionDiscoveredCompleteFuture));
channel.closeFuture().addListener(future -> {
if (!sessionDiscoveredCompleteFuture.isDone()) {
//Do Nothing
try {
sessionDiscoveredCompleteFuture.complete(null);
} catch (Exception e) {
//Do Nothing
}
}
});
channel.pipeline().fireUserEventTriggered(new DiscoverEvent());
// Wait till the connection is established.
sessionDiscoveredCompleteFuture.get();
}
channel = channelFactory.createChannel(getChannelHandler(sessionSetupCompleteFuture, sessionDisconnectCompleteFuture, sessionDiscoveredCompleteFuture));
channel.closeFuture().addListener(future -> {
if (!sessionSetupCompleteFuture.isDone()) {
sessionSetupCompleteFuture.completeExceptionally(
new PlcIoException("Connection terminated by remote"));
}
});
// Send an event to the pipeline telling the Protocol filters what's going on.
sendChannelCreatedEvent();
// Wait till the connection is established.
if (awaitSessionSetupComplete) {
sessionSetupCompleteFuture.get();
}
// Set the connection to "connected"
connected = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PlcConnectionException(e);
} catch (ExecutionException e) {
throw new PlcConnectionException(e);
}
}
/**
* Close the connection by firstly calling disconnect and waiting for a DisconnectedEvent occurs and then calling
* Close() to finish up any other clean up.
* @throws PlcConnectionException
*/
@Override
public void close() throws PlcConnectionException {
logger.debug("Closing connection to PLC, await for disconnect = {}", awaitSessionDisconnectComplete);
channel.pipeline().fireUserEventTriggered(new DisconnectEvent());
try {
if (awaitSessionDisconnectComplete) {
sessionDisconnectCompleteFuture.get(DEFAULT_DISCONNECT_WAIT_TIME, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
logger.error("Timeout while trying to close connection");
}
channel.pipeline().fireUserEventTriggered(new CloseConnectionEvent());
channel.close().awaitUninterruptibly();
if (!sessionDisconnectCompleteFuture.isDone()) {
sessionDisconnectCompleteFuture.complete(null);
}
channel = null;
connected = false;
}
/**
* Check if the communication channel is active (channel.isActive()) and the driver for a given protocol
* has finished establishing the connection.
*/
@Override
public boolean isConnected() {
return connected && channel.isActive();
}
@Override
public Channel getChannel() {
return channel;
}
public ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture, CompletableFuture<Void> sessionDisconnectCompleteFuture, CompletableFuture<Configuration> sessionDiscoverCompleteFuture) {
if (stackConfigurer == null) {
throw new IllegalStateException("No Protocol Stack Configurer is given!");
}
/*if (factory == null) {
throw new IllegalStateException("No Instance Factory is Present!");
}*/
return new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
// Build the protocol stack for communicating with the s7 protocol.
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ConnectedEvent) {
sessionSetupCompleteFuture.complete(null);
} else if (evt instanceof DisconnectedEvent) {
sessionDisconnectCompleteFuture.complete(null);
} else if (evt instanceof DiscoveredEvent) {
sessionDiscoverCompleteFuture.complete(((DiscoveredEvent) evt).getConfiguration());
} else {
super.userEventTriggered(ctx, evt);
}
}
});
// Initialize via Transport Layer
channelFactory.initializePipeline(pipeline);
// Initialize Protocol Layer
setProtocol(stackConfigurer.configurePipeline(configuration, pipeline, channelFactory.isPassive()));
}
};
}
protected void sendChannelCreatedEvent() {
logger.trace("Channel was created, firing ChannelCreated Event");
// Send an event to the pipeline telling the Protocol filters what's going on.
channel.pipeline().fireUserEventTriggered(new ConnectEvent());
}
}