/*
 * Copyright 2001-2008 The Apache Software Foundation.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.juddi.api.impl;

import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jws.WebParam;
import javax.jws.WebResult;
import javax.jws.WebService;
import javax.jws.soap.SOAPBinding;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import javax.xml.bind.JAXB;
import javax.xml.bind.annotation.XmlSeeAlso;
import javax.xml.ws.BindingProvider;
import org.apache.juddi.api.util.QueryStatus;
import org.apache.juddi.api.util.ReplicationQuery;
import org.apache.juddi.config.AppConfig;
import org.apache.juddi.config.PersistenceManager;
import org.apache.juddi.config.Property;
import org.apache.juddi.mapping.MappingApiToModel;
import org.apache.juddi.mapping.MappingModelToApi;
import org.apache.juddi.model.BindingTemplate;
import org.apache.juddi.model.BusinessEntity;
import org.apache.juddi.model.BusinessService;
import org.apache.juddi.model.Operator;
import org.apache.juddi.model.PublisherAssertion;
import org.apache.juddi.model.PublisherAssertionId;
import org.apache.juddi.model.Tmodel;
import org.apache.juddi.model.UddiEntity;
import org.apache.juddi.replication.ReplicationNotifier;
import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges;
import org.apache.juddi.v3.client.UDDIService;
import org.apache.juddi.v3.client.cryptor.TransportSecurityHelper;
import org.apache.juddi.v3.error.ErrorMessage;
import org.apache.juddi.v3.error.FatalErrorException;
import org.apache.juddi.v3.error.TransferNotAllowedException;
import org.apache.juddi.validation.ValidateReplication;
import org.uddi.custody_v3.TransferEntities;
import org.uddi.repl_v3.ChangeRecord;
import org.uddi.repl_v3.ChangeRecordAcknowledgement;
import org.uddi.repl_v3.ChangeRecordIDType;
import org.uddi.repl_v3.ChangeRecords;
import org.uddi.repl_v3.CommunicationGraph.Edge;
import org.uddi.repl_v3.DoPing;
import org.uddi.repl_v3.GetChangeRecords;
import org.uddi.repl_v3.HighWaterMarkVectorType;
import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
import org.uddi.repl_v3.ReplicationConfiguration;
import org.uddi.repl_v3.TransferCustody;
import org.uddi.v3_service.DispositionReportFaultMessage;
import org.uddi.v3_service.UDDIReplicationPortType;

/**
 * UDDI Replication defines four APIs. The first two presented here are used to
 * perform replication and issue notifications. The latter ancillary APIs
 * provide support for other aspects of UDDI Replication.
 * <ul>
 * <li>get_changeRecords</li>
 * <li>notify_changeRecordsAvailable</li>
 * <li>do_ping</li>
 * <li>get_highWaterMarks</li></ul>
 *
 * @author <a href="mailto:alexoree@apache.org">Alex O'Ree</a>
 */
@WebService(serviceName = "UDDI_Replication_PortType", targetNamespace = "urn:uddi-org:api_v3_portType",
        endpointInterface = "org.uddi.v3_service.UDDIReplicationPortType")
