[REEF-1385] Implement TaskMessage.MessageSourceID in REEF.NET
This addressed the issue by
* Implementing TaskMessage.MessageSourceID.
* Modifying messaging test to check for MessageSourceID.
JIRA:
[REEF-1385](https://issues.apache.org/jira/browse/REEF-1385)
Pull Request:
This closes #1010
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index e3fef8a..cf4b947 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -111,12 +111,14 @@
jobject _jobjectTaskMessage = NULL;
JavaVM* _jvm;
jstring _jstringId = NULL;
+ jstring _jstringMessageSourceId = NULL;
public:
TaskMessageClr2Java(JNIEnv *env, jobject jtaskMessage);
~TaskMessageClr2Java();
!TaskMessageClr2Java();
virtual void OnError(String^ message);
virtual String^ GetId();
+ virtual String^ GetMessageSourceId();
};
public ref class FailedTaskClr2Java : public IFailedTaskClr2Java {
diff --git a/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp
index 22db358..b5cee9d 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/TaskMessageClr2Java.cpp
@@ -36,8 +36,14 @@
}
_jobjectTaskMessage = reinterpret_cast<jobject>(env->NewGlobalRef(jtaskMessage));
+ // Get the Task ID.
jclass jclassTaskMessage = env->GetObjectClass(_jobjectTaskMessage);
_jstringId = CommonUtilities::GetJObjectId(env, _jobjectTaskMessage, jclassTaskMessage);
+
+ // Get the Task Message Source ID.
+ jmethodID jmid = env->GetMethodID(jclassTaskMessage, "getMessageSourceId", "()Ljava/lang/String;");
+ _jstringMessageSourceId = CommonUtilities::CallGetMethodNewGlobalRef<jstring>(env, _jobjectTaskMessage, jmid);
+
ManagedLog::LOGGER->LogStop("TaskMessageClr2Java::TaskMessageClr2Java");
}
@@ -54,6 +60,10 @@
if (_jstringId != NULL) {
env->DeleteGlobalRef(_jstringId);
}
+
+ if (_jstringMessageSourceId != NULL) {
+ env->DeleteGlobalRef(_jstringMessageSourceId);
+ }
}
void TaskMessageClr2Java::OnError(String^ message) {
@@ -67,6 +77,11 @@
JNIEnv *env = RetrieveEnv(_jvm);
return ManagedStringFromJavaString(env, _jstringId);
}
+
+ String^ TaskMessageClr2Java::GetMessageSourceId() {
+ JNIEnv *env = RetrieveEnv(_jvm);
+ return ManagedStringFromJavaString(env, _jstringMessageSourceId);
+ }
}
}
}
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs
index fc6a362..0b2be83 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/ITaskMessageClr2Java.cs
@@ -23,5 +23,7 @@
public interface ITaskMessageClr2Java : IClr2Java
{
string GetId();
+
+ string GetMessageSourceId();
}
}
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs
index 718d898..ea4c1e9 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/TaskMessage.cs
@@ -23,7 +23,6 @@
{
/// <summary>
/// TaskMessage which wraps ITaskMessageClr2Java
- /// TODO[JIRA REEF-1385]: Implement MessageSourceID.
/// </summary>
[DataContract]
internal sealed class TaskMessage : ITaskMessage
@@ -43,6 +42,11 @@
get { return _taskMessageClr2Java.GetId(); }
}
+ public string MessageSourceId
+ {
+ get { return _taskMessageClr2Java.GetMessageSourceId(); }
+ }
+
[DataMember]
public byte[] Message
{
diff --git a/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs b/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs
index b411a45..1cdbd6a 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Task/ITaskMessage.cs
@@ -31,5 +31,10 @@
/// The ID of the task that sent the message.
/// </summary>
string TaskId { get; }
+
+ /// <summary>
+ /// The message source ID of the TaskMessage.
+ /// </summary>
+ string MessageSourceId { get; }
}
}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
index 3183b0a..2f6aeaa 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
@@ -78,14 +78,19 @@
public void OnNext(ITaskMessage taskMessage)
{
- // TODO[JIRA REEF-1385]: Check the MessageTaskSourceID.
var msgReceived = ByteUtilities.ByteArraysToString(taskMessage.Message);
if (!msgReceived.Equals(MessageTask.MessageSend))
{
- Exceptions.Throw(new Exception("Unexpected message: " + msgReceived),
- "Unexpected task message received: " + msgReceived,
- Logger);
+ var errorMessage = "Unexpected task message received: " + msgReceived + ". Expected: " +
+ MessageTask.MessageSend;
+ Exceptions.Throw(new Exception(errorMessage), errorMessage, Logger);
+ }
+ else if (!taskMessage.MessageSourceId.Equals(MessageTask.MessageTaskSourceId))
+ {
+ var errorMessage = "Unexpected TaskMessage.MessageSourceId received: " + taskMessage.MessageSourceId +
+ ". Expected: " + MessageTask.MessageTaskSourceId;
+ Exceptions.Throw(new Exception(errorMessage), errorMessage, Logger);
}
else
{
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
index 60662a9..cfe2483 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
@@ -36,7 +36,6 @@
public const string MessageSend = "MESSAGE:TASK";
- // TODO[JIRA REEF-1385]: Check the MessageTaskSourceID on the Driver side.
public const string MessageTaskSourceId = "MessageTaskSourceId";
public const string MessageSentToDriverLog = "Message sent to Driver from Task.";
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
index dd688b4..ec1156e 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
@@ -32,12 +32,10 @@
CsFiles = { "ITaskMessageClr2Java.cs", "TaskMessage.cs" })
public final class TaskMessageBridge extends NativeBridge implements Identifiable {
private TaskMessage jtaskMessage;
- private String taskId;
// we don't really need to pass this around, just have this as place holder for future.
public TaskMessageBridge(final TaskMessage taskMessage) {
jtaskMessage = taskMessage;
- taskId = taskMessage.getId();
}
@Override
@@ -46,6 +44,10 @@
@Override
public String getId() {
- return taskId;
+ return jtaskMessage.getId();
+ }
+
+ public String getMessageSourceId() {
+ return jtaskMessage.getMessageSourceID();
}
}