// Source blob: f5394c019fe3c5878682ba77801a581b5ed87f6a
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.samza.Partition;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.Test;
public class TestBlockingEnvelopeMap {
// Shared fixtures. Declaration order matters: SSP has to be initialized before the
// envelopes and the static initializer below that refer to it.

// The single partition used by every test, built from ("test", "test", partition 0).
private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0));
// Envelope addressed to SSP with its three remaining constructor arguments left null.
private static final IncomingMessageEnvelope ENVELOPE = new IncomingMessageEnvelope(SSP, null, null, null);
// Same as ENVELOPE plus a trailing 100. testSizeComputation expects two of these to
// show up as 200 in getMessagesSizeInQueue, so the 100 is presumably the envelope size.
private static final IncomingMessageEnvelope ENVELOPE_WITH_SIZE = new IncomingMessageEnvelope(SSP, null, null, null, 100);
// Set of partitions handed to every poll() call; filled with SSP in the static initializer.
private static final Set<SystemStreamPartition> FETCH = new HashSet<SystemStreamPartition>();
// Default clock for the mock map: simply reports the JVM's wall-clock time.
private static final Clock CLOCK = new Clock() {
public long currentTimeMillis() {
return System.currentTimeMillis();
}
};
static {
// FETCH contains exactly one partition: SSP.
FETCH.add(SSP);
}
/**
 * Polls a registered map that has never been given an envelope, using a zero
 * timeout, a short positive timeout and finally the -1 timeout, and verifies
 * that none of the polls hands back an envelope.
 */
@Test
public void testEmptyMapReturnsEmptyList() throws InterruptedException {
  BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
  map.register(SSP, "0");

  assertNoEnvelopes(map.poll(FETCH, 0));
  assertNoEnvelopes(map.poll(FETCH, 30));

  // The partition is flagged as at head before the -1 poll; presumably this is
  // what keeps that poll from blocking (compare testShouldBlockWhenNotAtHead).
  map.setIsAtHead(SSP, true);
  assertNoEnvelopes(map.poll(FETCH, -1));
}

/**
 * Fails if any partition in a poll result carries at least one envelope.
 * A result with no entries and a result whose entries are all empty lists are
 * both accepted, so the check does not depend on how "nothing" is represented.
 *
 * @param polled the value returned by a poll call
 */
private static void assertNoEnvelopes(Map<SystemStreamPartition, List<IncomingMessageEnvelope>> polled) {
  for (List<IncomingMessageEnvelope> batch : polled.values()) {
    assertTrue(batch.isEmpty());
  }
}
/**
 * Verifies that polling an empty map with a 100 ms timeout does not return
 * before 100 ms of wall-clock time have elapsed.
 */
@Test
public void testShouldBlockAtLeast100Ms() throws InterruptedException {
  BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
  map.register(SSP, "0");

  final long startedAtMillis = System.currentTimeMillis();
  map.poll(FETCH, 100);
  final long elapsedMillis = System.currentTimeMillis() - startedAtMillis;

  assertTrue(elapsedMillis >= 100);
}
/**
 * Buffers envelopes for SSP and verifies that each zero-timeout poll returns
 * everything that was put since the previous poll, keyed by SSP.
 */
@Test
public void testShouldGetSomeMessages() throws InterruptedException {
  BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
  map.register(SSP, "0");

  // One buffered envelope: one partition entry holding one envelope.
  map.put(SSP, ENVELOPE);
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> firstPoll = map.poll(FETCH, 0);
  assertEquals(1, firstPoll.size());
  assertEquals(1, firstPoll.get(SSP).size());

  // Two more buffered envelopes: still one partition entry, now holding both.
  for (int i = 0; i < 2; i++) {
    map.put(SSP, ENVELOPE);
  }
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> secondPoll = map.poll(FETCH, 0);
  assertEquals(1, secondPoll.size());
  assertEquals(2, secondPoll.get(SSP).size());

  // Size info.
  assertEquals(0, map.getMessagesSizeInQueue(SSP));
}
/**
 * Puts two envelopes that were created without a size argument, drains them
 * with a zero-timeout poll, and verifies that the reported queue size for SSP
 * is 0 afterwards.
 */
@Test
public void testNoSizeComputation() throws InterruptedException {
  BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
  map.register(SSP, "0");
  map.put(SSP, ENVELOPE);
  map.put(SSP, ENVELOPE);

  // Only the draining side effect of the poll matters here; its result is not inspected.
  map.poll(FETCH, 0);

  // Size info.
  assertEquals(0, map.getMessagesSizeInQueue(SSP));
}
/**
 * Puts two envelopes created with a trailing 100 argument and verifies that
 * the reported queue size for SSP is 200 while they are buffered and drops
 * back to 0 once a zero-timeout poll has been issued.
 */
@Test
public void testSizeComputation() throws InterruptedException {
  BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(true);
  map.register(SSP, "0");
  map.put(SSP, ENVELOPE_WITH_SIZE);
  map.put(SSP, ENVELOPE_WITH_SIZE);

  // Size info.
  assertEquals(200, map.getMessagesSizeInQueue(SSP));

  // Only the draining side effect of the poll matters here; its result is not inspected.
  map.poll(FETCH, 0);
  assertEquals(0, map.getMessagesSizeInQueue(SSP));
}
/**
 * Polls with the -1 timeout while SSP has not been flagged as at head and
 * verifies, through the injected MockQueue, that the map issued a timed
 * queue poll of 1000 and that the polling thread then finished.
 */
