blob: 48ea677ac25da8510bc3c28858eadcb6f501c2ee [file] [log] [blame]
/*
* Copyright 2001-2008 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.juddi.replication;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import javax.xml.bind.JAXB;
import javax.xml.ws.BindingProvider;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.juddi.api_v3.Node;
import org.apache.juddi.config.AppConfig;
import org.apache.juddi.config.PersistenceManager;
import org.apache.juddi.config.Property;
import org.apache.juddi.mapping.MappingApiToModel;
import org.apache.juddi.mapping.MappingModelToApi;
import org.apache.juddi.model.ChangeRecord;
import org.apache.juddi.model.ReplicationConfiguration;
import org.apache.juddi.v3.client.UDDIService;
import org.apache.juddi.v3.client.cryptor.TransportSecurityHelper;
import org.uddi.repl_v3.ChangeRecordIDType;
import org.uddi.repl_v3.CommunicationGraph.Edge;
import org.uddi.repl_v3.HighWaterMarkVectorType;
import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
import org.uddi.repl_v3.Operator;
import org.uddi.v3_service.UDDIReplicationPortType;
/**
* Handles when local records have been changed, change journal storage and
* notifications to all remote replication nodes that something has been
* altered.
*
* @author <a href="mailto:alexoree@apache.org">Alex O'Ree</a>
*
*/
public class ReplicationNotifier extends TimerTask {
private static Log log = LogFactory.getLog(ReplicationNotifier.class);
private Timer timer = null;
private long startBuffer = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default
private long interval = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
private static String node = null;
private static UDDIService uddiService = new UDDIService();
/**
* default constructor
*
* @throws ConfigurationException
*/
public ReplicationNotifier() throws ConfigurationException {
super();
init();
}
private synchronized void init() throws ConfigurationException {
timer = new Timer(true);
startBuffer=AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000l);
interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000l);
timer.scheduleAtFixedRate(this, startBuffer, interval);
if (queue == null) {
queue = new ConcurrentLinkedQueue();
}
node = AppConfig.getConfiguration().getString(Property.JUDDI_NODE_ID, "UNDEFINED_NODE_NAME");
}
@Override
public boolean cancel() {
timer.cancel();
//TODO notify other nodes that i'm going down
return super.cancel();
}
//ReplicationNotifier.Enqueue(this);
public synchronized static void enqueue(org.apache.juddi.model.ChangeRecord change) {
if (queue == null) {
queue = new ConcurrentLinkedQueue<org.apache.juddi.model.ChangeRecord>();
}
queue.add(change);
}
public synchronized static void EnqueueRetransmit(org.uddi.repl_v3.ChangeRecord change) {
if (queue2 == null) {
queue2 = new ConcurrentLinkedQueue<org.uddi.repl_v3.ChangeRecord>();
}
queue2.add(change);
}
static Queue<org.apache.juddi.model.ChangeRecord> queue;
static Queue<org.uddi.repl_v3.ChangeRecord> queue2;
/**
* Note: this is for locally originated changes only, see null null null {@link org.apache.juddi.api.impl.UDDIReplicationImpl.PullTimerTask#PersistChangeRecord PersistChangeRecord
* } for how remote changes are processed
*
* @param j must be one of the UDDI save APIs
*
*/
protected void ProcessChangeRecord(org.apache.juddi.model.ChangeRecord j) {
//store and convert the changes to database model
//TODO need a switch to send the notification without persisting the record
//this is to support multihop notifications
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = null;
try {
tx = em.getTransaction();
tx.begin();
j.setIsAppliedLocally(true);
em.persist(j);
j.setOriginatingUSN(j.getId());
em.merge(j);
log.info("CR saved locally, it was from " + j.getNodeID()
+ " USN:" + j.getOriginatingUSN()
+ " Type:" + j.getRecordType().name()
+ " Key:" + j.getEntityKey()
+ " Local id:" + j.getId());
tx.commit();
} catch (Exception ex) {
log.fatal("unable to store local change record locally!!", ex);
if (tx != null && tx.isActive()) {
tx.rollback();
}
JAXB.marshal(MappingModelToApi.mapChangeRecord(j), System.out);
} finally {
em.close();
}
log.debug("ChangeRecord: " + j.getId() + "," + j.getEntityKey() + "," + j.getNodeID() + "," + j.getOriginatingUSN() + "," + j.getRecordType().toString());
SendNotifications(j.getId(), j.getNodeID(), false);
}
private void SendNotifications(Long id, String origin_node, boolean isRetrans) {
org.uddi.repl_v3.ReplicationConfiguration repcfg = FetchEdges();
if (repcfg == null) {
log.debug("No replication configuration is defined!");
return;
}
if (id == null || origin_node == null) {
log.fatal("Either the id is null or the origin_node is null. I can't send out this alert!!");
//throw new Exception(node);
return;
}
Set<Object> destinationUrls = new HashSet<Object>();
/**
* In the absence of a communicationGraph element from the
* Replication Configuration Structure (although it's mandatory
* in the xsd), all nodes listed in the node element MAY send
* any and all messages to any other node of the registry.
*/
if (repcfg.getCommunicationGraph() == null
|| repcfg.getCommunicationGraph().getEdge().isEmpty() && !isRetrans) {
//no edges or graph defined, default to the operator list
//retransmission only applies to non-directed-edge replication, thus the extra check
for (Operator o : repcfg.getOperator()) {
//no need to tell myself about a change at myself or the origin
if (!o.getOperatorNodeID().equalsIgnoreCase(node) && !o.getOperatorNodeID().equalsIgnoreCase(origin_node)) {
destinationUrls.add(o.getSoapReplicationURL());
}
}
} else {
//this is for directed graph replication
//find all nodes that i need to notify
Iterator<Edge> iterator = repcfg.getCommunicationGraph().getEdge().iterator();
while (iterator.hasNext()) {
Edge next = iterator.next();
if (next.getMessageSender().equalsIgnoreCase(node)) {
//this is my server and i need to transmit the notification to
String messageReceiver = next.getMessageReceiver();
PrimaryAlternate container = new PrimaryAlternate();
//pointless to send a notification to myself or the origin
if (!messageReceiver.equalsIgnoreCase(node) && !messageReceiver.equalsIgnoreCase(origin_node)) {
//look up the endpoint urls
for (int x = 0; x < repcfg.getOperator().size(); x++) {
if (repcfg.getOperator().get(x).getOperatorNodeID().equalsIgnoreCase(messageReceiver)) {
container.primaryUrl = repcfg.getOperator().get(x).getSoapReplicationURL();
}
}
for (int y = 0; y < next.getMessageReceiverAlternate().size(); y++) {
for (int x = 0; x < repcfg.getOperator().size(); x++) {
if (repcfg.getOperator().get(x).getOperatorNodeID().equalsIgnoreCase(next.getMessageReceiverAlternate().get(y))) {
container.alternateUrls.add(repcfg.getOperator().get(x).getSoapReplicationURL());
}
}
}
}
if (container.primaryUrl != null) {
destinationUrls.add(container);
} else {
log.warn("Unable to find primary url for directed edge graph replication from this node " + node + " to "
+ "destination node " + next.getMessageReceiver() + " it will be ignored!");
}
}
}
}
if (destinationUrls.isEmpty()) {
log.debug("Something is bizarre with the replication config. I should have had at least one node to notify, but I have none!");
return;
}
UDDIReplicationPortType x = uddiService.getUDDIReplicationPort();
TransportSecurityHelper.applyTransportSecurity((BindingProvider) x);
for (Object s : destinationUrls) {
NotifyChangeRecordsAvailable req = new NotifyChangeRecordsAvailable();
req.setNotifyingNode(node);
HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();
highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(origin_node, id));
req.setChangesAvailable(highWaterMarkVectorType);
if (s instanceof String) {
SendNotification(x, (String) s, req);
} else if (s instanceof PrimaryAlternate) {
//more complex directed graph stuff
PrimaryAlternate pa = (PrimaryAlternate) s;
if (!SendNotification(x, pa.primaryUrl, req)) {
for (String url : pa.alternateUrls) {
if (SendNotification(x, url, req)) {
break;
}
//no need to continue to additional alternates
}
} else {
//primary url succeeded, no further action required
}
}
//TODO the spec talks about control messages, should we even support it? seems pointless
}
}
/**
* return true if successful
*
* @param x
* @param s
* @param req
* @return
*/
private boolean SendNotification(UDDIReplicationPortType x, String s, NotifyChangeRecordsAvailable req) {
((BindingProvider) x).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, s);
try {
x.notifyChangeRecordsAvailable(req);
log.info("Successfully sent change record available message to " + s + " this node: " + node);
return true;
} catch (Exception ex) {
log.warn("Unable to send change notification to " + s + " this node: " + node + " reason: " + ex.getMessage());
log.debug("Unable to send change notification to " + s, ex);
}
return false;
}
private static class PrimaryAlternate {
String primaryUrl = null;
List<String> alternateUrls = new ArrayList<String>();
}
public synchronized void run() {
log.debug("Replication thread triggered");
if (queue == null) {
queue = new ConcurrentLinkedQueue();
}
if (queue2 == null) {
queue2 = new ConcurrentLinkedQueue();
}
//TODO revisie this
if (!queue.isEmpty()) {
log.info("Replication, Notifying nodes of new change records. " + queue.size() + " queued");
}
//TODO check for replication config changes
while (!queue.isEmpty()) {
//for each change at this node
ChangeRecord j = queue.poll();
ProcessChangeRecord(j);
}
while (!queue2.isEmpty()) {
//for each change at this node
org.uddi.repl_v3.ChangeRecord j = queue2.poll();
ChangeRecord model = new ChangeRecord();
try {
model = MappingApiToModel.mapChangeRecord(j);
} catch (UnsupportedEncodingException ex) {
Logger.getLogger(ReplicationNotifier.class.getName()).log(Level.SEVERE, null, ex);
}
log.info("retransmitting CR notificationm entity owner: " + j.getChangeID().getNodeID() + " CR: " + j.getChangeID().getOriginatingUSN() + " key:" + model.getEntityKey() + " " + model.getRecordType().name() + " accepted locally:" + model.getIsAppliedLocally());
SendNotifications(j.getChangeID().getOriginatingUSN(), j.getChangeID().getNodeID(), true);
}
}
/**
* returns the latest version of the replication config or null if there
* is no config
*
* @return
*/
public static org.uddi.repl_v3.ReplicationConfiguration FetchEdges() {
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = null;
org.uddi.repl_v3.ReplicationConfiguration item = new org.uddi.repl_v3.ReplicationConfiguration();
try {
tx = em.getTransaction();
tx.begin();
Query q = em.createQuery("SELECT item FROM ReplicationConfiguration item order by item.serialNumber DESC");
q.setMaxResults(1);
ReplicationConfiguration results = (ReplicationConfiguration) q.getSingleResult();
// ReplicationConfiguration find = em.find(ReplicationConfiguration.class, null);
if (results != null) {
MappingModelToApi.mapReplicationConfiguration(results, item);
}
tx.commit();
return item;
} catch (Exception ex) {
//log.error("error", ex);
//no config available
if (tx != null && tx.isActive()) {
tx.rollback();
}
} finally {
em.close();
}
return null;
}
@Deprecated
private Node getNode(String messageSender) {
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = null;
try {
tx = em.getTransaction();
tx.begin();
Node api = new Node();
org.apache.juddi.model.Node find = em.find(org.apache.juddi.model.Node.class, messageSender);
if (find != null) {
MappingModelToApi.mapNode(find, api);
}
tx.commit();
return api;
} catch (Exception ex) {
log.error("error", ex);
if (tx != null && tx.isActive()) {
tx.rollback();
}
} finally {
em.close();
}
return null;
}
}