blob: bb4502c2c4f7d3f3415c0f3e7a5d08a793cded61 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.net.InetSocketAddress;
import java.util.Map;
import org.junit.Assert;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeat;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
/**
 * Mock container for testing container manager and heartbeat protocol.
 * <p>
 * An instance registers a {@link PTContainer} with a {@link StreamingContainerManager},
 * sends the first heartbeat so the container becomes active, and afterwards lets a test
 * drive further heartbeats ({@link #deploy()}, {@link #sendHeartbeat()}) with operator
 * stats configured through {@link #stats(int)}. Expectations about the resulting state are
 * checked with JUnit assertions, so a violated expectation fails the calling test.
 */
public class MockContainer
{
  /** Agent returned by the container manager when the container was assigned. */
  final StreamingContainerAgent sca;
  /** The physical plan container this mock stands in for. */
  final PTContainer container;
  /** Per-operator stats reported by {@link #sendHeartbeat()}, keyed by operator id. */
  final Map<Integer, MockOperatorStats> stats = Maps.newHashMap();

  /**
   * Assigns {@code c} to {@code scm} and sends an initial heartbeat.
   * <p>
   * Asserts that the container is in state {@code ALLOCATED} after the assignment and in
   * state {@code ACTIVE} after the first heartbeat was processed.
   *
   * @param scm container manager the container is assigned to
   * @param c container from the physical plan to mock
   */
  public MockContainer(StreamingContainerManager scm, PTContainer c)
  {
    this.sca = assignContainer(scm, c);
    this.container = c;
    Assert.assertEquals(c, sca.container);
    Assert.assertEquals(PTContainer.State.ALLOCATED, container.getState());

    sca.dnmgr.processHeartbeat(newHeartbeat(newContainerStats())); // activate container
    Assert.assertEquals(PTContainer.State.ACTIVE, container.getState());
  }

  /**
   * Registers the container with the manager using synthetic resource details.
   * <p>
   * The container id doubles as resource request priority, the external id is
   * {@code "container" + id} and the buffer server address is an unresolved address with
   * host {@code containerId + "Host"} and port 0.
   *
   * @param scm container manager to assign to
   * @param c container to assign
   * @return the agent returned by {@code scm.assignContainer}
   */
  private StreamingContainerAgent assignContainer(StreamingContainerManager scm, PTContainer c)
  {
    c.setResourceRequestPriority(c.getId());
    String containerId = "container" + c.getId();
    InetSocketAddress bufferServerAddress = InetSocketAddress.createUnresolved(containerId + "Host", 0);
    // NOTE(review): 1024, 0, null are presumably memory, vcores and an optional node URL -
    // confirm against the ContainerResource constructor.
    StreamingContainerManager.ContainerResource resource = new StreamingContainerManager.ContainerResource(
        c.getId(), containerId, "localhost", 1024, 0, null);
    return scm.assignContainer(resource, bufferServerAddress);
  }

  /**
   * Creates container stats carrying the external id of the assigned container.
   *
   * @return new, otherwise empty container stats
   */
  private ContainerStats newContainerStats()
  {
    return new ContainerStats(sca.container.getExternalId());
  }

  /**
   * Wraps the given container stats into a new heartbeat.
   *
   * @param containerStats stats to attach to the heartbeat
   * @return heartbeat with {@code containerStats} set
   */
  private ContainerHeartbeat newHeartbeat(ContainerStats containerStats)
  {
    ContainerHeartbeat heartbeat = new ContainerHeartbeat();
    heartbeat.setContainerStats(containerStats);
    return heartbeat;
  }