@Test
public void testShouldBlockWhenNotAtHead() throws InterruptedException {
  MockQueue q = new MockQueue();
  final BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(q);
  map.register(SSP, "0");
  Thread t = new Thread(new Runnable() {
    @Override
    public void run() {
      try {
        // Should trigger a timed poll on the injected queue; MockQueue records
        // the timeout it was given and returns an envelope immediately.
        map.poll(FETCH, -1);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
  });
  t.setDaemon(true);
  t.start();
  // Bounded wait: if the map never reaches the queue poll, fail the test
  // instead of hanging the whole build on an unbounded latch await.
  assertTrue(q.pollTimeoutBarrier.await(60, TimeUnit.SECONDS));
  t.join(60000);
  // 1000 = blocking timeout constant
  assertEquals(1000, q.timeout);
  assertFalse(t.isAlive());
}
/**
 * Polls with a 100 ms timeout against a frozen clock and verifies, through
 * the injected MockQueue, that the map issued a timed queue poll of exactly
 * 100 and that the polling thread then finished.
 */
@Test
public void testShouldPollWithATimeout() throws InterruptedException {
  MockQueue q = new MockQueue();
  // Always use the same time in this test so that we can be sure we get a
  // 100ms poll, rather than a 99ms poll (for example). Have to do this
  // because BlockingEnvelopeMap calls clock.currentTimeMillis twice, and
  // uses the second call to determine the actual poll time.
  final BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(q, new Clock() {
    private final long now = System.currentTimeMillis();

    public long currentTimeMillis() {
      return now;
    }
  });
  map.register(SSP, "0");
  Thread t = new Thread(new Runnable() {
    @Override
    public void run() {
      try {
        // Should trigger a poll(100, TimeUnit.MILLISECONDS) call.
        map.poll(FETCH, 100);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
  });
  t.setDaemon(true);
  t.start();
  // Bounded wait: if the map never reaches the queue poll, fail the test
  // instead of hanging the whole build on an unbounded latch await.
  assertTrue(q.pollTimeoutBarrier.await(60, TimeUnit.SECONDS));
  t.join(60000);
  assertEquals(100, q.timeout);
  assertFalse(t.isAlive());
}
/**
 * Queue double that intercepts the timed poll: it remembers the timeout it was
 * asked to wait for, releases a latch so a test thread can tell the call
 * happened, and answers immediately with ENVELOPE instead of waiting.
 * All other queue operations are inherited from LinkedBlockingQueue unchanged.
 */
public class MockQueue extends LinkedBlockingQueue<IncomingMessageEnvelope> {
  private static final long serialVersionUID = 1L;

  // Opened by the first timed poll; awaitPollTimeout() blocks until then.
  private final CountDownLatch pollTimeoutBarrier = new CountDownLatch(1);

  // Timeout argument of the most recent timed poll; read directly by the tests.
  private long timeout;

  public MockQueue() {
    super();
  }

  /**
   * Blocks the caller until the timed poll has been invoked at least once.
   *
   * @throws InterruptedException if the waiting thread is interrupted
   */
  public void awaitPollTimeout() throws InterruptedException {
    pollTimeoutBarrier.await();
  }

  /**
   * Records the requested timeout, opens the latch and returns ENVELOPE
   * without blocking. The time unit is ignored.
   */
  @Override
  public IncomingMessageEnvelope poll(long timeout, TimeUnit unit) {
    // Store the value before opening the latch so that a thread released by
    // the latch observes it.
    this.timeout = timeout;
    pollTimeoutBarrier.countDown();
    return ENVELOPE;
  }
}
/**
 * BlockingEnvelopeMap subclass for these tests: start() and stop() do
 * nothing, and the queue returned by newBlockingQueue() can be replaced with
 * one supplied by the test.
 */
public class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {
  // Queue handed out by newBlockingQueue(); null means "use the superclass's queue".
  private final BlockingQueue<IncomingMessageEnvelope> injectedQueue;

  /** Creates a map with no injected queue and the shared CLOCK. */
  public MockBlockingEnvelopeMap() {
    this((BlockingQueue<IncomingMessageEnvelope>) null);
  }

  /**
   * Creates a map backed by a fresh MockQueue, using the three-argument
   * superclass constructor with a null third argument.
   *
   * NOTE(review): fetchLimitByBytesEnabled is not read anywhere in this
   * constructor; confirm whether it should be forwarded to the superclass.
   */
  public MockBlockingEnvelopeMap(boolean fetchLimitByBytesEnabled) {
    super(new NoOpMetricsRegistry(), CLOCK, null);
    this.injectedQueue = new MockQueue();
  }

  /** Creates a map that hands out the given queue and uses the shared CLOCK. */
  public MockBlockingEnvelopeMap(BlockingQueue<IncomingMessageEnvelope> injectedQueue) {
    this(injectedQueue, CLOCK);
  }

  /** Creates a map that hands out the given queue and reads time from the given clock. */
  public MockBlockingEnvelopeMap(BlockingQueue<IncomingMessageEnvelope> injectedQueue, Clock clock) {
    super(new NoOpMetricsRegistry(), clock);
    this.injectedQueue = injectedQueue;
  }

  /** Intentionally empty. */
  @Override
  public void start() {
    // Nothing to start in the mock.
  }

  /** Intentionally empty. */
  @Override
  public void stop() {
    // Nothing to stop in the mock.
  }

  /**
   * Returns the injected queue when one was supplied, otherwise whatever the
   * superclass creates.
   */
  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
    if (injectedQueue == null) {
      return super.newBlockingQueue();
    }
    return injectedQueue;
  }
}
}