Retry mechanism for CoAP protocol
diff --git a/coap/pom.xml b/coap/pom.xml
index bd11ec9..8b8aea9 100644
--- a/coap/pom.xml
+++ b/coap/pom.xml
@@ -47,5 +47,12 @@
<groupId>${project.groupId}</groupId>
<artifactId>mina-codec</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java b/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java
new file mode 100644
index 0000000..aa72568
--- /dev/null
+++ b/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.coap.retry;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.api.AbstractIoFilter;
+import org.apache.mina.api.IoFilter;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.coap.CoapMessage;
+import org.apache.mina.filterchain.ReadFilterChainController;
+import org.apache.mina.filterchain.WriteFilterChainController;
+import org.apache.mina.session.WriteRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link IoFilter} in charge of messages retransmissions.
+ *
+ * <p>
+ * In case of messages to be sent to the client, the filter retransmits the <i>Confirmable</i> message at exponentially
+ * increasing intervals, until it receives an acknowledgment (or <i>Reset</i> message), or runs out of attempts.
+ * </p>
+ *
+ * <p>
+ * In case of received <i>Confirmable</i> messages, the filter keeps track of the acknowledged transmissions in order to
+ * avoid multiple processing of duplicated messages.
+ * </p>
+ */
+public class CoapRetryFilter extends AbstractIoFilter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CoapRetryFilter.class);
+
+ /** The executor in charge of scheduling the retransmissions */
+ private ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
+
+ /** The confirmable messages waiting to be acknowledged */
+ private Map<Integer, CoapTransmission> inFlight = new ConcurrentHashMap<>();
+
+ /** The list of processed messages used to handle duplicate copies of Confirmable messages */
+ private ExpiringMap<Integer, CoapMessage> processed = new ExpiringMap<Integer, CoapMessage>();
+
+ public CoapRetryFilter() {
+ processed.start();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void messageReceived(IoSession session, Object in, ReadFilterChainController controller) {
+ LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session);
+
+ CoapMessage coapMsg = (CoapMessage) in;
+
+ switch (coapMsg.getType()) {
+ case NON_CONFIRMABLE:
+ // non confirmable message, let's move to the next filter
+ controller.callReadNextFilter(coapMsg);
+ break;
+ case CONFIRMABLE:
+ // check if this is a duplicate of a message already processed
+ CoapMessage ack = processed.get(coapMsg.requestId());
+ if (ack != null) {
+ // stop the filter chain and send again the ack since it was probably lost
+ LOGGER.debug("Duplicated messages detected for ID {}", coapMsg.requestId());
+ controller.callWriteMessageForRead(ack);
+ } else {
+ controller.callReadNextFilter(coapMsg);
+ }
+
+ break;
+ case ACK:
+ case RESET:
+ CoapTransmission t = inFlight.get(coapMsg.requestId());
+ if (t != null) {
+ // cancel the scheduled retransmission
+ t.getRetryFuture().cancel(false);
+ inFlight.remove(coapMsg.requestId());
+ }
+ controller.callReadNextFilter(coapMsg);
+ break;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void messageWriting(final IoSession session, final WriteRequest message,
+ WriteFilterChainController controller) {
+ LOGGER.debug("Processing a MESSAGE_WRITING for session {}", session);
+
+ final CoapMessage coapMsg = (CoapMessage) message.getMessage();
+ final Integer coapMsgId = (Integer) coapMsg.requestId();
+
+ switch (coapMsg.getType()) {
+
+ case NON_CONFIRMABLE:
+ controller.callWriteNextFilter(message);
+ break;
+ case RESET:
+ case ACK:
+ // let's keep track of the message to avoid processing it again in case of duplicate copy.
+ processed.put(coapMsgId, coapMsg);
+
+ controller.callWriteNextFilter(message);
+ break;
+
+ case CONFIRMABLE:
+ // initialize a transmission if this is not a retry
+ CoapTransmission t = inFlight.get(coapMsgId);
+ if (t == null) {
+ t = new CoapTransmission(coapMsg);
+ inFlight.put(coapMsgId, t);
+ }
+
+ // schedule a retry
+ ScheduledFuture<?> future = retryExecutor.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ CoapTransmission t = inFlight.get(coapMsgId);
+
+ // send again the message if the maximum number of attempts is not reached
+ if (t != null && t.timeout()) {
+ LOGGER.debug("Retry for message with ID {}", coapMsgId);
+ session.write(coapMsg);
+ } else {
+ // abort transmission
+ LOGGER.debug("No more retry for message with ID {}", coapMsgId);
+ }
+ }
+ }, t.getNextTimeout(), TimeUnit.MILLISECONDS);
+
+ t.setRetryFuture(future);
+
+ // move to the next filter
+ controller.callWriteNextFilter(message);
+ break;
+ }
+
+ }
+}
diff --git a/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java b/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java
new file mode 100644
index 0000000..6f1359c
--- /dev/null
+++ b/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.coap.retry;
+
+import java.util.Random;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.mina.coap.CoapMessage;
+
+/**
+ * A transmission is a wrapper of a <i>Confirmable</i> {@link CoapMessage} carrying additional data used to ensure a
+ * reliable communication.
+ *
+ * <p>
+ * Basically, retransmission is controlled by two things : a timeout and retransmission counter.
+ * </p>
+ */
+public class CoapTransmission {
+
+ /** Default value of the initial timeout - in milliseconds */
+ private static final long ACK_TIMEOUT = 2000L;
+
+ /** Default value of the random factor used to compute the initial timeout */
+ private static final float ACK_RANDOM_FACTOR = 1.5F;
+
+ /** Default value of the maximum number of retransmissions */
+ private static final int MAX_RETRANSMIT = 4;
+
+ /**
+ * The CoAP message waiting to be acknowledged
+ */
+ private CoapMessage message;
+
+ /**
+ * The future in charge of the retransmission when the timeout is reached. It is needed to keep track of this future
+ * to be able to cancel it when the expected acknowledgment is received
+ */
+ private ScheduledFuture<?> retryFuture;
+
+ /**
+ * The number of transmission retry
+ */
+ private int transmissionCount;
+
+ /**
+ * the timeout in millisecond before the next retransmission
+ */
+ private long nextTimeout;
+
+ public CoapTransmission(CoapMessage message) {
+ this.message = message;
+
+ this.transmissionCount = 0;
+
+ // the initial timeout is set to a random duration between ACK_TIMEOUT and (ACK_TIMEOUT * ACK_RANDOM_FACTOR)
+ this.nextTimeout = ACK_TIMEOUT + new Random().nextInt((int) ((ACK_RANDOM_FACTOR - 1.0F) * ACK_TIMEOUT));
+ }
+
+ /**
+ * This method is called when a timeout is triggered for this transmission.
+ *
+ * @return <code>true</code> if the message must be retransmitted and <code>false</code> if the transmission attempt
+ * must be canceled
+ */
+ public boolean timeout() {
+ if (transmissionCount < MAX_RETRANSMIT) {
+ this.nextTimeout = this.nextTimeout * 2;
+ this.transmissionCount++;
+ return true;
+ }
+ return false;
+ }
+
+ public CoapMessage getMessage() {
+ return message;
+ }
+
+ public ScheduledFuture<?> getRetryFuture() {
+ return retryFuture;
+ }
+
+ public void setRetryFuture(ScheduledFuture<?> retryFuture) {
+ this.retryFuture = retryFuture;
+ }
+
+ public long getNextTimeout() {
+ return nextTimeout;
+ }
+
+}
diff --git a/coap/src/main/java/org/apache/mina/coap/retry/ExpiringMap.java b/coap/src/main/java/org/apache/mina/coap/retry/ExpiringMap.java
new file mode 100644
index 0000000..796778d
--- /dev/null
+++ b/coap/src/main/java/org/apache/mina/coap/retry/ExpiringMap.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.coap.retry;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A {@link Map} implementation backed with a {@link ConcurrentHashMap} providing entry expiration facilities.
+ *
+ * <p>
+ * A worker thread is started to check periodically if expired entries should be removed from the underlying map.
+ * </p>
+ *
+ * @see ConcurrentHashMap
+ * @param <K> type of keys maintained by this map
+ * @param <V> the type of mapped values
+ */
+public class ExpiringMap<K, V> implements Map<K, V> {
+
+ private final Map<K, ExpiringValue<V>> map = new ConcurrentHashMap<>();
+
+ /** The default time to live for an entry : 30 seconds */
+ private static final int EXPIRATION_PERIOD_IN_SEC = 30;
+
+ /** The default period between two expiration checks : 10 seconds */
+ private static final int CHECKER_PERIOD_IN_SEC = 10;
+
+ private final int expirationPeriod;
+ private final int checkerPeriod;
+
+ /** The worker in charge of expiring the entries */
+ private final Worker worker = new Worker();
+
+ private volatile boolean running = true;
+
+ /**
+ * A new expiring map
+ *
+ * @param expirationPeriod the expiration period for an entry
+ * @param checkerPeriod the period between two checks of expired elements
+ */
+ public ExpiringMap(int expirationPeriod, int checkerPeriod) {
+ this.expirationPeriod = expirationPeriod;
+ this.checkerPeriod = checkerPeriod;
+ }
+
+ /**
+ * A map with an expiration period of 30 seconds. The worker in charge of expiring the map entries will run every 10
+ * seconds.
+ */
+ public ExpiringMap() {
+ this(EXPIRATION_PERIOD_IN_SEC, CHECKER_PERIOD_IN_SEC);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean containsValue(Object value) {
+ return map.containsValue(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public V get(Object key) {
+ ExpiringValue<V> expValue = map.get(key);
+ if (expValue != null) {
+ return expValue.value;
+ }
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public V put(K key, V value) {
+ ExpiringValue<V> expValue = map.put(key, new ExpiringValue<V>(value));
+ if (expValue != null) {
+ return expValue.value;
+ }
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public V remove(Object key) {
+ ExpiringValue<V> expValue = map.remove(key);
+ if (expValue != null) {
+ return expValue.value;
+ }
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void putAll(Map<? extends K, ? extends V> m) {
+ for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
+ map.put(e.getKey(), new ExpiringValue<V>(e.getValue()));
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void clear() {
+ map.clear();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Set<K> keySet() {
+ return map.keySet();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Collection<V> values() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Set<java.util.Map.Entry<K, V>> entrySet() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Remove all expired entries.
+ *
+ * @param date all entries with an expiration date after this date are removed.
+ */
+ private void expire(long date) {
+ for (Entry<K, ExpiringValue<V>> e : map.entrySet()) {
+ if (e.getValue().expiringDate < date) {
+ map.remove(e.getKey());
+ }
+ }
+ }
+
+ /**
+ * Start the thread in charge of expiring the elements
+ */
+ public void start() {
+ worker.start();
+ }
+
+ /**
+ * Stop the cleaning thread
+ */
+ @Override
+ public void finalize() throws IOException {
+ running = false;
+ try {
+ // interrupt the sleep
+ worker.interrupt();
+ // wait for worker to stop
+ worker.join();
+ } catch (InterruptedException e) {
+ // interrupted, we don't care much
+ }
+ }
+
+ /**
+ * Thread in charge of removing the expired entries
+ */
+ private class Worker extends Thread {
+
+ public Worker() {
+ super("ExpiringMapChecker");
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ sleep(checkerPeriod);
+ expire(System.currentTimeMillis());
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * An entry value with an expiration date.
+ *
+ * @param <T> the type of the value
+ */
+ class ExpiringValue<T> {
+
+ private T value;
+ private long expiringDate;
+
+ public ExpiringValue(T value) {
+ this.value = value;
+
+ Calendar c = Calendar.getInstance();
+ c.add(Calendar.SECOND, expirationPeriod);
+ expiringDate = c.getTime().getTime();
+ }
+
+ }
+
+}
diff --git a/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java b/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java
new file mode 100644
index 0000000..34c0e36
--- /dev/null
+++ b/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.coap.retry;
+
+import static org.mockito.Mockito.*;
+
+import org.apache.mina.api.IoSession;
+import org.apache.mina.coap.CoapMessage;
+import org.apache.mina.coap.MessageType;
+import org.apache.mina.filterchain.ReadFilterChainController;
+import org.apache.mina.filterchain.WriteFilterChainController;
+import org.apache.mina.session.DefaultWriteRequest;
+import org.apache.mina.session.WriteRequest;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link CoapRetryFilter}
+ */
+public class CoapRetryFilterTest {
+
+ private CoapRetryFilter filter = new CoapRetryFilter();
+
+ private ReadFilterChainController readController = mock(ReadFilterChainController.class);
+
+ private WriteFilterChainController writeController = mock(WriteFilterChainController.class);
+
+ private IoSession session = mock(IoSession.class);
+
+ @Test
+ public void non_confirmable_message_received() {
+ CoapMessage in = new CoapMessage(1, MessageType.NON_CONFIRMABLE, 1, 1234, "token".getBytes(), null,
+ "payload".getBytes());
+
+ filter.messageReceived(session, in, readController);
+
+ // verify
+ verify(readController).callReadNextFilter(in);
+
+ Mockito.verifyNoMoreInteractions(readController);
+ }
+
+ @Test
+ public void first_time_confirmable_message_received() {
+ CoapMessage in = new CoapMessage(1, MessageType.CONFIRMABLE, 1, 1234, "token".getBytes(), null,
+ "payload".getBytes());
+
+ filter.messageReceived(session, in, readController);
+
+ // verify
+ verify(readController).callReadNextFilter(in);
+
+ Mockito.verifyNoMoreInteractions(readController);
+ }
+
+ @Test
+ public void duplicate_confirmable_processed_once() {
+ CoapMessage in = new CoapMessage(1, MessageType.CONFIRMABLE, 1, 1234, "token".getBytes(), null,
+ "payload".getBytes());
+
+ // first confirmable
+ filter.messageReceived(session, in, readController);
+
+ // ack
+ CoapMessage ack = new CoapMessage(1, MessageType.ACK, 1, 1234, null, null, null);
+ filter.messageWriting(session, new DefaultWriteRequest(ack), writeController);
+
+ // duplicate confirmable
+ filter.messageReceived(session, in, readController);
+
+ // verify
+ verify(readController).callReadNextFilter(in);
+ verify(readController).callWriteMessageForRead(ack);
+
+ Mockito.verifyNoMoreInteractions(readController);
+ }
+
+ @Test
+ public void retry_confirmable_message() throws InterruptedException {
+ CoapMessage msg = new CoapMessage(1, MessageType.CONFIRMABLE, 1, 1234, null, null, null);
+
+ WriteRequest writeRequest = new DefaultWriteRequest(msg);
+ filter.messageWriting(session, writeRequest, writeController);
+
+ // verify
+
+ // wait more than the first timeout
+ Thread.sleep(3500L);
+
+ // first write
+ verify(writeController).callWriteNextFilter(writeRequest);
+
+ // retry
+ session.write(msg);
+ }
+
+ @Test
+ public void no_retry_if_ack_received() throws InterruptedException {
+
+ // confirmable
+ CoapMessage msg = new CoapMessage(1, MessageType.CONFIRMABLE, 1, 1234, null, null, null);
+ WriteRequest writeRequest = new DefaultWriteRequest(msg);
+ filter.messageWriting(session, writeRequest, writeController);
+
+ // ack
+ CoapMessage ack = new CoapMessage(1, MessageType.ACK, 1, 1234, null, null, null);
+ filter.messageReceived(session, ack, readController);
+
+ // wait more than the first timeout
+ Thread.sleep(3500L);
+
+ // first write
+ verify(writeController).callWriteNextFilter(writeRequest);
+
+ // no retry
+ Mockito.verifyZeroInteractions(session);
+ }
+
+ @Test
+ public void no_retry_if_reset_received() throws InterruptedException {
+
+ // confirmable
+ CoapMessage msg = new CoapMessage(1, MessageType.CONFIRMABLE, 1, 1234, null, null, null);
+ WriteRequest writeRequest = new DefaultWriteRequest(msg);
+ filter.messageWriting(session, writeRequest, writeController);
+
+ // reset
+ CoapMessage ack = new CoapMessage(1, MessageType.RESET, 1, 1234, null, null, null);
+ filter.messageReceived(session, ack, readController);
+
+ // wait more than the first timeout
+ Thread.sleep(3500L);
+
+ // first write
+ verify(writeController).callWriteNextFilter(writeRequest);
+
+ // no retry
+ Mockito.verifyZeroInteractions(session);
+ }
+
+ @Test
+ public void non_confirmable_message_writing() {
+ CoapMessage msg = new CoapMessage(1, MessageType.NON_CONFIRMABLE, 1, 1234, "token".getBytes(), null,
+ "payload".getBytes());
+ WriteRequest writeRequest = new DefaultWriteRequest(msg);
+
+ filter.messageWriting(session, writeRequest, writeController);
+
+ // verify
+ verify(writeController).callWriteNextFilter(writeRequest);
+
+ Mockito.verifyNoMoreInteractions(writeController);
+ }
+
+}
diff --git a/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java b/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java
new file mode 100644
index 0000000..1280885
--- /dev/null
+++ b/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.coap.retry;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.mina.coap.CoapMessage;
+import org.apache.mina.coap.MessageType;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link CoapTransmission}
+ */
+public class CoapTransmissionTest {
+
+ private static final long MIN_INIT_TIMEOUT = 2000L;
+ private static final long MAX_INIT_TIMEOUT = 3000L;
+
+ @Test
+ public void timeout() {
+ CoapTransmission transmission = new CoapTransmission(new CoapMessage(1, MessageType.CONFIRMABLE, 1, 1234,
+ "token".getBytes(), null, "payload".getBytes()));
+
+ assertTrue(transmission.getNextTimeout() > MIN_INIT_TIMEOUT);
+ assertTrue(transmission.getNextTimeout() < MAX_INIT_TIMEOUT);
+
+ // timeout #1
+ assertTrue(transmission.timeout());
+ assertTrue(transmission.getNextTimeout() > MIN_INIT_TIMEOUT * 2);
+ assertTrue(transmission.getNextTimeout() < MAX_INIT_TIMEOUT * 2);
+
+ // timeout #2
+ assertTrue(transmission.timeout());
+ assertTrue(transmission.getNextTimeout() > MIN_INIT_TIMEOUT * 4);
+ assertTrue(transmission.getNextTimeout() < MAX_INIT_TIMEOUT * 4);
+
+ // timeout #3
+ assertTrue(transmission.timeout());
+ assertTrue(transmission.getNextTimeout() > MIN_INIT_TIMEOUT * 8);
+ assertTrue(transmission.getNextTimeout() < MAX_INIT_TIMEOUT * 8);
+
+ // timeout #4
+ assertTrue(transmission.timeout());
+ assertTrue(transmission.getNextTimeout() > MIN_INIT_TIMEOUT * 16);
+ assertTrue(transmission.getNextTimeout() < MAX_INIT_TIMEOUT * 16);
+
+ // timeout #5 - no retry
+ assertFalse(transmission.timeout());
+ }
+}
diff --git a/coap/src/test/java/org/apache/mina/coap/retry/ExpiringMapTest.java b/coap/src/test/java/org/apache/mina/coap/retry/ExpiringMapTest.java
new file mode 100644
index 0000000..44af22d
--- /dev/null
+++ b/coap/src/test/java/org/apache/mina/coap/retry/ExpiringMapTest.java
@@ -0,0 +1,68 @@
+package org.apache.mina.coap.retry;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link ExpiringMap}
+ */
+public class ExpiringMapTest {
+
+ @Test
+ public void put_get() {
+ Map<String, String> map = new ExpiringMap<>();
+ map.put("key1", "value1");
+
+ assertTrue(map.containsKey("key1"));
+ assertEquals("value1", map.get("key1"));
+
+ assertFalse(map.containsKey("key2"));
+ assertNull(map.get("key2"));
+ }
+
+ @Test
+ public void size() {
+ Map<String, String> map = new ExpiringMap<>();
+ map.put("key1", "value1");
+ map.put("key2", "value2");
+
+ assertEquals(2, map.size());
+ }
+
+ @Test
+ public void remove() {
+ Map<String, String> map = new ExpiringMap<>();
+ map.put("key1", "value1");
+ map.put("key2", "value2");
+
+ String val = map.remove("key2");
+ assertEquals("value2", val);
+
+ assertEquals(1, map.size());
+ assertTrue(map.containsKey("key1"));
+ }
+
+ @Test
+ public void expiring_element() throws InterruptedException {
+ ExpiringMap<String, String> map = new ExpiringMap<>(5, 1);
+ map.start();
+
+ map.put("key1", "value1");
+
+ assertEquals(1, map.size());
+
+ // check before expiration
+ Thread.sleep(4000L);
+
+ assertEquals(1, map.size());
+
+ // check after expiration
+ Thread.sleep(3000L);
+
+ assertEquals(0, map.size());
+ }
+
+}
diff --git a/coap/src/test/resources/org/log4j2-test.xml b/coap/src/test/resources/org/log4j2-test.xml
new file mode 100644
index 0000000..bbc59ca
--- /dev/null
+++ b/coap/src/test/resources/org/log4j2-test.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+
+<configuration>
+ <appenders>
+ <Console name="STDOUT" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n %ex"/>
+ </Console>
+ </appenders>
+ <loggers>
+ <logger name="org.apache.log4j.xml" level="info"/>
+ <root level="error">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ </loggers>
+</configuration>