blob: e324f9e72e72460a4aa170cf0d110d9722690e24 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.plc4x.java.ads.protocol;
import org.apache.plc4x.java.ads.readwrite.types.ReturnCode;
import org.apache.plc4x.java.ads.configuration.AdsConfiguration;
import org.apache.plc4x.java.ads.field.AdsField;
import org.apache.plc4x.java.ads.field.DirectAdsField;
import org.apache.plc4x.java.ads.field.SymbolicAdsField;
import org.apache.plc4x.java.ads.readwrite.*;
import org.apache.plc4x.java.ads.readwrite.io.DataItemIO;
import org.apache.plc4x.java.ads.readwrite.types.CommandId;
import org.apache.plc4x.java.ads.readwrite.types.ReservedIndexGroups;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.value.*;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.HasConfiguration;
import org.apache.plc4x.java.spi.generation.ParseException;
import org.apache.plc4x.java.spi.generation.ReadBuffer;
import org.apache.plc4x.java.spi.generation.WriteBuffer;
import org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse;
import org.apache.plc4x.java.spi.messages.DefaultPlcWriteResponse;
import org.apache.plc4x.java.spi.messages.InternalPlcReadRequest;
import org.apache.plc4x.java.spi.messages.InternalPlcWriteRequest;
import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements HasConfiguration<AdsConfiguration> {
private static final Logger LOGGER = LoggerFactory.getLogger(AdsProtocolLogic.class);
private AdsConfiguration configuration;
public static final State DEFAULT_COMMAND_STATE = new State(
false, false, false, false, false, true, false, false, false);
private ConversationContext<AmsTCPPacket> adsDriverContext;
private final AtomicLong invokeIdGenerator = new AtomicLong(1);
private RequestTransactionManager tm;
private ConcurrentHashMap<SymbolicAdsField, DirectAdsField> symbolicFieldMapping;
private ConcurrentHashMap<SymbolicAdsField, CompletableFuture<Void>> pendingResolutionRequests;
public AdsProtocolLogic() {
symbolicFieldMapping = new ConcurrentHashMap<>();
pendingResolutionRequests = new ConcurrentHashMap<>();
// Initialize Transaction Manager.
// Until the number of concurrent requests is successfully negotiated we set it to a
// maximum of only one request being able to be sent at a time. During the login process
// No concurrent requests can be sent anyway. It will be updated when receiving the
// S7ParameterSetupCommunication response.
this.tm = new RequestTransactionManager(1);
}
@Override
public void setConfiguration(AdsConfiguration configuration) {
this.configuration = configuration;
}
@Override
public void setContext(ConversationContext<AmsTCPPacket> context) {
super.setContext(context);
adsDriverContext = context;
}
@Override
public void close(ConversationContext<AmsTCPPacket> context) {
}
@Override
public void onConnect(ConversationContext<AmsTCPPacket> context) {
// AMS/ADS doesn't know a concept of a connect.
context.fireConnected();
}
@Override
public void onDisconnect(ConversationContext<AmsTCPPacket> context) {
super.onDisconnect(context);
// TODO: Here we have to clean up all of the handles this connection acquired.
}
@Override
public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
// Get all ADS addresses in their resolved state.
final CompletableFuture<List<DirectAdsField>> directAdsFieldsFuture =
getDirectAddresses(readRequest.getFields());
// If all addresses were already resolved we can send the request immediately.
if(directAdsFieldsFuture.isDone()) {
final List<DirectAdsField> fields = directAdsFieldsFuture.getNow(null);
if(fields != null) {
return executeRead(readRequest, fields);
} else {
final CompletableFuture<PlcReadResponse> errorFuture = new CompletableFuture<>();
errorFuture.completeExceptionally(new PlcException("Error"));
return errorFuture;
}
}
// If there are still symbolic addresses that have to be resolved, send the
// request as soon as the resolution is done.
// In order to instantly be able to return a future, for the final result we have to
// create a new one which is then completed later on. Unfortunately as soon as the
// directAdsFieldsFuture is completed we still don't have the end result, but we can
// now actually send the delayed read request ... as soon as that future completes
// we can complete the initial one.
else {
CompletableFuture<PlcReadResponse> delayedRead = new CompletableFuture<>();
directAdsFieldsFuture.handle((directAdsFields, throwable) -> {
if(directAdsFields != null) {
final CompletableFuture<PlcReadResponse> delayedResponse =
executeRead(readRequest, directAdsFields);
delayedResponse.handle((plcReadResponse, throwable1) -> {
if (plcReadResponse != null) {
delayedRead.complete(plcReadResponse);
} else {
delayedRead.completeExceptionally(throwable1);
}
return this;
});
} else {
delayedRead.completeExceptionally(throwable);
}
return this;
});
return delayedRead;
}
}
protected CompletableFuture<PlcReadResponse> executeRead(PlcReadRequest readRequest,
List<DirectAdsField> directAdsFields) {
// Depending on the number of fields, use a single item request or a sum-request
if (directAdsFields.size() == 1) {
// Do a normal (single item) ADS Read Request
return singleRead(readRequest, directAdsFields.get(0));
} else {
// TODO: Check if the version of the remote station is at least TwinCAT v2.11 Build >= 1550 otherwise split up into single item requests.
// Do a ADS-Sum Read Request.
return multiRead(readRequest, directAdsFields);
}
}
protected CompletableFuture<PlcReadResponse> singleRead(PlcReadRequest readRequest, DirectAdsField directAdsField) {
CompletableFuture<PlcReadResponse> future = new CompletableFuture<>();
int size = directAdsField.getAdsDataType().getNumBytes() * directAdsField.getNumberOfElements();
AdsData adsData = new AdsReadRequest(directAdsField.getIndexGroup(), directAdsField.getIndexOffset(), size);
AmsPacket amsPacket = new AmsPacket(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
CommandId.ADS_READ, DEFAULT_COMMAND_STATE, 0, getInvokeId(), adsData);
AmsTCPPacket amsTCPPacket = new AmsTCPPacket(amsPacket);
// Start a new request-transaction (Is ended in the response-handler)
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
transaction.submit(() -> context.sendRequest(amsTCPPacket)
.expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
.onTimeout(future::completeExceptionally)
.onError((p, e) -> future.completeExceptionally(e))
.check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == amsPacket.getInvokeId())
.unwrap(response -> (AdsReadResponse) response.getUserdata().getData())
.handle(responseAdsData -> {
if(responseAdsData.getResult() == ReturnCode.OK) {
final PlcReadResponse plcReadResponse = convertToPlc4xReadResponse(readRequest, responseAdsData);
// Convert the response from the PLC into a PLC4X Response ...
future.complete(plcReadResponse);
} else {
// TODO: Implement this correctly.
future.completeExceptionally(new PlcException("Error"));
}
// Finish the request-transaction.
transaction.endRequest();
}));
return future;
}
protected CompletableFuture<PlcReadResponse> multiRead(PlcReadRequest readRequest, List<DirectAdsField> directAdsFields) {
CompletableFuture<PlcReadResponse> future = new CompletableFuture<>();
// Calculate the size of all fields together.
// Calculate the expected size of the response data.
long expectedResponseDataSize = directAdsFields.stream().mapToLong(
field -> (field.getAdsDataType().getNumBytes() * field.getNumberOfElements()) + 4).sum();
// With multi-requests, the index-group is fixed and the index offset indicates the number of elements.
AdsData adsData = new AdsReadWriteRequest(
ReservedIndexGroups.ADSIGRP_MULTIPLE_READ.getValue(), directAdsFields.size(), expectedResponseDataSize,
directAdsFields.stream().map(directAdsField -> new AdsMultiRequestItemRead(
directAdsField.getIndexGroup(), directAdsField.getIndexOffset(),
(directAdsField.getAdsDataType().getNumBytes() * directAdsField.getNumberOfElements())))
.toArray(AdsMultiRequestItem[]::new), null);
AmsPacket amsPacket = new AmsPacket(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
CommandId.ADS_READ_WRITE, DEFAULT_COMMAND_STATE, 0, getInvokeId(), adsData);
AmsTCPPacket amsTCPPacket = new AmsTCPPacket(amsPacket);
// Start a new request-transaction (Is ended in the response-handler)
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
transaction.submit(() -> context.sendRequest(amsTCPPacket)
.expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
.onTimeout(future::completeExceptionally)
.onError((p, e) -> future.completeExceptionally(e))
.check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == amsPacket.getInvokeId())
.unwrap(response -> (AdsReadWriteResponse) response.getUserdata().getData())
.handle(responseAdsData -> {
if(responseAdsData.getResult() == ReturnCode.OK) {
final PlcReadResponse plcReadResponse = convertToPlc4xReadResponse(readRequest, responseAdsData);
// Convert the response from the PLC into a PLC4X Response ...
future.complete(plcReadResponse);
} else {
// TODO: Implement this correctly.
future.completeExceptionally(new PlcException("Error"));
}
// Finish the request-transaction.
transaction.endRequest();
}));
return future;
}
protected PlcReadResponse convertToPlc4xReadResponse(PlcReadRequest readRequest, AdsData adsData) {
ReadBuffer readBuffer = null;
Map<String, PlcResponseCode> responseCodes = new HashMap<>();
if (adsData instanceof AdsReadResponse) {
AdsReadResponse adsReadResponse = (AdsReadResponse) adsData;
readBuffer = new ReadBuffer(adsReadResponse.getData(), true);
responseCodes.put(readRequest.getFieldNames().stream().findFirst().orElse(""),
parsePlcResponseCode(adsReadResponse.getResult()));
} else if (adsData instanceof AdsReadWriteResponse) {
AdsReadWriteResponse adsReadWriteResponse = (AdsReadWriteResponse) adsData;
readBuffer = new ReadBuffer(adsReadWriteResponse.getData(), true);
// When parsing a multi-item response, the error codes of each items come
// in sequence and then come the values.
for (String fieldName : readRequest.getFieldNames()) {
try {
final ReturnCode result = ReturnCode.valueOf(readBuffer.readUnsignedLong(32));
responseCodes.put(fieldName, parsePlcResponseCode(result));
} catch (ParseException e) {
responseCodes.put(fieldName, PlcResponseCode.INTERNAL_ERROR);
}
}
}
if(readBuffer != null) {
Map<String, ResponseItem<PlcValue>> values = new HashMap<>();
for (String fieldName : readRequest.getFieldNames()) {
AdsField field = (AdsField) readRequest.getField(fieldName);
// If the response-code was anything but OK, we don't need to parse the payload.
if(responseCodes.get(fieldName) != PlcResponseCode.OK) {
values.put(fieldName, new ResponseItem<>(responseCodes.get(fieldName), null));
}
// If the response-code was ok, parse the data returned.
else {
values.put(fieldName, parsePlcValue(field, readBuffer));
}
}
return new DefaultPlcReadResponse((InternalPlcReadRequest) readRequest, values);
}
return null;
}
private PlcResponseCode parsePlcResponseCode(ReturnCode adsResult) {
if (adsResult == ReturnCode.OK) {
return PlcResponseCode.OK;
} else {
// TODO: Implement this a little more ...
return PlcResponseCode.INTERNAL_ERROR;
}
}
private ResponseItem<PlcValue> parsePlcValue(AdsField field, ReadBuffer readBuffer) {
try {
if (field.getNumberOfElements() == 1) {
return new ResponseItem<>(PlcResponseCode.OK,
DataItemIO.staticParse(readBuffer, field.getAdsDataType()));
} else {
// Fetch all
final PlcValue[] resultItems = IntStream.range(0, field.getNumberOfElements()).mapToObj(i -> {
try {
return DataItemIO.staticParse(readBuffer, field.getAdsDataType());
} catch (ParseException e) {
LOGGER.warn("Error parsing field item of type: '{}' (at position {}})", field.getAdsDataType(), i, e);
}
return null;
}).toArray(PlcValue[]::new);
return new ResponseItem<>(PlcResponseCode.OK, PlcValues.of(resultItems));
}
} catch (ParseException e) {
LOGGER.warn(String.format("Error parsing field item of type: '%s'", field.getAdsDataType()), e);
return new ResponseItem<>(PlcResponseCode.INTERNAL_ERROR, null);
}
}
@Override
public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
// Get all ADS addresses in their resolved state.
final CompletableFuture<List<DirectAdsField>> directAdsFieldsFuture =
getDirectAddresses(writeRequest.getFields());
// If all addresses were already resolved we can send the request immediately.
if(directAdsFieldsFuture.isDone()) {
final List<DirectAdsField> fields = directAdsFieldsFuture.getNow(null);
if(fields != null) {
return executeWrite((InternalPlcWriteRequest) writeRequest, fields);
} else {
final CompletableFuture<PlcWriteResponse> errorFuture = new CompletableFuture<>();
errorFuture.completeExceptionally(new PlcException("Error"));
return errorFuture;
}
}
// If there are still symbolic addresses that have to be resolved, send the
// request as soon as the resolution is done.
// In order to instantly be able to return a future, for the final result we have to
// create a new one which is then completed later on. Unfortunately as soon as the
// directAdsFieldsFuture is completed we still don't have the end result, but we can
// now actually send the delayed read request ... as soon as that future completes
// we can complete the initial one.
else {
CompletableFuture<PlcWriteResponse> delayedWrite = new CompletableFuture<>();
directAdsFieldsFuture.handle((directAdsFields, throwable) -> {
if(directAdsFields != null) {
final CompletableFuture<PlcWriteResponse> delayedResponse =
executeWrite((InternalPlcWriteRequest) writeRequest, directAdsFields);
delayedResponse.handle((plcReadResponse, throwable1) -> {
if (plcReadResponse != null) {
delayedWrite.complete(plcReadResponse);
} else {
delayedWrite.completeExceptionally(throwable1);
}
return this;
});
} else {
delayedWrite.completeExceptionally(throwable);
}
return this;
});
return delayedWrite;
}
}
protected CompletableFuture<PlcWriteResponse> executeWrite(InternalPlcWriteRequest writeRequest,
List<DirectAdsField> directAdsFields) {
// Depending on the number of fields, use a single item request or a sum-request
if (directAdsFields.size() == 1) {
// Do a normal (single item) ADS Write Request
return singleWrite(writeRequest, directAdsFields.get(0));
} else {
// TODO: Check if the version of the remote station is at least TwinCAT v2.11 Build >= 1550 otherwise split up into single item requests.
// Do a ADS-Sum Read Request.
return multiWrite(writeRequest, directAdsFields);
}
}
protected CompletableFuture<PlcWriteResponse> singleWrite(InternalPlcWriteRequest writeRequest, DirectAdsField directAdsField) {
CompletableFuture<PlcWriteResponse> future = new CompletableFuture<>();
final String fieldName = writeRequest.getFieldNames().iterator().next();
final AdsField plcField = (AdsField) writeRequest.getField(fieldName);
final PlcValue plcValue = writeRequest.getPlcValue(fieldName);
try {
WriteBuffer writeBuffer = DataItemIO.staticSerialize(plcValue, plcField.getAdsDataType(), true);
AdsData adsData = new AdsWriteRequest(
directAdsField.getIndexGroup(), directAdsField.getIndexOffset(), writeBuffer.getData());
AmsPacket amsPacket = new AmsPacket(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
CommandId.ADS_WRITE, DEFAULT_COMMAND_STATE, 0, getInvokeId(), adsData);
AmsTCPPacket amsTCPPacket = new AmsTCPPacket(amsPacket);
// Start a new request-transaction (Is ended in the response-handler)
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
transaction.submit(() -> context.sendRequest(amsTCPPacket)
.expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
.onTimeout(future::completeExceptionally)
.onError((p, e) -> future.completeExceptionally(e))
.check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == amsPacket.getInvokeId())
.unwrap(response -> (AdsWriteResponse) response.getUserdata().getData())
.handle(responseAdsData -> {
if (responseAdsData.getResult() == ReturnCode.OK) {
final PlcWriteResponse plcWriteResponse = convertToPlc4xWriteResponse(writeRequest, responseAdsData);
// Convert the response from the PLC into a PLC4X Response ...
future.complete(plcWriteResponse);
} else {
// TODO: Implement this correctly.
future.completeExceptionally(new PlcException("Error"));
}
// Finish the request-transaction.
transaction.endRequest();
}));
} catch (Exception e) {
future.completeExceptionally(new PlcException("Error"));
}
return future;
}
protected CompletableFuture<PlcWriteResponse> multiWrite(InternalPlcWriteRequest writeRequest, List<DirectAdsField> directAdsFields) {
CompletableFuture<PlcWriteResponse> future = new CompletableFuture<>();
// Calculate the size of all fields together.
// Calculate the expected size of the response data.
int expectedRequestDataSize = directAdsFields.stream().mapToInt(
field -> field.getAdsDataType().getNumBytes() * field.getNumberOfElements()).sum();
byte[] writeBuffer = new byte[expectedRequestDataSize];
int pos = 0;
for (String fieldName : writeRequest.getFieldNames()) {
final AdsField field = (AdsField) writeRequest.getField(fieldName);
final PlcValue plcValue = writeRequest.getPlcValue(fieldName);
try {
final WriteBuffer itemWriteBuffer = DataItemIO.staticSerialize(plcValue, field.getAdsDataType(), true);
int numBytes = itemWriteBuffer.getPos();
System.arraycopy(itemWriteBuffer.getData(), 0, writeBuffer, pos, numBytes);
pos += numBytes;
} catch (Exception e) {
throw new PlcRuntimeException("Error serializing data", e);
}
}
// With multi-requests, the index-group is fixed and the index offset indicates the number of elements.
AdsData adsData = new AdsReadWriteRequest(
ReservedIndexGroups.ADSIGRP_MULTIPLE_WRITE.getValue(), directAdsFields.size(), directAdsFields.size() * 4,
directAdsFields.stream().map(directAdsField -> new AdsMultiRequestItemWrite(
directAdsField.getIndexGroup(), directAdsField.getIndexOffset(),
(directAdsField.getAdsDataType().getNumBytes() * directAdsField.getNumberOfElements())))
.toArray(AdsMultiRequestItem[]::new), writeBuffer);
AmsPacket amsPacket = new AmsPacket(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
CommandId.ADS_READ_WRITE, DEFAULT_COMMAND_STATE, 0, getInvokeId(), adsData);
AmsTCPPacket amsTCPPacket = new AmsTCPPacket(amsPacket);
// Start a new request-transaction (Is ended in the response-handler)
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
transaction.submit(() -> context.sendRequest(amsTCPPacket)
.expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
.onTimeout(future::completeExceptionally)
.onError((p, e) -> future.completeExceptionally(e))
.check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == amsPacket.getInvokeId())
.unwrap(response -> (AdsReadWriteResponse) response.getUserdata().getData())
.handle(responseAdsData -> {
if(responseAdsData.getResult() == ReturnCode.OK) {
final PlcWriteResponse plcWriteResponse = convertToPlc4xWriteResponse(writeRequest, responseAdsData);
// Convert the response from the PLC into a PLC4X Response ...
future.complete(plcWriteResponse);
} else {
// TODO: Implement this correctly.
future.completeExceptionally(new PlcException("Error"));
}
// Finish the request-transaction.
transaction.endRequest();
}));
return future;
}
protected PlcWriteResponse convertToPlc4xWriteResponse(PlcWriteRequest writeRequest, AdsData adsData) {
Map<String, PlcResponseCode> responseCodes = new HashMap<>();
if (adsData instanceof AdsWriteResponse) {
AdsWriteResponse adsWriteResponse = (AdsWriteResponse) adsData;
responseCodes.put(writeRequest.getFieldNames().stream().findFirst().orElse(""),
parsePlcResponseCode(adsWriteResponse.getResult()));
} else if (adsData instanceof AdsReadWriteResponse) {
AdsReadWriteResponse adsReadWriteResponse = (AdsReadWriteResponse) adsData;
ReadBuffer readBuffer = new ReadBuffer(adsReadWriteResponse.getData(), true);
// When parsing a multi-item response, the error codes of each items come
// in sequence and then come the values.
for (String fieldName : writeRequest.getFieldNames()) {
try {
final ReturnCode result = ReturnCode.valueOf(readBuffer.readUnsignedLong(32));
responseCodes.put(fieldName, parsePlcResponseCode(result));
} catch (ParseException e) {
responseCodes.put(fieldName, PlcResponseCode.INTERNAL_ERROR);
}
}
}
return new DefaultPlcWriteResponse((InternalPlcWriteRequest) writeRequest, responseCodes);
}
@Override
protected void decode(ConversationContext<AmsTCPPacket> context, AmsTCPPacket msg) throws Exception {
super.decode(context, msg);
}
protected CompletableFuture<List<DirectAdsField>> getDirectAddresses(List<PlcField> fields) {
CompletableFuture<List<DirectAdsField>> future = new CompletableFuture<>();
// Get all symbolic fields from the current request.
// These potentially need to be resolved to direct addresses, if this has not been done before.
final List<SymbolicAdsField> referencedSymbolicFields = fields.stream()
.filter(plcField -> plcField instanceof SymbolicAdsField)
.map(plcField -> (SymbolicAdsField) plcField).collect(Collectors.toList());
// Find out for which of these symbolic addresses no resolution has been initiated.
final List<SymbolicAdsField> symbolicFieldsNeedingResolution = referencedSymbolicFields.stream()
.filter(symbolicAdsField -> !symbolicFieldMapping.containsKey(symbolicAdsField))
.collect(Collectors.toList());
// If there are unresolved symbolic addresses, initiate the resolution
if (!symbolicFieldsNeedingResolution.isEmpty()) {
// Get a list of symbolic addresses for which no resolution request has been sent yet
// (A parallel request initiated a bit earlier might have already initiated a resolution
// which has not yet been completed)
final List<SymbolicAdsField> requiredResolutionFields =
symbolicFieldsNeedingResolution.stream().filter(symbolicAdsField ->
!pendingResolutionRequests.containsKey(symbolicAdsField)).collect(Collectors.toList());
// If there are fields for which no resolution request has been sent yet,
// send a request.
if (!requiredResolutionFields.isEmpty()) {
CompletableFuture<Void> resolutionFuture;
// Create a future which will be completed as soon as the
// resolution result has been added to the map.
if (requiredResolutionFields.size() == 1) {
SymbolicAdsField symbolicAdsField = requiredResolutionFields.get(0);
resolutionFuture = resolveSingleSymbolicAddress(requiredResolutionFields.get(0));
pendingResolutionRequests.put(symbolicAdsField, resolutionFuture);
} else {
resolutionFuture = resolveMultipleSymbolicAddresses(requiredResolutionFields);
for (SymbolicAdsField symbolicAdsField : requiredResolutionFields) {
pendingResolutionRequests.put(symbolicAdsField, resolutionFuture);
}
}
}
// Create a global future which is completed as soon as all sub-futures for this request are completed.
final CompletableFuture<Void> resolutionComplete =
CompletableFuture.allOf(symbolicFieldsNeedingResolution.stream()
.map(symbolicAdsField -> pendingResolutionRequests.get(symbolicAdsField))
.toArray(CompletableFuture[]::new));
// Complete the future asynchronously as soon as all fields are resolved.
resolutionComplete.handleAsync((unused, throwable) -> {
return future.complete(fields.stream().map(plcField -> {
if (plcField instanceof SymbolicAdsField) {
return symbolicFieldMapping.get(plcField);
} else {
return (DirectAdsField) plcField;
}
}).collect(Collectors.toList()));
});
} else {
// If all fields were resolved, we can continue instantly.
future.complete(fields.stream().map(plcField -> {
if (plcField instanceof SymbolicAdsField) {
return symbolicFieldMapping.get(plcField);
} else {
return (DirectAdsField) plcField;
}
}).collect(Collectors.toList()));
}
return future;
}
protected CompletableFuture<Void> resolveSingleSymbolicAddress(SymbolicAdsField symbolicAdsField) {
CompletableFuture<Void> future = new CompletableFuture<>();
AdsData adsData = new AdsReadWriteRequest(ReservedIndexGroups.ADSIGRP_SYM_HNDBYNAME.getValue(), 0,
4, null,
getNullByteTerminatedArray(symbolicAdsField.getSymbolicField()));
AmsPacket amsPacket = new AmsPacket(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
CommandId.ADS_READ_WRITE, DEFAULT_COMMAND_STATE, 0, getInvokeId(), adsData);
AmsTCPPacket amsTCPPacket = new AmsTCPPacket(amsPacket);
// Start a new request-transaction (Is ended in the response-handler)
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
transaction.submit(() -> context.sendRequest(amsTCPPacket)
.expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
.onTimeout(future::completeExceptionally)
.onError((p, e) -> future.completeExceptionally(e))
.check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == amsPacket.getInvokeId())
.unwrap(response -> response.getUserdata().getData())
.check(adsDataResponse -> adsDataResponse instanceof AdsReadWriteResponse)
.unwrap(adsDataResponse -> (AdsReadWriteResponse) adsDataResponse)
.handle(responseAdsData -> {
ReadBuffer readBuffer = new ReadBuffer(responseAdsData.getData(), true);
try {
// Read the handle.
long handle = readBuffer.readUnsignedLong(32);
DirectAdsField directAdsField = new DirectAdsField(
ReservedIndexGroups.ADSIGRP_SYM_VALBYHND.getValue(), handle,
symbolicAdsField.getAdsDataType(), symbolicAdsField.getNumberOfElements());
symbolicFieldMapping.put(symbolicAdsField, directAdsField);
future.complete(null);
} catch (ParseException e) {
future.completeExceptionally(e);
}
transaction.endRequest();
}));
return future;
}
protected CompletableFuture<Void> resolveMultipleSymbolicAddresses(List<SymbolicAdsField> symbolicAdsFields) {
CompletableFuture<Void> future = new CompletableFuture<>();
// The expected response for every symbolic address is 12 bytes (8 bytes header and 4 bytes for the handle)
long expectedResponseDataSize = symbolicAdsFields.size() * 12;
// Concatenate the string part of each symbolic address into one concattenated string and get the bytes.
byte[] addressData = symbolicAdsFields.stream().map(
SymbolicAdsField::getSymbolicField).collect(Collectors.joining("")).getBytes();
AdsData adsData = new AdsReadWriteRequest(ReservedIndexGroups.ADSIGRP_MULTIPLE_READ_WRITE.getValue(),
symbolicAdsFields.size(), expectedResponseDataSize, symbolicAdsFields.stream().map(symbolicAdsField ->
new AdsMultiRequestItemReadWrite(ReservedIndexGroups.ADSIGRP_SYM_HNDBYNAME.getValue(), 0,
4, symbolicAdsField.getSymbolicField().length())
).toArray(AdsMultiRequestItem[]::new), addressData);
AmsPacket amsPacket = new AmsPacket(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
CommandId.ADS_READ_WRITE, DEFAULT_COMMAND_STATE, 0, getInvokeId(), adsData);
AmsTCPPacket amsTCPPacket = new AmsTCPPacket(amsPacket);
// Start a new request-transaction (Is ended in the response-handler)
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
transaction.submit(() -> context.sendRequest(amsTCPPacket)
.expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
.onTimeout(future::completeExceptionally)
.onError((p, e) -> future.completeExceptionally(e))
.check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == amsPacket.getInvokeId())
.unwrap(response -> response.getUserdata().getData())
.check(adsDataResponse -> adsDataResponse instanceof AdsReadWriteResponse)
.unwrap(adsDataResponse -> (AdsReadWriteResponse) adsDataResponse)
.handle(responseAdsData -> {
ReadBuffer readBuffer = new ReadBuffer(responseAdsData.getData(), true);
Map<SymbolicAdsField, Long> returnCodes = new HashMap<>();
// In the response first come the return codes and the data-lengths for each item.
symbolicAdsFields.forEach(symbolicAdsField -> {
try {
// This should be 0 in the success case.
long returnCode = readBuffer.readUnsignedLong(32);
// This is always 4
long itemLength = readBuffer.readUnsignedLong(32);
returnCodes.put(symbolicAdsField, returnCode);
} catch (ParseException e) {
e.printStackTrace();
}
});
// After reading the header-information, comes the data itself.
symbolicAdsFields.forEach(symbolicAdsField -> {
try {
if (returnCodes.get(symbolicAdsField) == 0) {
// Read the handle.
long handle = readBuffer.readUnsignedLong(32);
DirectAdsField directAdsField = new DirectAdsField(
ReservedIndexGroups.ADSIGRP_SYM_VALBYHND.getValue(), handle,
symbolicAdsField.getAdsDataType(), symbolicAdsField.getNumberOfElements());
symbolicFieldMapping.put(symbolicAdsField, directAdsField);
} else {
// TODO: Handle the case of unsuccessful resolution ..
}
} catch (ParseException e) {
e.printStackTrace();
}
});
future.complete(null);
transaction.endRequest();
}));
return future;
}
protected long getInvokeId() {
long invokeId = invokeIdGenerator.getAndIncrement();
// If we've reached the max value for a 16 bit transaction identifier, reset back to 1
if(invokeIdGenerator.get() == 0xFFFFFFFF) {
invokeIdGenerator.set(1);
}
return invokeId;
}
protected byte[] getNullByteTerminatedArray(String value) {
byte[] valueBytes = value.getBytes();
byte[] nullTerminatedBytes = new byte[valueBytes.length + 1];
System.arraycopy(valueBytes, 0, nullTerminatedBytes, 0, valueBytes.length);
return nullTerminatedBytes;
}
}