@XmlSeeAlso({
        org.uddi.custody_v3.ObjectFactory.class,
        org.uddi.repl_v3.ObjectFactory.class,
        org.uddi.subr_v3.ObjectFactory.class,
        org.uddi.api_v3.ObjectFactory.class,
        org.uddi.vscache_v3.ObjectFactory.class,
        org.uddi.vs_v3.ObjectFactory.class,
        org.uddi.sub_v3.ObjectFactory.class,
        org.w3._2000._09.xmldsig_.ObjectFactory.class,
        org.uddi.policy_v3.ObjectFactory.class,
        org.uddi.policy_v3_instanceparms.ObjectFactory.class
})
public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType {

        static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig, AuthenticatedService service) {

                //if the config is different
                Set<String> oldnodes = getNodes(oldConfig);
                Set<String> newNodes = getNodes(newConfig);

                Set<String> addedNodes = diffNodeList(oldnodes, newNodes);
                if (queue == null) {
                        queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
                }
                for (String s : addedNodes) {
                        if (!s.equals(service.getNode())) {
                                logger.info("This node: " + service.getNode() + ". New replication node queue for synchronization: " + s);
                                HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();
                                highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(s, 0L));
                                queue.add(new NotifyChangeRecordsAvailable(s, highWaterMarkVectorType));
                        }
                }

        }

        private static Set<String> getNodes(ReplicationConfiguration oldConfig) {
                Set<String> ret = new HashSet<String>();
                if (oldConfig == null) {
                        return ret;
                }
                for (org.uddi.repl_v3.Operator o : oldConfig.getOperator()) {
                        ret.add(o.getOperatorNodeID());
                }
                if (oldConfig.getCommunicationGraph() != null) {
                        ret.addAll(oldConfig.getCommunicationGraph().getNode());
                }
                return ret;
        }

        /**
         * returns items in "newNodes" that are not in "oldNodes"
         *
         * @param oldnodes
         * @param newNodes
         * @return
         */
        private static Set<String> diffNodeList(Set<String> oldnodes, Set<String> newNodes) {
                Set<String> diff = new HashSet<String>();
                Iterator<String> iterator = newNodes.iterator();
                while (iterator.hasNext()) {
                        String lhs = iterator.next();
                        Iterator<String> iterator1 = oldnodes.iterator();
                        boolean found = false;
                        while (iterator1.hasNext()) {
                                String rhs = iterator1.next();
                                if (rhs.equalsIgnoreCase(lhs)) {
                                        found = true;
                                        break;
                                }
                        }
                        if (!found) {
                                diff.add(lhs);
                        }

                }
                return diff;
        }

        private UDDIServiceCounter serviceCounter;

        private static PullTimerTask timer = null;
        private long startBuffer;
        private long interval;

        private static UDDIPublicationImpl pub = null;

        public UDDIReplicationImpl() {
                super();
                try {
                        this.interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000L);
                        this.startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000L);
                } catch (Exception ex) {
                        logger.warn("Config error!", ex);
                }
               
                serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
                init();

        }

        private synchronized void init() {
                if (pub == null) {
                        pub = new UDDIPublicationImpl();
                }
                if (queue == null) {
                        queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
                }
                timer = new PullTimerTask();

        }


        /**
         * handles when a remote node tells me that there's an update(s)
         * available
         */
        private class PullTimerTask extends TimerTask {

                private Timer timer = null;

                public PullTimerTask() {
                        super();
                        timer = new Timer(true);
                        timer.scheduleAtFixedRate(this, startBuffer, interval);
                }
                boolean firstrun = true;

                @Override
                public void run() {
                        if (firstrun) {
                                enqueueAllReceivingNodes();
                                firstrun = false;
                        }

                        if (!queue.isEmpty()) {
                                logger.info("Replication change puller thread started. Queue size: " + queue.size());
                        }
                        //ok someone told me there's a change available
                        while (!queue.isEmpty()) {
                                NotifyChangeRecordsAvailable poll = queue.poll();
                                if (poll != null && !poll.getNotifyingNode().equalsIgnoreCase(getNode())) {
                                        UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());
                                        if (replicationClient == null) {
                                                logger.fatal("unable to obtain a replication client to node " + poll);
                                        } else {
                                                try {
                                                        //get the high water marks for this node
                                                        //ok now get all the changes

                                                        //done  replace with last known record from the given node
                                                        //for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) {
                                                        //        logger.info("Node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()
                                                        //                + " USN " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN());
                                                        //}
                                                        Set<String> nodesHitThisCycle = new HashSet<String>();
                                                        for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) {
                                                                int recordsreturned = 21;
                                                                while (recordsreturned >= 20) {
                                                                        if (nodesHitThisCycle.contains(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID())) {
                                                                                logger.info("i've already hit the node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() + " this cycle, skipping");
                                                                                break;
                                                                        }
                                                                        if (poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID().equalsIgnoreCase(getNode())) {
                                                                                logger.info("ignoring updates that were generated here " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN() + " sent by " + poll.getNotifyingNode() + " this node is " + getNode());
                                                                                break;
                                                                        }
                                                                        nodesHitThisCycle.add(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID());
                                                                        GetChangeRecords body = new GetChangeRecords();
                                                                        body.setRequestingNode(getNode());
                                                                        body.setResponseLimitCount(BigInteger.valueOf(100L));

                                                                        body.setChangesAlreadySeen(getLastChangeRecordFrom(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()));
                                                                        logger.info("fetching updates from " + poll.getNotifyingNode() + " since " + body.getChangesAlreadySeen().getHighWaterMark().get(0).getNodeID() + ":" + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN() + ", items still in the queue: " + queue.size());
                                                                        //JAXB.marshal(body, System.out);
                                                                        List<ChangeRecord> records
                                                                                = replicationClient.getChangeRecords(body).getChangeRecord();
                                                                        //ok now we need to persist the change records
                                                                        logger.info("Change records retrieved from " + poll.getNotifyingNode() + ", " + records.size());
                                                                        for (int i = 0; i < records.size(); i++) {
                                                                                logger.info("Change records retrieved " + records.get(i).getChangeID().getNodeID() + " USN " + records.get(i).getChangeID().getOriginatingUSN());
                                                                                persistChangeRecord(records.get(i));
                                                                        }
                                                                        recordsreturned = records.size();
                                                                }
                                                        }
                                                } catch (Exception ex) {
                                                        logger.error("Error caught fetching replication changes from " + poll + " @" + ((BindingProvider) replicationClient).getRequestContext().get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY), ex);
                                                }
                                        }
                                } else {
                                        if (poll == null) {
                                                logger.warn("strange, popped a null object");
                                        } else if (poll.getNotifyingNode().equalsIgnoreCase(getNode())) {
                                                logger.warn("strange, popped an object from the queue but it was from myself. This probably indicates a configuration error! ignoring...first record: " + poll.getChangesAvailable().getHighWaterMark().get(0).getNodeID()+":" + poll.getChangesAvailable().getHighWaterMark().get(0).getOriginatingUSN());
                                        }
                                }
                        }
                }

                @Override
                public boolean cancel() {
                        timer.cancel();
                        return super.cancel();
                }

                /**
                 * someone told me there's a change available, we retrieved it
                 * and are processing the changes locally.
                 *
                 * @param rec
                 */
                private void persistChangeRecord(ChangeRecord rec) {
                        if (rec == null) {
                                return;
                        }
                        logger.debug("_______________________Remote change request " + rec.getChangeID().getNodeID() + ":" + rec.getChangeID().getOriginatingUSN());

                        if (rec.getChangeID().getNodeID().equalsIgnoreCase(getNode())) {
                                logger.info("Just received a change record that i created, ignoring....");
                                return;
                        }
                        EntityManager em = PersistenceManager.getEntityManager();
                        EntityTransaction tx = em.getTransaction();
                        org.apache.juddi.model.ChangeRecord mapChangeRecord = null;
                        /**
                         * In nodes that support pre-bundled replication
                         * responses, the recipient of the get_changeRecords
                         * message MAY return more change records than requested
                         * by the caller. In this scenario, the caller MUST also
                         * be prepared to deal with such redundant changes where
                         * a USN is less than the USN specified in the
                         * changesAlreadySeen highWaterMarkVector.
                         */

                        try {
                                tx.begin();
                                //check to see if we have this update already
                                Query createQuery = em.createQuery("select c from ChangeRecord c where c.nodeID=:node and c.originatingUSN=:oid");
                                createQuery.setParameter("node", rec.getChangeID().getNodeID());
                                createQuery.setParameter("oid", rec.getChangeID().getOriginatingUSN());
                                Object existingrecord = null;
                                try {
                                        existingrecord = createQuery.getSingleResult();
                                } catch (Exception ex) {
                                        logger.debug("error checking to see if change record exists already (expected failure)", ex);
                                }
                                if (existingrecord != null) {
                                        logger.info("I've already processed change record " + rec.getChangeID().getNodeID() + " " + rec.getChangeID().getOriginatingUSN());
                                        return;
                                }
                                //if it didn't come from here and i haven't seen it yet
                                ReplicationNotifier.EnqueueRetransmit(rec);
                                //the remotechange record rec must also be persisted!!
                                mapChangeRecord = MappingApiToModel.mapChangeRecord(rec);
                                mapChangeRecord.setId(null);
                                mapChangeRecord.setIsAppliedLocally(true);
                                em.persist(mapChangeRecord);
                                tx.commit();
                                logger.info("Remote CR saved, it was from " + mapChangeRecord.getNodeID() //this is the origin of the change
                                        + " USN:" + mapChangeRecord.getOriginatingUSN()
                                        + " Type:" + mapChangeRecord.getRecordType().name()
                                        + " Key:" + mapChangeRecord.getEntityKey()
                                        + " Local id from sender:" + mapChangeRecord.getId());
                                tx = em.getTransaction();
                                tx.begin();
                                //<editor-fold defaultstate="collapsed" desc="delete a record">

                                if (rec.getChangeRecordDelete() != null) {
                                        if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
                                                //delete a binding template
                                                UddiEntity ue = em.find(BindingTemplate.class, rec.getChangeRecordDelete().getBindingKey());
                                                validateNodeIdMisMatches(ue, getNode());
                                                pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
                                        }
                                        if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
                                                //delete a business 
                                                UddiEntity ue = em.find(BusinessEntity.class, rec.getChangeRecordDelete().getBusinessKey());
                                                validateNodeIdMisMatches(ue, getNode());
                                                pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
                                        }
                                        if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
                                                UddiEntity ue = em.find(BusinessService.class, rec.getChangeRecordDelete().getServiceKey());
                                                validateNodeIdMisMatches(ue, getNode());
                                                //delete a service 
                                                pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
                                        }
                                        if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {
                                                //delete a tmodel 
                                                /**
                                                 * The changeRecordDelete for a
                                                 * tModel does not correspond to
                                                 * any API described in this
                                                 * specification and should only
                                                 * appear in the replication
                                                 * stream as the result of an
                                                 * administrative function to
                                                 * permanently remove a tModel.
                                                 */
                                                UddiEntity tm = em.find(Tmodel.class, rec.getChangeRecordDelete().getTModelKey());
                                                if (tm != null) {
                                                        validateNodeIdMisMatches(tm, getNode());
                                                        em.remove(tm);
                                                } else {
                                                        logger.error("failed to adminstratively delete tmodel because it doesn't exist. " + rec.getChangeRecordDelete().getTModelKey());
                                                }
                                                //pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);
                                        }
                                }
                                if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {
                                        //delete a pa template                            
                                        pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion(), em);
                                }

