blob: 75784cfb562b40a385e47b7047c150ef039cd971 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.scalar.examples
import java.util.UUID
import java.util.concurrent.CountDownLatch
import org.apache.ignite.messaging.MessagingListenActor
import org.apache.ignite.scalar.scalar
import org.apache.ignite.scalar.scalar._
/**
* Demonstrates a simple protocol-based exchange by playing ping-pong between
* two nodes. It is analogous to `MessagingPingPongExample` on the Java side.
* <p/>
* Remote nodes should always be started with the special configuration file that
* enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`.
* <p/>
* Alternatively you can run `ExampleNodeStartup` in another JVM which will
* start node with `examples/config/example-ignite.xml` configuration.
*/
object ScalarPingPongExample extends App {
  // NOTE: because this object extends `App`, object-level `val`s would not be
  // initialized yet when the block below runs; keep all state local to the methods.
  scalar("examples/config/example-ignite.xml") {
    pingPong()
    //pingPong2()
  }

  /**
   * Implements Ping Pong example between local and remote node.
   *
   * The remote player answers every "PING" with "PONG" and stops listening on "STOP".
   * The local player answers with "PING" until its latch (initialised to 10) is down
   * to its last count, at which point it stops with a final "STOP". The method blocks
   * until the latch reaches zero.
   *
   * Prints a notice and does nothing else when the cluster has fewer than two nodes.
   */
  def pingPong(): Unit = {
    val g = ignite$

    if (g.cluster().nodes().size < 2) {
      println(">>>")
      println(">>> I need a partner to play a ping pong!")
      println(">>>")
    }
    else {
      // Pick first remote node as a partner.
      val nodeB = g.cluster().forNode(g.remoteNodes$().head)

      // Set up remote player: configure the remote node 'nodeB' to listen
      // for messages coming from the local node.
      g.message(nodeB).remoteListen(null, new MessagingListenActor[String]() {
        def receive(nodeId: UUID, msg: String): Unit = {
          println(msg)

          msg match {
            case "PING" => respond("PONG")
            case "STOP" => stop()
            // Anything else is not part of the protocol (it was already printed
            // above); ignore it instead of failing with a MatchError.
            case _ => ()
          }
        }
      })

      val latch = new CountDownLatch(10)

      // Set up local player: configure the local node
      // to listen for messages coming from the remote node 'nodeB'.
      g.message().localListen(null, new MessagingListenActor[String]() {
        def receive(nodeId: UUID, msg: String): Unit = {
          println(msg)

          if (latch.getCount == 1)
            stop("STOP") // Last exchange: stop listening and tell the partner to stop.
          else // We know it's 'PONG'.
            respond("PING")

          latch.countDown()
        }
      })

      // Serve!
      nodeB.send$("PING", null)

      // Wait until the match is over.
      latch.await()
    }
  }

  /**
   * Implements Ping Pong example between two remote nodes.
   *
   * Node 'n1' answers every "PING" with "PONG" and stops listening on "STOP". Node 'n2'
   * answers with "PING" until the latch it reads from its node-local map is down to its
   * last count, then stops with a final "STOP". The latch is created and stored by the
   * closure that is run on 'n2', which also sends the first "PING" and waits for the
   * latch to reach zero.
   *
   * Prints a notice and does nothing else when fewer than two remote nodes are available.
   */
  def pingPong2(): Unit = {
    val g = ignite$

    if (g.cluster().forRemotes().nodes().size() < 2) {
      println(">>>")
      println(">>> I need at least two remote nodes!")
      println(">>>")
    }
    else {
      // Pick two remote nodes.
      val n1 = g.cluster().forRemotes().head
      val n2 = g.cluster().forRemotes().tail.head

      val n1p = g.cluster().forNode(n1)
      val n2p = g.cluster().forNode(n2)

      // Configure remote node 'n1' to receive messages from 'n2'.
      g.message(n1p).remoteListen(null, new MessagingListenActor[String] {
        def receive(nid: UUID, msg: String): Unit = {
          println(msg)

          msg match {
            case "PING" => respond("PONG")
            case "STOP" => stop()
            // Anything else is not part of the protocol (it was already printed
            // above); ignore it instead of failing with a MatchError.
            case _ => ()
          }
        }
      })

      // Configure remote node 'n2' to receive messages from 'n1'.
      g.message(n2p).remoteListen(null, new MessagingListenActor[String] {
        // Get local count down latch. Resolved lazily, on first message, because the
        // latch is only put into the node-local map by the closure run below.
        private lazy val latch: CountDownLatch = g.cluster().nodeLocalMap().get("latch")

        def receive(nid: UUID, msg: String): Unit = {
          println(msg)

          latch.getCount match {
            case 1 => stop("STOP") // Last exchange: stop listening and tell 'n1' to stop.
            case _ => respond("PING")
          }

          latch.countDown()
        }
      })

      // 1. Sets latch into node local storage so that local actor could use it.
      // 2. Sends first 'PING' to 'n1'.
      // 3. Waits until all messages are exchanged between two remote nodes.
      n2p.run$(() => {
        val latch = new CountDownLatch(10)

        g.cluster().nodeLocalMap[String, CountDownLatch].put("latch", latch)

        n1p.send$("PING", null)

        latch.await()
      }, null)
    }
  }
}