blob: 7ce56d3ff48476f394e826916ee28d6954d4b5ad [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System;
using System.Reactive;
using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Org.Apache.REEF.Wake.RX.Impl;
namespace Org.Apache.REEF.Wake.Tests
{
[TestClass]
public class PubSubSubjectTest
{
[TestMethod]
public void TestPubSubSubjectSingleThread()
{
int sum = 0;
// Observer that adds sum of numbers up to and including x
PubSubSubject<int> subject = new PubSubSubject<int>();
subject.Subscribe(Observer.Create<int>(
x =>
{
for (int i = 0; i <= x; i++)
{
sum += i;
}
}));
subject.OnNext(10);
subject.OnCompleted();
Assert.AreEqual(sum, 55);
}
[TestMethod]
public void TestPubSubSubjectMultipleThreads()
{
int sum = 0;
PubSubSubject<int> subject = new PubSubSubject<int>();
subject.Subscribe(Observer.Create<int>(x => sum += x));
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.Length; i++)
{
threads[i] = new Thread(() =>
{
for (int j = 0; j < 10000; j++)
{
subject.OnNext(1);
}
});
threads[i].Start();
}
foreach (Thread thread in threads)
{
thread.Join();
}
Assert.AreEqual(sum, 100000);
}
[TestMethod]
public void TestMultipleTypes()
{
int sum1 = 0;
int sum2 = 0;
PubSubSubject<SuperEvent> subject = new PubSubSubject<SuperEvent>();
subject.Subscribe(Observer.Create<SubEvent1>(x => sum1 += 100));
subject.Subscribe(Observer.Create<SubEvent2>(x => sum2 += 500));
subject.OnNext(new SubEvent1());
subject.OnNext(new SubEvent2());
subject.OnNext(new SubEvent2());
Assert.AreEqual(sum1, 100);
Assert.AreEqual(sum2, 1000);
}
[TestMethod]
public void TestOnCompleted()
{
int sum = 0;
PubSubSubject<int> subject = new PubSubSubject<int>();
subject.Subscribe(Observer.Create<int>(x => sum += x));
subject.OnNext(10);
Assert.AreEqual(10, sum);
subject.OnNext(10);
Assert.AreEqual(20, sum);
// Check that after calling OnCompleted, OnNext will do nothing
subject.OnCompleted();
subject.OnNext(10);
Assert.AreEqual(20, sum);
}
[TestMethod]
public void TestOnError()
{
int sum = 0;
PubSubSubject<int> subject = new PubSubSubject<int>();
subject.Subscribe(Observer.Create<int>(x => sum += x));
subject.OnNext(10);
Assert.AreEqual(10, sum);
subject.OnNext(10);
Assert.AreEqual(20, sum);
// Check that after calling OnError, OnNext will do nothing
subject.OnError(new Exception("error"));
subject.OnNext(10);
Assert.AreEqual(20, sum);
}
[TestMethod]
public void TestDisposeSingleSubject()
{
int sum = 0;
PubSubSubject<int> subject = new PubSubSubject<int>();
var disposable = subject.Subscribe(Observer.Create<int>(x => sum += x));
subject.OnNext(10);
subject.OnNext(10);
subject.OnNext(10);
Assert.AreEqual(30, sum);
// Unregister the subject and check that calling OnNext does nothing
disposable.Dispose();
subject.OnNext(10);
Assert.AreEqual(30, sum);
}
[TestMethod]
public void TestDisposeMultipleSubjects()
{
int sum1 = 0;
int sum2 = 0;
SubEvent1 event1 = new SubEvent1();
SubEvent2 event2 = new SubEvent2();
PubSubSubject<SuperEvent> subject = new PubSubSubject<SuperEvent>();
var disposable1 = subject.Subscribe(Observer.Create<SubEvent1>(x => sum1 += 100));
var disposable2 = subject.Subscribe(Observer.Create<SubEvent2>(x => sum2 += 500));
subject.OnNext(event1);
subject.OnNext(event2);
subject.OnNext(event2);
Assert.AreEqual(sum1, 100);
Assert.AreEqual(sum2, 1000);
// Check that unsubscribing from SubEvent1 does not affect other subscriptions
disposable1.Dispose();
subject.OnNext(event1);
subject.OnNext(event2);
Assert.AreEqual(sum1, 100);
Assert.AreEqual(sum2, 1500);
// Unsubscribe from the remaining event types
disposable2.Dispose();
subject.OnNext(event1);
subject.OnNext(event2);
Assert.AreEqual(sum1, 100);
Assert.AreEqual(sum2, 1500);
}
class SuperEvent
{
}
class SubEvent1 : SuperEvent
{
}
class SubEvent2 : SuperEvent
{
}
}
}