//</editor-fold>
                                //<editor-fold defaultstate="collapsed" desc="New Data">
                                if (rec.getChangeRecordNewData() != null) {

                                        //The operationalInfo element MUST contain the operational information associated with the indicated new data.
                                        if (rec.getChangeRecordNewData().getOperationalInfo() == null) {
                                                logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
                                        } else {
                                                if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
                                                        throw new Exception("Inbound replication data is missiong node id! Change will not be applied");
                                                }
                                                if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equalsIgnoreCase(getNode())) {
                                                        logger.warn("Inbound replication data is modifying locally owned data. This is not allowed, except for custody transfer");
                                                }
                                                if (rec.getChangeRecordNewData().getBindingTemplate() != null) {
                                                        //fetch the binding template if it exists already
                                                        //if it exists, 
                                                        //      confirm the owning node, it shouldn't be the local node id, if it is, throw
                                                        //      the owning node should be the same as it was before

                                                        BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());
                                                        if (model == null) {
                                                                logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
                                                        } else {
                                                                validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());

                                                                org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewData().getBindingTemplate().getBindingKey());
                                                                if (bt != null) {
                                                                        //ValidateNodeIdMatches(node, bt.getNodeId());
                                                                        em.remove(bt);
                                                                }
                                                                bt = new BindingTemplate();
                                                                MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), bt, model);
                                                                MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewData().getOperationalInfo());
                                                                // MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
                                                                em.persist(bt);
                                                        }

                                                } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {

                                                        BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
                                                        if (model != null) {
                                                                //if the owner of the new data is me, and the update didn't originate from me
                                                                if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
                                                                        && !model.getNodeId().equals(getNode())) {
                                                                        if (model.getIsTransferInProgress()) {
                                                                                //allow the transfer
                                                                                MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
                                                                                MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
                                                                                MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
                                                                                model.setIsTransferInProgress(false);
                                                                                em.merge(model);
                                                                        } else {
                                                                                //block it, unexpected transfer
                                                                                throw new Exception("Unexpected entity transfer to to node " + getNode() + " from " + rec.getChangeID().getNodeID());
                                                                        }

                                                                } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
                                                                        && model.getNodeId().equals(getNode())) {
                                                                        //if destination is here and it's staying here, then this is strange also
                                                                        //someone else updated one of my records
                                                                        throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey());
                                                                } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
                                                                        && model.getNodeId().equals(getNode())) {
                                                                        //this is also strange, destination is elsewhere however it's owned by me.
                                                                        throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());

                                                                } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
                                                                        && !model.getNodeId().equals(getNode())) {
                                                                        //changes on a remote node, for an existing item
                                                                        MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
                                                                        MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
                                                                        em.merge(model);

                                                                }

                                                        } else {
                                                                model = new BusinessEntity();
                                                                MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
                                                                MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
                                                                em.persist(model);
                                                        }
                                                }
                                                if (rec.getChangeRecordNewData().getBusinessService() != null) {
                                                        BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());
                                                        if (find == null) {
                                                                logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
                                                        } else {

                                                                org.apache.juddi.model.BusinessService model = null;
                                                                model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBusinessService().getServiceKey());
                                                                if (model != null) {
                                                                        validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
                                                                        em.remove(model);
                                                                }

                                                                model = new org.apache.juddi.model.BusinessService();
                                                                MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);
                                                                MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
                                                                MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());

                                                                em.persist(model);
                                                        }

                                                } else if (rec.getChangeRecordNewData().getTModel() != null) {

                                                        Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewData().getTModel().getTModelKey());
                                                        if (model != null) {
                                                                //in the case of a transfer
                                                                //if the new entity is being transfer to ME, accept and i didn't previously own it, but only if the local record is flagged as transferable
                                                                //meaning, only accept if i'm expecting a transfer
                                                                if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
                                                                        && !model.getNodeId().equals(getNode())) {
                                                                        if (model.getIsTransferInProgress()) {
                                                                                //allow the transfer
                                                                                em.remove(model);
                                                                                model = new Tmodel();
                                                                                MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
                                                                                MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
                                                                                model.setIsTransferInProgress(false);
                                                                                em.persist(model);
                                                                        } else {
                                                                                //block it, unexpected transfer
                                                                                throw new Exception("Unexpected entity transfer to this node from " + rec.getChangeID().getNodeID());
                                                                        }

                                                                } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
                                                                        && model.getNodeId().equals(getNode())) {
                                                                        //if destination is here and it's staying here, then this is strange also
                                                                        //someone else updated one of my records
                                                                        throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey());
                                                                } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
                                                                        && model.getNodeId().equals(getNode())) {
                                                                        //this is also strange, destination is elsewhere however it's owned by me.
                                                                        throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());

                                                                } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())
                                                                        && !model.getNodeId().equals(getNode())) {
                                                                        //changes on a remote node, for an existing item
                                                                        em.remove(model);
                                                                        model = new Tmodel();
                                                                        MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);

                                                                        MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());

                                                                        em.persist(model);

                                                                }
                                                        } else {
                                                                model = new Tmodel();
                                                                MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);

                                                                MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());

                                                                em.persist(model);
                                                        }
                                                }

                                        }

                                }
