blob: 2b7e8710c1e233d9225c2389f74214e19079f516 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.examples.datastructures;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCondition;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.Ignition;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.lang.IgniteRunnable;
/**
* This example demonstrates cache based reentrant lock.
* <p>
* Remote nodes should always be started with special configuration
* file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
* <p>
* Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
* examples/config/example-ignite.xml} configuration.
*/
public class IgniteLockExample {
/** Number of items for each producer/consumer to produce/consume. */
private static final int OPS_CNT = 100;
/** Number of producers. */
private static final int NUM_PRODUCERS = 5;
/** Number of consumers. */
private static final int NUM_CONSUMERS = 5;
/** Cache name. */
private static final String CACHE_NAME = "cache";
/** Name of the global resource. */
private static final String QUEUE_ID = "queue";
/** Name of the synchronization variable. */
private static final String SYNC_NAME = "done";
/** Name of the condition object. */
private static final String NOT_FULL = "notFull";
/** Name of the condition object. */
private static final String NOT_EMPTY = "notEmpty";
/**
* Executes example.
*
* @param args Command line arguments, none required.
*/
public static void main(String[] args) {
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
System.out.println();
System.out.println(">>> Cache atomic reentrant lock example started.");
// Make name of reentrant lock.
final String reentrantLockName = UUID.randomUUID().toString();
// Initialize lock.
IgniteLock lock = ignite.reentrantLock(reentrantLockName, true, false, true);
// Init distributed cache.
IgniteCache<String, Integer> cache = ignite.getOrCreateCache(CACHE_NAME);
// Init shared variable.
cache.put(QUEUE_ID, 0);
// Shared variable indicating number of jobs left to be completed.
cache.put(SYNC_NAME, NUM_PRODUCERS + NUM_CONSUMERS);
// Start consumers on all cluster nodes.
for (int i = 0; i < NUM_CONSUMERS; i++)
ignite.compute().runAsync(new Consumer(reentrantLockName));
// Start producers on all cluster nodes.
for (int i = 0; i < NUM_PRODUCERS; i++)
ignite.compute().runAsync(new Producer(reentrantLockName));
System.out.println("Master node is waiting for all other nodes to finish...");
// Wait for everyone to finish.
try {
lock.lock();
IgniteCondition notDone = lock.getOrCreateCondition(SYNC_NAME);
int cnt = cache.get(SYNC_NAME);
while (cnt > 0) {
notDone.await();
cnt = cache.get(SYNC_NAME);
}
}
finally {
lock.unlock();
}
}
System.out.flush();
System.out.println();
System.out.println("Finished reentrant lock example...");
System.out.println("Check all nodes for output (this node is also part of the cluster).");
}
/**
* Closure which simply acquires reentrant lock.
*/
private abstract static class ReentrantLockExampleClosure implements IgniteRunnable {
/** Semaphore name. */
protected final String reentrantLockName;
/**
* @param reentrantLockName Reentrant lock name.
*/
ReentrantLockExampleClosure(String reentrantLockName) {
this.reentrantLockName = reentrantLockName;
}
}
/**
* Closure which simulates producer.
*/
private static class Producer extends ReentrantLockExampleClosure {
/**
* @param reentrantLockName Reentrant lock name.
*/
public Producer(String reentrantLockName) {
super(reentrantLockName);
}
/** {@inheritDoc} */
@Override public void run() {
System.out.println("Producer started. ");
IgniteLock lock = Ignition.ignite().reentrantLock(reentrantLockName, true, false, true);
// Condition to wait on when queue is full.
IgniteCondition notFull = lock.getOrCreateCondition(NOT_FULL);
// Signaled to wait on when queue is empty.
IgniteCondition notEmpty = lock.getOrCreateCondition(NOT_EMPTY);
// Signaled when job is done.
IgniteCondition done = lock.getOrCreateCondition(SYNC_NAME);
IgniteCache<String, Integer> cache = Ignition.ignite().cache(CACHE_NAME);
for (int i = 0; i < OPS_CNT; i++) {
try {
lock.lock();
int val = cache.get(QUEUE_ID);
while (val >= 100) {
System.out.println("Queue is full. Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
" paused.");
notFull.await();
val = cache.get(QUEUE_ID);
}
val++;
System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
", available=" + val + ']');
cache.put(QUEUE_ID, val);
notEmpty.signalAll();
}
finally {
lock.unlock();
}
}
System.out.println("Producer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
try {
lock.lock();
int cnt = cache.get(SYNC_NAME);
cnt--;
cache.put(SYNC_NAME, cnt);
// Signals the master thread.
done.signal();
}
finally {
lock.unlock();
}
}
}
/**
* Closure which simulates consumer.
*/
private static class Consumer extends ReentrantLockExampleClosure {
/**
* @param reentrantLockName ReentrantLock name.
*/
public Consumer(String reentrantLockName) {
super(reentrantLockName);
}
/** {@inheritDoc} */
@Override public void run() {
System.out.println("Consumer started. ");
Ignite g = Ignition.ignite();
IgniteLock lock = g.reentrantLock(reentrantLockName, true, false, true);
// Condition to wait on when queue is full.
IgniteCondition notFull = lock.getOrCreateCondition(NOT_FULL);
// Signaled to wait on when queue is empty.
IgniteCondition notEmpty = lock.getOrCreateCondition(NOT_EMPTY);
// Signaled when job is done.
IgniteCondition done = lock.getOrCreateCondition(SYNC_NAME);
IgniteCache<String, Integer> cache = g.cache(CACHE_NAME);
for (int i = 0; i < OPS_CNT; i++) {
try {
lock.lock();
int val = cache.get(QUEUE_ID);
while (val <= 0) {
System.out.println("Queue is empty. Consumer [nodeId=" +
Ignition.ignite().cluster().localNode().id() + " paused.");
notEmpty.await();
val = cache.get(QUEUE_ID);
}
val--;
System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
", available=" + val + ']');
cache.put(QUEUE_ID, val);
notFull.signalAll();
}
finally {
lock.unlock();
}
}
System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
try {
lock.lock();
int cnt = cache.get(SYNC_NAME);
cnt--;
cache.put(SYNC_NAME, cnt);
// Signals the master thread.
done.signal();
}
finally {
lock.unlock();
}
}
}
}