  /**
   * Sends a heartbeat without operator stats and verifies that it is answered with a
   * deploy request.
   * <p>
   * Asserts that the container has an external id and is {@code ACTIVE}, that the response
   * contains one deploy request entry per operator of the container, and that every
   * operator of the container is in state {@code PENDING_DEPLOY} afterwards.
   */
  public void deploy()
  {
    Assert.assertNotNull(sca.container.getExternalId());
    Assert.assertEquals(PTContainer.State.ACTIVE, container.getState());

    ContainerHeartbeat heartbeat = newHeartbeat(newContainerStats());
    ContainerHeartbeatResponse response = sca.dnmgr.processHeartbeat(heartbeat); // get deploy request
    Assert.assertNotNull(response.deployRequest);
    Assert.assertEquals("" + response.deployRequest, container.getOperators().size(), response.deployRequest.size());
    Assert.assertEquals(PTContainer.State.ACTIVE, container.getState());

    for (PTOperator oper : container.getOperators()) {
      Assert.assertEquals("state " + oper, PTOperator.State.PENDING_DEPLOY, oper.getState());
    }
  }

  /**
   * Sends a heartbeat that reports every operator registered through {@link #stats(int)}.
   * <p>
   * Each operator heartbeat carries the configured deploy state and a single window stats
   * entry with the configured current window id and checkpoint window id. Port level stats
   * are not reported. Asserts that the response does not contain a deploy request.
   */
  public void sendHeartbeat()
  {
    ContainerStats containerStats = newContainerStats();
    ContainerHeartbeat heartbeat = newHeartbeat(containerStats);

    for (Map.Entry<Integer, MockOperatorStats> entry : this.stats.entrySet()) {
      MockOperatorStats mock = entry.getValue();

      OperatorStats windowStats = new OperatorStats();
      // only the window id of the checkpoint is configurable, remaining arguments are fixed to 0
      windowStats.checkpoint = new Checkpoint(mock.checkpointWindowId, 0, 0);
      windowStats.windowId = mock.currentWindowId;

      OperatorHeartbeat operatorHeartbeat = new OperatorHeartbeat();
      operatorHeartbeat.setNodeId(entry.getKey());
      operatorHeartbeat.setState(mock.deployState);
      operatorHeartbeat.windowStats = Lists.newArrayList(windowStats);
      containerStats.operators.add(operatorHeartbeat);
    }

    ContainerHeartbeatResponse response = sca.dnmgr.processHeartbeat(heartbeat);
    Assert.assertNull(response.deployRequest);
  }

  /**
   * Returns the stats holder for the given operator, creating and registering it on first
   * access.
   *
   * @param operatorId id of the operator to report stats for
   * @return the stats holder registered for {@code operatorId}, never {@code null}
   */
  public MockOperatorStats stats(int operatorId)
  {
    MockOperatorStats os = this.stats.get(operatorId);
    if (os == null) {
      os = new MockOperatorStats(operatorId);
      this.stats.put(operatorId, os);
    }
    return os;
  }

  /**
   * Values reported for a single operator by {@link MockContainer#sendHeartbeat()}.
   * <p>
   * Instances are obtained through {@link MockContainer#stats(int)}; the setters return
   * {@code this} so that calls can be chained.
   */
  public static class MockOperatorStats
  {
    final int operatorId;
    /** Deploy state to report; stays {@code null} until set. */
    OperatorHeartbeat.DeployState deployState;
    long currentWindowId;
    long checkpointWindowId;

    private MockOperatorStats(int operatorId)
    {
      this.operatorId = operatorId;
    }

    /**
     * Sets the deploy state to report.
     *
     * @param s deploy state for the next heartbeats
     * @return this instance
     */
    public MockOperatorStats deployState(OperatorHeartbeat.DeployState s)
    {
      this.deployState = s;
      return this;
    }

    /**
     * Sets the window id to report as current window.
     *
     * @param windowId current window id
     * @return this instance
     */
    public MockOperatorStats currentWindowId(long windowId)
    {
      this.currentWindowId = windowId;
      return this;
    }

    /**
     * Sets the window id to report as checkpoint.
     *
     * @param windowId checkpoint window id
     * @return this instance
     */
    public MockOperatorStats checkpointWindowId(long windowId)
    {
      this.checkpointWindowId = windowId;
      return this;
    }
  }
}