//</editor-fold>

                                // changeRecordNull no action needed
                                // changeRecordHide tmodel only
                                //<editor-fold defaultstate="collapsed" desc="hide tmodel">
                                if (rec.getChangeRecordHide() != null) {
                                        /*
                                         A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification.  A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call.
                                        
                                         The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure.
                                         */
                                        String key = rec.getChangeRecordHide().getTModelKey();
                                        org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);
                                        if (existing == null) {
                                                logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);
                                        } else {
                                                //no one else can delete/hide my tmodel
                                                validateNodeIdMisMatches(existing, getNode());
                                                existing.setDeleted(true);
                                                existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
                                                existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
                                                em.persist(existing);
                                        }
                                }
//</editor-fold>

                                //<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion">
                                if (rec.getChangeRecordPublisherAssertion() != null) {

                                        logger.info("Repl CR Publisher Assertion");
                                        //TODO are publisher assertions owned by a given node?
                                        PublisherAssertionId paid = new PublisherAssertionId(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey(), rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey());
                                        org.apache.juddi.model.PublisherAssertion model = em.find(org.apache.juddi.model.PublisherAssertion.class, paid);
                                        if (model != null) {
                                                logger.info("Repl CR Publisher Assertion - Existing");

                                                if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {
                                                        model.setFromCheck("true");
                                                } else {
                                                        model.setFromCheck("false");
                                                }

                                                if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {
                                                        model.setToCheck("true");
                                                } else {
                                                        model.setToCheck("false");
                                                }

                                                model.setKeyName(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyName());
                                                model.setKeyValue(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyValue());
                                                model.setTmodelKey(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getTModelKey());
                                                model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());
                                                //model.setSignatures(MappingApiToModel.mapApiSignaturesToModelSignatures(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getSignature()));
                                                if ("false".equalsIgnoreCase(model.getFromCheck())
                                                        && "false".equalsIgnoreCase(model.getToCheck())) {
                                                        logger.warn("!!!New publisher assertion is both false and false, strange. no need to save it then!");
                                                        em.remove(model);
                                                }
                                                em.merge(model);
                                        } else {
                                                logger.info("Repl CR Publisher Assertion - new PA");

                                                model = new PublisherAssertion();
                                                MappingApiToModel.mapPublisherAssertion(rec.getChangeRecordPublisherAssertion().getPublisherAssertion(), model);
                                                model.setBusinessEntityByFromKey(null);
                                                model.setBusinessEntityByToKey(null);
                                                model.setBusinessEntityByFromKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey()));
                                                model.setBusinessEntityByToKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey()));

                                                if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {
                                                        model.setFromCheck("true");
                                                } else {
                                                        model.setFromCheck("false");
                                                }

                                                if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {
                                                        model.setToCheck("true");
                                                } else {
                                                        model.setToCheck("false");
                                                }
                                                model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());
                                                em.persist(model);
                                        }
                                }
