JT400 tests can not be run in parallel #6018
diff --git a/integration-tests/jt400/README.adoc b/integration-tests/jt400/README.adoc
index 241dbf8..a95f2cc 100644
--- a/integration-tests/jt400/README.adoc
+++ b/integration-tests/jt400/README.adoc
@@ -88,6 +88,31 @@
CRTDTAQ DTAQ(LIBRARY/TESTLIFO) SEQ(*LIFO) MAXLEN(100)
```
+==== Synchronization for parallel executions
+
+The tests do not work by default for parallel executions.
+For parallel scenario, the locking file has to be provided.
+
+You can create such file by running
+
+```
+QSH CMD('touch #file_lock_path')
+for example QSH CMD('touch /home/#username/cq_jt400_lock')
+```
+
+How to provide a locking file:
+
+```
+export JT400_LOCK_FILE=#file_lock_path
+```
+
+or for Windows:
+
+```
+$Env:JT400_LOCK_FILE="#file_lock_path"
+```
+*If locking file is not provided, tests may fail their executions in parallel mode*
+
==== Using different object names
If your test object names are different from the default ones, you can override default values via environmental variable
@@ -109,27 +134,5 @@
$Env:JT400_KEYED_QUEUE="#lkeyedqueue_if_not_TESTKEYED.DTAQ"
$Env:JT400_MESSAGE_QUEUE="#messagequeue_if_not_TESTMSGQ.MSGQ"
$Env:JT400_MESSAGE_REPLYTO_QUEUE="#messagequeueinquiry_if_not_REPLYMSGQ.MSGQ"
-$Env:JT400_USER_SPACE="#userspace_if_not_PROGCALL"
```
-=== Clear queues after unexpected failures
-
-If tests finishes without unexpected failure, tests are taking care of clearing the data.
-In some cases data might stay written into the real server if test fails unexpectedly.
-This state should might alter following executions.
-
-To force full clear (of each queue) can be achieved by add ing parameter
-```
--Dcq.jt400.clear-all=true
-```
-Be aware that with `-Dcq.jt400.clear-all=true`, the tests can not successfully finish in parallel run.
-
-Usage of clear queues parameter is *strongly* suggested during development
-
-
-==== Parallel runs and locking
-
-Simple locking mechanism is implemented for the test to allow parallel executions.
-
-Whenever test is started, new entry is written into keyed data queue `JT400_KEYED_QUEUE` with the key `cq.jt400.global-lock` and entry is removed after the run.
-Tests are able to clear this lock even if previous execution fails unexpectedly.
\ No newline at end of file
diff --git a/integration-tests/jt400/src/main/resources/application.properties b/integration-tests/jt400/src/main/resources/application.properties
index 296c10e..0dd299e 100644
--- a/integration-tests/jt400/src/main/resources/application.properties
+++ b/integration-tests/jt400/src/main/resources/application.properties
@@ -14,12 +14,6 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-#quarkus.test.flat-class-path = ${quarkus.test.flat-class-path}
-
-# workaround for mocked tests, should be solvable by excluding mocked java files from compilation of skip-mock-tests profile
-# I can not make it work though, but to not block the native support by this, I'm setting flat path to true for all tests
-quarkus.test.flat-class-path = true
-
#jt400 server connection information
cq.jt400.url=${JT400_URL:system}
cq.jt400.username=${JT400_USERNAME:username}
@@ -31,4 +25,6 @@
cq.jt400.message-queue=${JT400_MESSAGE_QUEUE:TESTMSGQ.MSGQ}
cq.jt400.message-replyto-queue=${JT400_MESSAGE_REPLYTO_QUEUE:REPLYMSGQ.MSGQ}
cq.jt400.keyed-queue=${JT400_KEYED_QUEUE:TESTKEYED.DTAQ}
-cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ}
\ No newline at end of file
+cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ}
+
+cq.jt400.lock-file=${JT400_LOCK_FILE}
\ No newline at end of file
diff --git a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
index 3627dd1..2f9bfd7 100644
--- a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
+++ b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
@@ -28,7 +28,6 @@
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.jboss.logging.Logger;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
@@ -41,25 +40,34 @@
private final int MSG_LENGTH = 20;
//tests may be executed in parallel, therefore the timeout is a little bigger in case the test has to wait for another one
- private final int WAIT_IN_SECONDS = 20;
+ private static final int WAIT_IN_SECONDS = 30;
@BeforeAll
public static void beforeAll() throws Exception {
+ //lock execution
+ getClientHelper().lock();
+
//for development purposes
// logQueues();
- //lock execution
- Jt400TestResource.CLIENT_HELPER.lock();
- }
-
- @AfterAll
- public static void afterAll() throws Exception {
- getClientHelper().unlock();
+ //clear al data in advance to be sure that there is no data in the queues
+ //it is not possible to clear data after the run because of CPF2451 Message queue REPLYMSGQ is allocated to another job
+ //wait is required also because of CPF2451, usually takes ~20 seconds to release connections to a reply queue
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until(
+ () -> {
+ try {
+ return getClientHelper().clear();
+ } catch (Exception e) {
+ LOGGER.debug("Clear failed because of: " + e.getMessage());
+ return false;
+ }
+ },
+ Matchers.is(true));
}
private static void logQueues() throws Exception {
StringBuilder sb = new StringBuilder("\n");
- sb.append("**********************************************************");
+ sb.append("************************************************************");
sb.append(getClientHelper().dumpQueues());
sb.append("\n**********************************************************\n");
LOGGER.info(sb.toString());
@@ -67,7 +75,7 @@
@Test
public void testDataQueue() {
- LOGGER.debug("** testDataQueue() ** has started ");
+ LOGGER.debug("**** testDataQueue() ** has started ");
String msg = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
String answer = "Hello From DQ: " + msg;
@@ -91,7 +99,7 @@
@Test
public void testDataQueueBinary() throws Exception {
- LOGGER.debug("** testDataQueueBinary() ** has started ");
+ LOGGER.debug("**** testDataQueueBinary() ** has started ");
String msg = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
String answer = "Hello (bin) " + msg;
@@ -118,7 +126,7 @@
@Test
public void testKeyedDataQueue() {
- LOGGER.debug("** testKeyedDataQueue() ** has started ");
+ LOGGER.debug("**** testKeyedDataQueue() ** has started ");
String msg1 = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
String msg2 = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
String answer1 = "Hello From KDQ: " + msg1;
@@ -170,7 +178,7 @@
@Test
public void testMessageQueue() throws Exception {
- LOGGER.debug("** testMessageQueue() ** has started ");
+ LOGGER.debug("**** testMessageQueue() ** has started ");
//write
String msg = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
String answer = "Hello from MQ: " + msg;
@@ -205,19 +213,20 @@
@Test
public void testInquiryMessageQueue() throws Exception {
- LOGGER.debug("** testInquiryMessageQueue() **: has started ");
+ LOGGER.debug("**** testInquiryMessageQueue() **: has started ");
+
String msg = RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
String replyMsg = "reply to: " + msg;
-
- LOGGER.debug("testInquiryMessageQueue: writing " + msg);
+ getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.replyToQueueu, msg);
+ getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.replyToQueueu, replyMsg);
//sending a message using the same client as component
getClientHelper().sendInquiry(msg);
+ LOGGER.debug("testInquiryMessageQueue: message " + msg + " written via client");
//register deletion of the message in case some following task fails
QueuedMessage queuedMessage = getClientHelper().peekReplyToQueueMessage(msg);
if (queuedMessage != null) {
- getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.replyToQueueu, queuedMessage.getKey());
LOGGER.debug("testInquiryMessageQueue: message confirmed by peek: " + msg);
}
@@ -227,6 +236,7 @@
.post("/jt400/inquiryMessageSetExpected")
.then()
.statusCode(204);
+
//start route before sending message (and wait for start)
Awaitility.await().atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until(
() -> RestAssured.get("/jt400/route/start/inquiryRoute")
@@ -237,7 +247,7 @@
LOGGER.debug("testInquiryMessageQueue: inquiry route started");
//await to be processed
- Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(20, TimeUnit.SECONDS).until(
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until(
() -> RestAssured.get("/jt400/inquiryMessageProcessed")
.then()
.statusCode(200)
diff --git a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java
index 3c09d0f..99bdf6d 100644
--- a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java
+++ b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.quarkus.component.jt400.it;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Enumeration;
@@ -24,7 +23,6 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -32,22 +30,18 @@
import java.util.stream.Collectors;
import com.ibm.as400.access.AS400;
-import com.ibm.as400.access.AS400SecurityException;
import com.ibm.as400.access.DataQueue;
import com.ibm.as400.access.DataQueueEntry;
-import com.ibm.as400.access.ErrorCompletingRequestException;
+import com.ibm.as400.access.IFSFileInputStream;
+import com.ibm.as400.access.IFSKey;
import com.ibm.as400.access.KeyedDataQueue;
-import com.ibm.as400.access.KeyedDataQueueEntry;
import com.ibm.as400.access.MessageQueue;
-import com.ibm.as400.access.ObjectDoesNotExistException;
import com.ibm.as400.access.QueuedMessage;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
-import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.ConfigProvider;
import org.hamcrest.Matchers;
import org.jboss.logging.Logger;
-import org.junit.jupiter.api.Assertions;
public class Jt400TestResource implements QuarkusTestResourceLifecycleManager {
private static final Logger LOGGER = Logger.getLogger(Jt400TestResource.class);
@@ -59,8 +53,6 @@
replyToQueueu;
}
- private static final Optional<String> JT400_CLEAR_ALL = ConfigProvider.getConfig().getOptionalValue("cq.jt400.clear-all",
- String.class);
private static final String JT400_URL = ConfigProvider.getConfig().getValue("cq.jt400.url", String.class);
private static final String JT400_USERNAME = ConfigProvider.getConfig().getValue("cq.jt400.username", String.class);
private static final String JT400_PASSWORD = ConfigProvider.getConfig().getValue("cq.jt400.password", String.class);
@@ -73,14 +65,15 @@
private static final String JT400_LIFO_QUEUE = ConfigProvider.getConfig().getValue("cq.jt400.lifo-queue",
String.class);
private static final String JT400_KEYED_QUEUE = ConfigProvider.getConfig().getValue("cq.jt400.keyed-queue", String.class);
+ private static final Optional<String> JT400_LOCK_FILE = ConfigProvider.getConfig().getOptionalValue("cq.jt400.lock-file",
+ String.class);
//depth of repetitive reads for lifo queue clearing
private final static int CLEAR_DEPTH = 100;
- public final static String LOCK_KEY = "cq.jt400.global-lock";
- //5 minute timeout to obtain a log for the tests execution
- private final static int LOCK_TIMEOUT = 300000;
+ //10 minute timeout to obtain a log for the tests execution
+ private final static int LOCK_TIMEOUT = 600000;
- private static AS400 as400 = new AS400(JT400_URL, JT400_USERNAME, JT400_PASSWORD);;
+ private static AS400 lockAs400;
@Override
public Map<String, String> start() {
@@ -90,13 +83,9 @@
@Override
public void stop() {
- if (as400 != null) {
- try {
- CLIENT_HELPER.clearAll(JT400_CLEAR_ALL.isPresent() && Boolean.parseBoolean(JT400_CLEAR_ALL.get()));
- } catch (Exception e) {
- LOGGER.debug("Clearing of the external queues failed", e);
- }
- as400.close();
+ //no need to unlock, once the as400 connection is release, the lock is released
+ if (lockAs400 != null) {
+ lockAs400.close();
}
}
@@ -106,27 +95,36 @@
public static Jt400ClientHelper CLIENT_HELPER = new Jt400ClientHelper() {
- private String key = null;
+ private boolean cleared = false;
Map<RESOURCE_TYPE, Set<Object>> toRemove = new HashMap<>();
+ IFSFileInputStream lockFile;
+ IFSKey lockKey;
+
@Override
public QueuedMessage peekReplyToQueueMessage(String msg) throws Exception {
return getQueueMessage(JT400_REPLY_TO_MESSAGE_QUEUE, msg);
}
private QueuedMessage getQueueMessage(String queue, String msg) throws Exception {
- MessageQueue messageQueue = new MessageQueue(as400,
- getObjectPath(queue));
- Enumeration<QueuedMessage> msgs = messageQueue.getMessages();
+ AS400 as400 = createAs400();
+ try {
+ MessageQueue messageQueue = new MessageQueue(as400,
+ getObjectPath(queue));
+ Enumeration<QueuedMessage> msgs = messageQueue.getMessages();
- while (msgs.hasMoreElements()) {
- QueuedMessage queuedMessage = msgs.nextElement();
+ while (msgs.hasMoreElements()) {
+ QueuedMessage queuedMessage = msgs.nextElement();
- if (msg.equals(queuedMessage.getText())) {
- return queuedMessage;
+ if (msg.equals(queuedMessage.getText())) {
+ return queuedMessage;
+ }
}
+ return null;
+
+ } finally {
+ as400.close();
}
- return null;
}
@Override
@@ -141,192 +139,126 @@
}
@Override
- public void clearAll(boolean all) throws Exception {
- //message queue
- MessageQueue mq = new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE));
- if (all) {
- mq.remove();
- } else if (toRemove.containsKey(RESOURCE_TYPE.messageQueue)) {
- clearMessageQueue(RESOURCE_TYPE.messageQueue, mq);
+ public boolean clear() throws Exception {
+
+ //clear only once
+ if (cleared) {
+ return false;
}
- //lifo queue
- DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE));
- if (all) {
+ try (AS400 as400 = createAs400()) {
+ //reply-to queue
+ new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).remove();
+
+ //message queue
+ new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE)).remove();
+
+ //lifo queue
+ DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE));
for (int i = 01; i < CLEAR_DEPTH; i++) {
if (dq.read() == null) {
break;
}
}
- } else if (toRemove.containsKey(RESOURCE_TYPE.lifoQueueu)) {
- for (Object entry : toRemove.get(RESOURCE_TYPE.lifoQueueu)) {
- List<byte[]> otherMessages = new LinkedList<>();
- DataQueueEntry dqe = dq.read();
- while (dqe != null && !(entry.equals(dqe.getString())
- || entry.equals(new String(dqe.getData(), StandardCharsets.UTF_8)))) {
- otherMessages.add(dqe.getData());
- dqe = dq.read();
- }
- //write back other messages in reverse order (it is a lifo)
- Collections.reverse(otherMessages);
- for (byte[] msg : otherMessages) {
- dq.write(msg);
- }
- }
- }
- //reply-to queue
- MessageQueue rq = new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE));
- if (all) {
- rq.remove();
- } else if (toRemove.containsKey(RESOURCE_TYPE.replyToQueueu)) {
- clearMessageQueue(RESOURCE_TYPE.replyToQueueu, rq);
+
+ //keyed queue
+ new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE)).clear();
}
- //keyed queue
- KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE));
- if (all) {
- kdq.clear();
- } else if (toRemove.containsKey(RESOURCE_TYPE.keyedDataQue)) {
- for (Object entry : toRemove.get(RESOURCE_TYPE.keyedDataQue)) {
- kdq.clear((String) entry);
- }
- }
- }
-
- private void clearMessageQueue(RESOURCE_TYPE type, MessageQueue mq) throws AS400SecurityException,
- ErrorCompletingRequestException, InterruptedException, IOException, ObjectDoesNotExistException {
- if (!toRemove.get(type).isEmpty()) {
- List<QueuedMessage> msgs = Collections.list(mq.getMessages());
- Map<String, byte[]> keys = msgs.stream().collect(Collectors.toMap(q -> q.getText(), q -> q.getKey()));
- for (Object entry : toRemove.get(type)) {
- if (entry instanceof String) {
- mq.remove(keys.get((String) entry));
- } else {
- mq.remove((byte[]) entry);
- }
- }
- }
+ return true;
}
/**
- * Keyed dataque (FIFO) is used for locking purposes.
- *
- * - Each participant saves unique token into a key cq.jt400.global-lock
- * - Each participant the reads the FIFO queue and if the resulted string is its own unique token, execution is allowed
- * - When execution ends, the key is removed
- *
- * If the token is not its own
- * -read of the token is repeated until timeout or its own token is returned (so the second participant waits, until the
- * first participant removes its token)
- *
- * Dead lock prevention
- *
- * - part of the unique token is timestamp, if participant finds a token, which is too old, token is removed
- * - action to clear-all data removes also the locking tokens
- *
- *
- * Therefore only 1 token (thus 1 participant) is allowed to run the tests, the others have to wait
- *
- * @throws Exception
+ * Locking is implemented via file locking, which is present in JTOpen.
*/
@Override
public void lock() throws Exception {
- if (key == null) {
- key = generateKey();
- //write key into keyed queue
- KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE));
- Assertions.assertTrue(kdq.isFIFO(), "keyed dataqueue has to be FIFO");
-
- kdq.write(LOCK_KEY, key);
-
- //added 5 seconds for the timeout, to have some spare time for removing old locks
- Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(LOCK_TIMEOUT + 5000, TimeUnit.SECONDS)
- .until(
- () -> {
- KeyedDataQueueEntry kdqe = kdq.peek(LOCK_KEY);
- if (kdqe == null) {
- //if kdqe is null, try to lock again
- LOGGER.debug("locked in the queueu was removed, locking again with " + key);
- kdq.write(LOCK_KEY, key);
- }
- String peekedKey = kdqe == null ? null : kdqe.getString();
- //if waiting takes more than 300s, check whether the actual lock can be removed
- LOGGER.debug("peeked lock " + peekedKey + "(my lock is " + key + ")");
-
- if (peekedKey != null && !key.equals(peekedKey)) {
- long peekedTime = Long.parseLong(peekedKey.substring(11));
- if (System.currentTimeMillis() - peekedTime > LOCK_TIMEOUT) {
- //read the key (therefore remove it)
- String readKey = kdq.read(LOCK_KEY).getString();
- System.out.println("Removed old lock " + readKey);
- peekedKey = kdq.peek(LOCK_KEY).getString();
- }
- }
- return peekedKey;
- },
- Matchers.is(key));
+ //if no lock file is proposed, throw an error
+ if (JT400_LOCK_FILE.isEmpty()) {
+ throw new IllegalStateException("No file for locking is provided.");
}
- }
- @Override
- public void unlock() throws Exception {
- Assertions.assertEquals(key,
- new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE)).read(LOCK_KEY).getString());
- //clear key
- key = null;
- }
+ if (lockKey == null) {
+ lockAs400 = createAs400();
+ lockFile = new IFSFileInputStream(lockAs400, JT400_LOCK_FILE.get());
- private String generateKey() {
- return RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT) + ":" + System.currentTimeMillis();
+ LOGGER.debug("Asked for lock.");
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(LOCK_TIMEOUT, TimeUnit.SECONDS)
+ .until(() -> {
+ try {
+ lockKey = lockFile.lock(1l);
+ } catch (Exception e) {
+ //lock was not acquired
+ return false;
+ }
+ LOGGER.debug("Acquired lock (for file `" + JT400_LOCK_FILE.get() + "`.");
+ return true;
+
+ },
+ Matchers.is(true));
+ }
}
@Override
public String dumpQueues() throws Exception {
- StringBuilder sb = new StringBuilder();
+ AS400 as400 = createAs400();
+ try {
+ StringBuilder sb = new StringBuilder();
- sb.append("\n* MESSAGE QUEUE\n");
- sb.append("\t" + Collections.list(new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE)).getMessages())
- .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", ")));
+ sb.append("\n* MESSAGE QUEUE\n");
+ sb.append("\t" + Collections.list(new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE)).getMessages())
+ .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", ")));
- sb.append("\n* INQUIRY QUEUE\n");
- sb.append("\t" + Collections
- .list(new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).getMessages())
- .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", ")));
+ sb.append("\n* INQUIRY QUEUE\n");
+ sb.append("\t" + Collections
+ .list(new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).getMessages())
+ .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", ")));
- sb.append("\n* LIFO QUEUE\n");
- DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE));
- DataQueueEntry dqe;
- List<byte[]> lifoMessages = new LinkedList<>();
- List<String> lifoTexts = new LinkedList<>();
- do {
- dqe = dq.read();
- if (dqe != null) {
- lifoTexts.add(dqe.getString() + " (" + new String(dqe.getData(), StandardCharsets.UTF_8) + ")");
- lifoMessages.add(dqe.getData());
+ sb.append("\n* LIFO QUEUE\n");
+ DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE));
+ DataQueueEntry dqe;
+ List<byte[]> lifoMessages = new LinkedList<>();
+ List<String> lifoTexts = new LinkedList<>();
+ do {
+ dqe = dq.read();
+ if (dqe != null) {
+ lifoTexts.add(dqe.getString() + " (" + new String(dqe.getData(), StandardCharsets.UTF_8) + ")");
+ lifoMessages.add(dqe.getData());
+ }
+ } while (dqe != null);
+
+ //write back other messages in reverse order (it is a lifo)
+ Collections.reverse(lifoMessages);
+ for (byte[] msg : lifoMessages) {
+ dq.write(msg);
}
- } while (dqe != null);
+ sb.append(lifoTexts.stream().collect(Collectors.joining(", ")));
- //write back other messages in reverse order (it is a lifo)
- Collections.reverse(lifoMessages);
- for (byte[] msg : lifoMessages) {
- dq.write(msg);
+ //there is no api to list keyed queue, without knowledge of keys
+ return sb.toString();
+
+ } finally {
+ as400.close();
}
- sb.append(lifoTexts.stream().collect(Collectors.joining(", ")));
-
- sb.append("\n* KEYED DATA QUEUE\n");
- KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE));
- KeyedDataQueueEntry kdqe = kdq.peek(LOCK_KEY);
- sb.append("\tlock: " + (kdqe == null ? "null" : kdqe.getString()));
- return sb.toString();
}
public void sendInquiry(String msg) throws Exception {
- new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).sendInquiry(msg,
- getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE));
+ AS400 as400 = createAs400();
+ try {
+ new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).sendInquiry(msg,
+ getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE));
+ } finally {
+ as400.close();
+ }
}
};
+ private static AS400 createAs400() {
+ return new AS400(JT400_URL, JT400_USERNAME, JT400_PASSWORD);
+ }
+
}
interface Jt400ClientHelper {
@@ -339,14 +271,12 @@
//------------------- clear listeners ------------------------------
- void clearAll(boolean all) throws Exception;
+ boolean clear() throws Exception;
//----------------------- locking
void lock() throws Exception;
- void unlock() throws Exception;
-
String dumpQueues() throws Exception;
}