//</editor-fold>

                                if (rec.isAcknowledgementRequested()) {
                                        ChangeRecord posack = new ChangeRecord();
                                        posack.setChangeRecordAcknowledgement(new ChangeRecordAcknowledgement());
                                        posack.getChangeRecordAcknowledgement().setAcknowledgedChange(rec.getChangeID());
                                        posack.setAcknowledgementRequested(false);
                                        ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(posack));
                                }
                                if (rec.getChangeRecordNewDataConditional() != null) {

                                        if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
                                                throw new Exception("Inbound replication data is missiong node id!");
                                        }

                                        //The operationalInfo element MUST contain the operational information associated with the indicated new data.
                                        if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo() == null) {
                                                logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
                                        } else {
                                                if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate() != null) {
                                                        //fetch the binding template if it exists already
                                                        //if it exists, 
                                                        //      confirm the owning node, it shouldn't be the local node id, if it is, throw
                                                        //      the owning node should be the same as it was before

                                                        BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getServiceKey());
                                                        if (model == null) {
                                                                logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
                                                        } else {

                                                                org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getBindingKey());
                                                                if (bt != null) {
                                                                        validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), bt.getNodeId());

                                                                        em.remove(bt);
                                                                }
                                                                bt = new BindingTemplate();
                                                                MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate(), bt, model);
                                                                MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
                                                                // MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
                                                                em.persist(bt);
                                                        }

                                                } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity() != null) {

                                                        BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity().getBusinessKey());
                                                        if (model != null) {
                                                                validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
                                                                //TODO revisit access control rules
                                                                em.remove(model);
                                                        }
                                                        model = new BusinessEntity();
                                                        MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity(), model);
                                                        // MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());

                                                        MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
                                                        logger.warn("Name size on save is " + model.getBusinessNames().size());
                                                        em.persist(model);

                                                }
                                                if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService() != null) {
                                                        BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getBusinessKey());
                                                        if (find == null) {
                                                                logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
                                                        } else {

                                                                org.apache.juddi.model.BusinessService model = null;
                                                                model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getServiceKey());
                                                                if (model != null) {
                                                                        validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
                                                                        em.remove(model);
                                                                }

                                                                model = new org.apache.juddi.model.BusinessService();
                                                                MappingApiToModel.mapBusinessService(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService(), model, find);
                                                                MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
                                                                MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());

                                                                em.persist(model);
                                                        }

                                                } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel() != null) {

                                                        Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel().getTModelKey());
                                                        if (model != null) {
                                                                validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
                                                                em.remove(model);
                                                        }
                                                        model = new Tmodel();
                                                        MappingApiToModel.mapTModel(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel(), model);

                                                        MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());

                                                        em.persist(model);
                                                }

                                        }

                                }
                                if (rec.getChangeRecordNull() != null) {
                                        //No action required

                                }
                                if (rec.getChangeRecordCorrection() != null) {
                                        //TODO implement

                                }
                                if (rec.getChangeRecordConditionFailed() != null) {
                                        //TODO implement

                                }
                                tx.commit();

                        } catch (Exception drfm) {

                                logger.warn("Error applying the change record! ", drfm);
                                StringWriter sw = new StringWriter();
                                JAXB.marshal(rec, sw);
                                logger.warn("This is the record that failed to persist: " + sw.toString());
                                if (tx.isActive()) {
                                        tx.rollback();
                                }
                                if (mapChangeRecord != null) {
                                        //set the change record's isApplied to false
                                        try {
                                                tx = em.getTransaction();
                                                tx.begin();
                                                mapChangeRecord.setIsAppliedLocally(false);
                                                em.merge(mapChangeRecord);
                                                tx.commit();
                                        } catch (Exception e) {
                                                logger.error("error updating change record!!", e);
                                                if (tx.isActive()) {
                                                        tx.rollback();
                                                }
                                        }
                                } else {
                                        logger.fatal("whoa! change record is null when saving a remote change record, this is unexpected and should be reported");
                                }
                        } finally {
                                if (tx.isActive()) {
                                        tx.rollback();
                                }
                                em.close();
                        }
                }

                private HighWaterMarkVectorType getLastChangeRecordFrom(String sourcenode) {
                        HighWaterMarkVectorType ret = new HighWaterMarkVectorType();
                        ChangeRecordIDType cid = new ChangeRecordIDType();
                        cid.setNodeID(sourcenode);
                        cid.setOriginatingUSN(0L);
                        EntityManager em = PersistenceManager.getEntityManager();
                        EntityTransaction tx = em.getTransaction();
                        try {
                                tx.begin();
                                //Long id = 0L;
                                try {
                                        cid.setOriginatingUSN((Long) em.createQuery("select MAX(e.originatingUSN) from ChangeRecord e where e.nodeID = :node")
                                                .setParameter("node", sourcenode)
                                                .getSingleResult());
                                } catch (Exception ex) {
                                        logger.info("unexpected error searching for last record from " + sourcenode, ex);
                                }

                                tx.rollback();

                        } catch (Exception drfm) {
                                logger.warn("error caught fetching newest record from node " + sourcenode, drfm);
                        } finally {
                                if (tx.isActive()) {
                                        tx.rollback();
                                }
                                em.close();
                        }
                        logger.info("Highest known record for " + sourcenode + " is " + cid.getOriginatingUSN());
                        ret.getHighWaterMark().add(cid);

                        return ret;
                }

                private void enqueueAllReceivingNodes() {
                        if (queue == null) {
                                queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
                        }
                        //get the replication config
                        //get everyone we are expecting to receive data from, then enqueue them for pulling
                        ReplicationConfiguration repcfg = ReplicationNotifier.FetchEdges();
                        if (repcfg == null) {
                                return;
                        }
                        Set<String> allnodes = new HashSet<String>();
                        for (int i = 0; i < repcfg.getOperator().size(); i++) {
                                allnodes.add(repcfg.getOperator().get(i).getOperatorNodeID());
                        }
                        Set<String> receivers = new HashSet<String>();
                        if (repcfg.getCommunicationGraph() == null
                                || repcfg.getCommunicationGraph().getEdge().isEmpty()) {
                                //no edges or graph defined, default to the operator list
                                for (org.uddi.repl_v3.Operator o : repcfg.getOperator()) {
                                        //no need to tell myself about a change at myself
                                        if (!o.getOperatorNodeID().equalsIgnoreCase(getNode())) {
                                                receivers.add(o.getOperatorNodeID());
                                        }
                                }
                        } else {
                                //repcfg.getCommunicationGraph()
                                Iterator<Edge> iterator = repcfg.getCommunicationGraph().getEdge().iterator();
                                while (iterator.hasNext()) {
                                        Edge next = iterator.next();

                                        if (next.getMessageReceiver().equalsIgnoreCase(getNode())) {
                                                receivers.add(next.getMessageSender());
                                        }

                                }

                        }
                        for (String s : receivers) {
                                //this is a list of nodes that this node is expecting updates from
                                //here are we ticking the notification engine to ping the remove service for updates
                                for (String nodeping : allnodes) {
                                        queue.add(new NotifyChangeRecordsAvailable(s, getLastChangeRecordFrom(nodeping)));
                                        //for each node we are expecting data from, go fetch it, along the way, we'll request all data for all nodes
                                        //that we know about
                                }

                        }
                }

        }

        /**
         * used to check for alterations on *this node's data from another node,
         * which isn't allowed
         *
         * @param ue
         * @param node
         * @throws Exception
         */
        private static void validateNodeIdMisMatches(UddiEntity ue, String node) throws Exception {
                if (ue == null) {
                        return;//object doesn't exist
                }
                if (ue.getNodeId().equals(node)) {
                        throw new Exception("Alert! attempt to alter locally owned entity " + ue.getEntityKey() + " owned by " + ue.getAuthorizedName() + "@" + ue.getNodeId());
                }
        }

        /**
         * use to validate that changed data maintained ownership, except for
         * business entities and tmodels since they allow transfer
         *
         * @param newNodeId
         * @param currentOwningNode
         * @throws Exception
         */
        private  void validateNodeIdMatches(String newNodeId, String currentOwningNode) throws Exception {
                if (newNodeId == null || currentOwningNode == null) {
                        throw new Exception("either the local node ID is null or the inbound replication data's node id is null");
                }
                //only time this is allowed is custody transfer
                if (!newNodeId.equals(currentOwningNode)) {
                        logger.info("AUDIT, custody transfer from node, " + currentOwningNode + " to " + newNodeId + " current node is " + getNode());
                        //throw new Exception("node id mismatch!");
                }

                //if i already have a record and "own it" and the remote node has a record with the same key, reject the update
                //1.5.8 
                /**
                 * Each node has custody of a portion of the aggregate data
                 * managed by the registry of which it is a part. Each datum is
                 * by definition in the custody of exactly one such node. A
                 * datum in this context can be a businessEntity, a
                 * businessService, a bindingTemplate, a tModel, or a
                 * publisherAssertion. Changes to a datum in the registry MUST
                 * originate at the node which is the custodian of the datum.
                 * The registry defines the policy for data custody and, if
                 * allowed, the custodian node for a given datum can be changed;
                 * such custody transfer processes are discussed in Section 5.4
                 * Custody and Ownership Transfer API.
                 */
                //so someone else attempted to update one of my records, reject it
                if (newNodeId.equals(getNode())) {
                        //throw new Exception("node id mismatch! this node already has a record for key " + newDataOperationalInfo.getEntityKey() + " and I'm the authority for it.");
                }
        }

        private synchronized UDDIReplicationPortType getReplicationClient(String node) {
                if (cache.containsKey(node)) {
                        return cache.get(node);
                }
                UDDIService svc = new UDDIService();
                UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();
                TransportSecurityHelper.applyTransportSecurity((BindingProvider) replicationClient);

                EntityManager em = PersistenceManager.getEntityManager();
                EntityTransaction tx = em.getTransaction();
                try {
                        tx.begin();
                        StringBuilder sql = new StringBuilder();
                        sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc");
                        //sql.toString();
                        Query qry = em.createQuery(sql.toString());
                        qry.setMaxResults(1);

                        org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();
                        for (Operator o : resultList.getOperator()) {
                                if (o.getOperatorNodeID().equalsIgnoreCase(node)) {
                                        ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, o.getSoapReplicationURL());
                                        cache.put(node, replicationClient);
                                        return replicationClient;
                                }
                        }
                        tx.rollback();

                } catch (Exception ex) {
                        logger.fatal("Node not found!" + node, ex);
                } finally {
                        if (tx.isActive()) {
                                tx.rollback();
                        }
                        em.close();
                }
                //em.close();
                return null;

        }
        private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();

        /**
         * @since 3.3
         * @param body
         * @return
         * @throws DispositionReportFaultMessage
         */
        public String doPing(DoPing body) throws DispositionReportFaultMessage {
                long startTime = System.currentTimeMillis();
                long procTime = System.currentTimeMillis() - startTime;
                serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);

                return getNode();

        }

        @SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
        @WebResult(name = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", partName = "body")
        // @WebMethod(operationName = "get_changeRecords", action = "get_changeRecords")
        @Override
        public org.uddi.repl_v3.ChangeRecords getChangeRecords(
                @WebParam(partName = "body", name = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3") org.uddi.repl_v3.GetChangeRecords body
        ) throws DispositionReportFaultMessage, RemoteException {
                long startTime = System.currentTimeMillis();
                String requestingNode = body.getRequestingNode();
                HighWaterMarkVectorType changesAlreadySeen = body.getChangesAlreadySeen();
                BigInteger responseLimitCount = body.getResponseLimitCount();
                HighWaterMarkVectorType responseLimitVector = body.getResponseLimitVector();

                new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);

                //TODO should we validate that "requestingNode" is in the replication config?
                List<ChangeRecord> ret = new ArrayList<ChangeRecord>();
                EntityManager em = PersistenceManager.getEntityManager();
                EntityTransaction tx = em.getTransaction();

                /**
                 * More specifically, the recipient determines the particular
                 * change records that are returned by comparing the originating
                 * USNs in the caller’s high water mark vector with the
                 * originating USNs of each of the changes the recipient has
                 * seen from others or generated by itself. The recipient SHOULD
                 * only return change records that have originating USNs that
                 * are greater than those listed in the changesAlreadySeen
                 * highWaterMarkVector and less than the limit required by
                 * either the responseLimitCount or the responseLimitVector.
                 *
                 *
                 * Part of the message is a high water mark vector that contains
                 * for each node of the registry the originating USN of the most
                 * recent change record that has been successfully processed by
                 * the invocating node
                 */
                try {
                        int maxrecords = AppConfig.getConfiguration().getInt(Property.JUDDI_REPLICATION_GET_CHANGE_RECORDS_MAX, 100);
                        if (responseLimitCount != null) {
                                maxrecords = responseLimitCount.intValue();
                        }
                        tx.begin();
                        Long firstrecord = 0L;
                        Long lastrecord = null;
                        Query createQuery = null;
//SELECT t0.id, t0.change_contents, t0.entity_key, t0.appliedlocal, t0.node_id, t0.orginating_usn, t0.record_type FROM j3_chg_record t0 WHERE (t0.id > NULL AND t0.node_id = ?) ORDER BY t0.id ASC                        
                        if (changesAlreadySeen != null) {
                                //this is basically a lower limit (i.e. the newest record that was processed by the requestor
                                //therefore we want the oldest record stored locally to return to the requestor for processing
                                for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
                                        firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN();
                                        if (firstrecord == null) {
                                                firstrecord = 0L;
                                        }
                                        if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(getNode())) {
                                                //special case, search by database id
                                                createQuery = em.createQuery("select e from ChangeRecord e where "
                                                        + "(e.id > :inbound AND e.nodeID = :node) "
                                                        + "order by e.id ASC");

                                        } else {
                                                createQuery = em.createQuery("select e from ChangeRecord e where "
                                                        + "e.originatingUSN > :inbound AND e.nodeID = :node "
                                                        + "order by e.originatingUSN ASC");
                                        }
                                        logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);
                                        logger.info("This node is" + getNode() + ", request is for data originated from " + changesAlreadySeen.getHighWaterMark().get(i).getNodeID() + " and it's being sent back to " + requestingNode);

                                        createQuery.setMaxResults(maxrecords);
                                        createQuery.setParameter("inbound", firstrecord);
                                        createQuery.setParameter("node", changesAlreadySeen.getHighWaterMark().get(i).getNodeID());
                                        List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
                                        logger.info(records.size() + " CR records returned from query");
                                        for (int x = 0; x < records.size(); x++) {
                                                ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(x));
                                                //if (!Excluded(changesAlreadySeen, r)) {
                                                ret.add(r);
                                                //}

                                        }
                                }
                        } /*if (responseLimitVector != null) {
                         //using responseLimitVector, indicating for each node in the graph the first change originating there that he does not wish to be returned.
                         //upper limit basically
                         for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) {
                         //if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
                         lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN();
                         //}
                         }
                         }*/ else {
                                if (firstrecord == null) {
                                        firstrecord = 0L;
                                }
                                //assume that they just want records that originated from here?
                                logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);
                                logger.info("This node is" + getNode() + " requesting node " + requestingNode);

                                if (lastrecord != null) {
                                        createQuery = em.createQuery("select e from ChangeRecord e where "
                                                + "(e.id > :inbound AND e.nodeID = :node AND e.id < :lastrecord) "
                                                + "order by e.id ASC");
                                        createQuery.setParameter("lastrecord", lastrecord);
                                } else {
                                        createQuery = em.createQuery("select e from ChangeRecord e where "
                                                + "(e.id > :inbound AND e.nodeID = :node) "
                                                + "order by e.id ASC");
                                }
                                createQuery.setMaxResults(maxrecords);
                                createQuery.setParameter("inbound", firstrecord);
                                createQuery.setParameter("node", getNode());

                                List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
                                logger.info(records.size() + " CR records returned from query");
                                for (int i = 0; i < records.size(); i++) {
                                        ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));
                                        //if (!Excluded(changesAlreadySeen, r)) {
                                        ret.add(r);
                                        //}

                                }
                        }
                        tx.rollback();
                        long procTime = System.currentTimeMillis() - startTime;
                        serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
                                QueryStatus.SUCCESS, procTime);

                } catch (Exception ex) {
                        logger.fatal("Error, this node is: " + getNode(), ex);
                        throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));

                } finally {
                        if (tx.isActive()) {
                                tx.rollback();
                        }
                        em.close();
                }
                logger.info("Change records returned for " + requestingNode + ": " + ret.size());
                //JAXB.marshal(ret, System.out);
                ChangeRecords x = new ChangeRecords();
                x.getChangeRecord().addAll(ret);
                //JAXB.marshal(x, System.out);
                return x;
        }

        /**
         * This UDDI API message provides a means to obtain a list of
         * highWaterMark element containing the highest known USN for all nodes
         * in the replication graph. If there is no graph, we just return the
         * local bits
         *
         * @return
         * @throws DispositionReportFaultMessage
         */
        @Override
        public List<ChangeRecordIDType> getHighWaterMarks()
                throws DispositionReportFaultMessage {
                long startTime = System.currentTimeMillis();

                List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();

                //fetch from database the highest known watermark
                ReplicationConfiguration FetchEdges = FetchEdges();

                EntityManager em = PersistenceManager.getEntityManager();
                EntityTransaction tx = em.getTransaction();
                HashMap<String, Long> map = new HashMap<String, Long>();
                try {
                        tx.begin();
                        if (FetchEdges != null) {
                                Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator();
                                while (it.hasNext()) {
                                        String nextNode = it.next();
                                        if (!nextNode.equals(getNode())) {
                                                if (!map.containsKey(nextNode)) {
                                                        Long id = 0L;
                                                        try {
                                                                id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult();
                                                        } catch (Exception ex) {
                                                                logger.debug(ex);
                                                        }
                                                        if (id == null) {
                                                                id = 0L;
                                                                //per the spec
                                                        }
                                                        map.put(nextNode, id);

                                                }
                                        }
                                }
                        }
                        //dont forget this node
                        Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node  order by e.id desc")
                                .setParameter("node", getNode()).setMaxResults(1).getSingleResult();
                        if (id == null) {
                                id = 0L;
                        }
                        ChangeRecordIDType x = new ChangeRecordIDType();
                        x.setNodeID(getNode());
                        x.setOriginatingUSN(id);
                        ret.add(x);

                        tx.rollback();
                        long procTime = System.currentTimeMillis() - startTime;
                        serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);

                } catch (Exception drfm) {
                        throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage()));

                } finally {
                        if (tx.isActive()) {
                                tx.rollback();
                        }
                        em.close();
                }

                Iterator<Map.Entry<String, Long>> iterator = map.entrySet().iterator();
                while (iterator.hasNext()) {
                        Map.Entry<String, Long> next = iterator.next();
                        ret.add(new ChangeRecordIDType(next.getKey(), next.getValue()));
                }
                return ret;
        }

        /**
         * this means that another node has a change and we need to pick up the
         * change and apply it to our local database.
         *
         * @param body
         * @throws DispositionReportFaultMessage
         */
        @Override
        public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body)
                throws DispositionReportFaultMessage {
                long startTime = System.currentTimeMillis();

                //some other node just told us there's new records available, call
                //getChangeRecords from the remote node asynch
                new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);

                logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...size is " + queue.size() + " this node is " + getNode());
                //if (!queue.contains(body.getNotifyingNode())) {
                queue.add(body);
                //}
                long procTime = System.currentTimeMillis() - startTime;
                serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,
                        QueryStatus.SUCCESS, procTime);
        }
        private static Queue<NotifyChangeRecordsAvailable> queue = null;

        /**
         * transfers custody of an entity from node1/user1 to node2/user2
         *
         * assume this node is node 2.
         *
         * user1 on node1 requests a transfer token. node 1 issues the token.
         *
         * user1 now has a transfer token for their stuff user now takes the
         * token to node 2 and calls transferEntities
         * <img src="http://www.uddi.org/pubs/uddi-v3.0.2-20041019_files/image086.gif">
         *
         * @param body
         * @throws DispositionReportFaultMessage
         */
        @Override
        public void transferCustody(TransferCustody body)
                throws DispositionReportFaultMessage {
                long startTime = System.currentTimeMillis();
                EntityManager em = PersistenceManager.getEntityManager();
                EntityTransaction tx = em.getTransaction();
                logger.info("Inbound transfer request (via replication api, node to node");
                try {
                        tx.begin();
                //*this node is transfering data to another node
                        //ValidateReplication.unsupportedAPICall();
                        //a remote node just told me to give up control of some of my entities

                        //EntityTransaction tx = em.getTransaction();
                        //confirm i have a replication config
                        boolean ok = false;
                        ReplicationConfiguration FetchEdges = ReplicationNotifier.FetchEdges();
                        if (FetchEdges != null) {
                                for (int i = 0; i < FetchEdges.getOperator().size(); i++) {
                                        //confirm that the destination node is in the replication config
                                        if (FetchEdges.getOperator().get(i).getOperatorNodeID().equals(body.getTransferOperationalInfo().getNodeID())) {
                                                ok = true;
                                                break;
                                        }
                                }
                        }
                        if (!ok) {
                                throw new TransferNotAllowedException(new ErrorMessage("E_transferNotAllowedUnknownNode"));
                        }

                        new ValidateReplication(null).validateTransfer(em, body);

                        TransferEntities te = new TransferEntities();
                        te.setKeyBag(body.getKeyBag());
                        te.setTransferToken(body.getTransferToken());
                        te.setAuthInfo(null);
                        //make the change
                        //enqueue in replication notifier
                        //discard the token
                        logger.debug("request validated, processing transfer");
                        List<ChangeRecord> executeTransfer = new UDDICustodyTransferImpl().executeTransfer(te, em, body.getTransferOperationalInfo().getAuthorizedName(), body.getTransferOperationalInfo().getNodeID());

                        for (ChangeRecord c : executeTransfer) {
                                try {
                                        c.setChangeID(new ChangeRecordIDType());
                                        c.getChangeID().setNodeID(getNode());
                                        c.getChangeID().setOriginatingUSN(null);
                                        ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(c));
                                } catch (UnsupportedEncodingException ex) {
                                        logger.error("", ex);
                                }
                        }
                        /**
                         * The custodial node must verify that it has granted
                         * permission to transfer the entities identified and
                         * that this permission is still valid. This operation
                         * is comprised of two steps:
                         *
                         * 1. Verification that the transferToken was issued by
                         * it, that it has not expired, that it represents the
                         * authority to transfer no more and no less than those
                         * entities identified by the businessKey and tModelKey
                         * elements and that all these entities are still valid
                         * and not yet transferred. The transferToken is
                         * invalidated if any of these conditions are not met.
                         *
                         * 2. If the conditions above are met, the custodial
                         * node will prevent any further changes to the entities
                         * identified by the businessKey and tModelKey elements
                         * identified. The entity will remain in this state
                         * until the replication stream indicates it has been
                         * successfully processed via the replication stream.
                         * Upon successful verification of the custody transfer
                         * request by the custodial node, an empty message is
                         * returned by it indicating the success of the request
                         * and acknowledging the custody transfer. Following the
                         * issue of the empty message, the custodial node will
                         * submit into the replication stream a
                         * changeRecordNewData providing in the operationalInfo,
                         * the nodeID accepting custody of the datum and the
                         * authorizedName of the publisher accepting ownership.
                         * The acknowledgmentRequested attribute of this change
                         * record MUST be set to "true".
                         *
                         *
                         *
                         * Finally, the custodial node invalidates the
                         * transferToken in order to prevent additional calls of
                         * the transfer_entities API.
                         */
                        tx.commit();
                        long procTime = System.currentTimeMillis() - startTime;
                        serviceCounter.update(ReplicationQuery.TRANSFER_CUSTODY,
                                QueryStatus.SUCCESS, procTime);
                } catch (DispositionReportFaultMessage d) {
                        logger.error("Unable to process node to node custody transfer ", d);
                        throw d;
                } finally {
                        if (em != null && em.isOpen()) {
                                em.close();
                        }
                        if (tx.isActive()) {
                                tx.rollback();
                        }
                }
        }

}
