blob: 44f7f7d6dc610d1a61ee010fd4f844db2bf87571 [file] [log] [blame]
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*******************************************************************************/
package org.apache.ofbiz.entityext.synchronization;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.ofbiz.base.util.Debug;
import org.apache.ofbiz.base.util.UtilDateTime;
import org.apache.ofbiz.base.util.UtilMisc;
import org.apache.ofbiz.base.util.UtilValidate;
import org.apache.ofbiz.entity.Delegator;
import org.apache.ofbiz.entity.DelegatorFactory;
import org.apache.ofbiz.entity.GenericEntity;
import org.apache.ofbiz.entity.GenericEntityException;
import org.apache.ofbiz.entity.GenericValue;
import org.apache.ofbiz.entity.condition.EntityCondition;
import org.apache.ofbiz.entity.condition.EntityOperator;
import org.apache.ofbiz.entity.model.ModelEntity;
import org.apache.ofbiz.entity.serialize.SerializeException;
import org.apache.ofbiz.entity.serialize.XmlSerializer;
import org.apache.ofbiz.entity.transaction.GenericTransactionException;
import org.apache.ofbiz.entity.transaction.TransactionUtil;
import org.apache.ofbiz.entity.util.EntityListIterator;
import org.apache.ofbiz.entity.util.EntityQuery;
import org.apache.ofbiz.entityext.EntityGroupUtil;
import org.apache.ofbiz.service.DispatchContext;
import org.apache.ofbiz.service.GeneralServiceException;
import org.apache.ofbiz.service.GenericServiceException;
import org.apache.ofbiz.service.LocalDispatcher;
import org.apache.ofbiz.service.ModelService;
import org.apache.ofbiz.service.ServiceUtil;
import org.xml.sax.SAXException;
/**
* Entity Engine Sync Services
*/
public class EntitySyncContext {
public static final String module = EntitySyncContext.class.getName();
// set default split to 10 seconds, ie try not to get too much data moving over at once
public static final long defaultSyncSplitMillis = 10000;
// default offline split is 30 minutes
public static final long defaultOfflineSyncSplitMillis = 1800000;
// default to 5 minutes
public static final long defaultSyncEndBufferMillis = 300000;
// default to 2 hours, 120m, 7200s
public static final long defaultMaxRunningNoUpdateMillis = 7200000;
public Delegator delegator;
public LocalDispatcher dispatcher;
public Map<String, ? extends Object> context;
public GenericValue userLogin;
public boolean isOfflineSync = false;
public String entitySyncId;
public GenericValue entitySync;
public String targetServiceName;
public String targetDelegatorName;
public Timestamp syncEndStamp;
public long offlineSyncSplitMillis = defaultOfflineSyncSplitMillis;
public long syncSplitMillis = defaultSyncSplitMillis;
public long syncEndBufferMillis = defaultSyncEndBufferMillis;
public long maxRunningNoUpdateMillis = defaultMaxRunningNoUpdateMillis;
public Timestamp lastSuccessfulSynchTime;
public List<ModelEntity> entityModelToUseList;
public Set<String> entityNameToUseSet;
public Timestamp currentRunStartTime;
public Timestamp currentRunEndTime;
// these values are used to make this more efficient; if we run into an entity that has 0
//results for a given time block, we will do a query to find the next create/update/remove
//time for that entity, and also keep track of a global next with the lowest future next value;
//using these we can skip a lot of queries and speed this up significantly
public Map<String, Timestamp> nextEntityCreateTxTime = new HashMap<String, Timestamp>();
public Map<String, Timestamp> nextEntityUpdateTxTime = new HashMap<String, Timestamp>();
public Timestamp nextCreateTxTime = null;
public Timestamp nextUpdateTxTime = null;
public Timestamp nextRemoveTxTime = null;
// this is the other part of the history PK, leave null until we create the history object
public Timestamp startDate = null;
long toCreateInserted = 0;
long toCreateUpdated = 0;
long toCreateNotUpdated = 0;
long toStoreInserted = 0;
long toStoreUpdated = 0;
long toStoreNotUpdated = 0;
long toRemoveDeleted = 0;
long toRemoveAlreadyDeleted = 0;
long totalRowsExported = 0;
long totalRowsToCreate = 0;
long totalRowsToStore = 0;
long totalRowsToRemove = 0;
long totalRowsPerSplit = 0;
long totalStoreCalls = 0;
long totalSplits = 0;
long perSplitMinMillis = Long.MAX_VALUE;
long perSplitMaxMillis = 0;
long perSplitMinItems = Long.MAX_VALUE;
long perSplitMaxItems = 0;
long splitStartTime = 0;
public EntitySyncContext(DispatchContext dctx, Map<String, ? extends Object> context) throws SyncDataErrorException, SyncAbortException {
this.context = context;
this.dispatcher = dctx.getDispatcher();
this.delegator = dctx.getDelegator();
// what to do with the delegatorName? this is the delegatorName to use in this service...
String delegatorName = (String) context.get("delegatorName");
if (UtilValidate.isNotEmpty(delegatorName)) {
this.delegator = DelegatorFactory.getDelegator(delegatorName);
}
this.userLogin = (GenericValue) context.get("userLogin");
this.entitySyncId = (String) context.get("entitySyncId");
Debug.logInfo("Creating EntitySyncContext with entitySyncId=" + entitySyncId, module);
boolean beganTransaction = false;
try {
beganTransaction = TransactionUtil.begin(7200);
} catch (GenericTransactionException e) {
throw new SyncDataErrorException("Unable to begin JTA transaction", e);
}
try {
this.entitySync = EntityQuery.use(delegator).from("EntitySync").where("entitySyncId", this.entitySyncId).queryOne();
if (this.entitySync == null) {
throw new SyncAbortException("Not running EntitySync [" + entitySyncId + "], no record found with that ID.");
}
targetServiceName = entitySync.getString("targetServiceName");
targetDelegatorName = entitySync.getString("targetDelegatorName");
// make the last time to sync X minutes before the current time so that if this machines clock is up to that amount of time
//ahead of another machine writing to the DB it will still work fine and not lose any data
syncEndStamp = new Timestamp(System.currentTimeMillis() - syncEndBufferMillis);
this.offlineSyncSplitMillis = getOfflineSyncSplitMillis(entitySync);
this.syncSplitMillis = getSyncSplitMillis(entitySync);
this.syncEndBufferMillis = getSyncEndBufferMillis(entitySync);
this.maxRunningNoUpdateMillis = getMaxRunningNoUpdateMillis(entitySync);
this.lastSuccessfulSynchTime = entitySync.getTimestamp("lastSuccessfulSynchTime");
this.entityModelToUseList = this.makeEntityModelToUseList();
this.entityNameToUseSet = this.makeEntityNameToUseSet();
// set start and end times for the first/current pass
this.currentRunStartTime = getCurrentRunStartTime(lastSuccessfulSynchTime, entityModelToUseList, delegator);
this.setCurrentRunEndTime();
// this is mostly for the pull side... will always be null for at the beginning of a push process, to be filled in later
this.startDate = (Timestamp) context.get("startDate");
} catch (GenericEntityException e) {
try {
TransactionUtil.rollback(beganTransaction, "Entity Engine error while getting Entity Sync init information", e);
} catch (GenericTransactionException e2) {
Debug.logWarning(e2, "Unable to call rollback()", module);
}
throw new SyncDataErrorException("Error initializing EntitySync Context", e);
}
try {
TransactionUtil.commit(beganTransaction);
} catch (GenericTransactionException e) {
throw new SyncDataErrorException("Unable to commit transaction", e);
}
}
/**
* To see if it is running check:
* - in the running status
* - AND when the entitySync was last updated, and if it was more than maxRunningNoUpdateMillis ago, then don't consider it to be running
* @return boolean representing if the EntitySync should be considered running
*/
public boolean isEntitySyncRunning() {
boolean isInRunning = ("ESR_RUNNING".equals(this.entitySync.getString("runStatusId")) ||
"ESR_PENDING".equals(this.entitySync.getString("runStatusId")));
if (!isInRunning) {
return false;
}
Timestamp esLastUpdated = this.entitySync.getTimestamp(ModelEntity.STAMP_FIELD);
if (esLastUpdated == null) {
// shouldn't ever happen, but just in case; assume is running if we don't know when it was last updated
return true;
}
long esLastUpdatedMillis = esLastUpdated.getTime();
long nowTimestampMillis = UtilDateTime.nowTimestamp().getTime();
long timeSinceUpdated = nowTimestampMillis - esLastUpdatedMillis;
if (timeSinceUpdated > this.maxRunningNoUpdateMillis) {
// it has been longer than the maxRunningNoUpdateMillis, so don't consider it running
return false;
}
return true;
}
public boolean hasMoreTimeToSync() {
return currentRunStartTime.before(syncEndStamp);
}
protected void setCurrentRunEndTime() {
this.currentRunEndTime = getNextRunEndTime();
}
protected Timestamp getNextRunEndTime() {
long syncSplit = this.isOfflineSync ? offlineSyncSplitMillis : syncSplitMillis;
Timestamp nextRunEndTime = new Timestamp(this.currentRunStartTime.getTime() + syncSplit);
if (nextRunEndTime.after(this.syncEndStamp)) {
nextRunEndTime = this.syncEndStamp;
}
return nextRunEndTime;
}
public void advanceRunTimes() {
this.currentRunStartTime = this.currentRunEndTime;
this.setCurrentRunEndTime();
}
public void setSplitStartTime() {
this.splitStartTime = System.currentTimeMillis();
}
protected static long getSyncSplitMillis(GenericValue entitySync) {
long splitMillis = defaultSyncSplitMillis;
Long syncSplitMillis = entitySync.getLong("syncSplitMillis");
if (syncSplitMillis != null) {
splitMillis = syncSplitMillis.longValue();
}
return splitMillis;
}
protected static long getOfflineSyncSplitMillis(GenericValue entitySync) {
long splitMillis = defaultOfflineSyncSplitMillis;
Long syncSplitMillis = entitySync.getLong("offlineSyncSplitMillis");
if (syncSplitMillis != null) {
splitMillis = syncSplitMillis.longValue();
}
return splitMillis;
}
protected static long getSyncEndBufferMillis(GenericValue entitySync) {
long syncEndBufferMillis = defaultSyncEndBufferMillis;
Long syncEndBufferMillisLong = entitySync.getLong("syncEndBufferMillis");
if (syncEndBufferMillisLong != null) {
syncEndBufferMillis = syncEndBufferMillisLong.longValue();
}
return syncEndBufferMillis;
}
protected static long getMaxRunningNoUpdateMillis(GenericValue entitySync) {
long maxRunningNoUpdateMillis = defaultMaxRunningNoUpdateMillis;
Long maxRunningNoUpdateMillisLong = entitySync.getLong("maxRunningNoUpdateMillis");
if (maxRunningNoUpdateMillisLong != null) {
maxRunningNoUpdateMillis = maxRunningNoUpdateMillisLong.longValue();
}
return maxRunningNoUpdateMillis;
}
/** create history record, target service should run in own tx */
public void createInitialHistory() throws SyncDataErrorException, SyncServiceErrorException {
String errorMsg = "Not running EntitySync [" + entitySyncId + "], could not create EntitySyncHistory";
try {
Map<String, Object> initialHistoryRes = dispatcher.runSync("createEntitySyncHistory", UtilMisc.toMap("entitySyncId", entitySyncId, "runStatusId", "ESR_RUNNING", "beginningSynchTime", this.currentRunStartTime, "lastCandidateEndTime", this.currentRunEndTime, "userLogin", userLogin));
if (ServiceUtil.isError(initialHistoryRes)) {
throw new SyncDataErrorException(errorMsg, null, null, initialHistoryRes, null);
}
this.startDate = (Timestamp) initialHistoryRes.get("startDate");
} catch (GenericServiceException e) {
throw new SyncServiceErrorException(errorMsg, e);
}
}
public ArrayList<GenericValue> assembleValuesToCreate() throws SyncDataErrorException {
// first grab all values inserted in the date range, then get the updates (leaving out all values inserted in the data range)
ArrayList<GenericValue> valuesToCreate = new ArrayList<GenericValue>(); // make it an ArrayList to easily merge in sorted lists
if (this.nextCreateTxTime != null && (this.nextCreateTxTime.equals(currentRunEndTime) || this.nextCreateTxTime.after(currentRunEndTime))) {
// this means that for all entities in this pack we found on the last pass that there would be nothing for this one, so just return nothing...
return valuesToCreate;
}
//Debug.logInfo("Getting values to create; currentRunStartTime=" + currentRunStartTime + ", currentRunEndTime=" + currentRunEndTime, module);
int entitiesSkippedForKnownNext = 0;
// iterate through entities, get all records with tx stamp in the current time range, put all in a single list
for (ModelEntity modelEntity: entityModelToUseList) {
int insertBefore = 0;
// first test to see if we know that there are no records for this entity in this time period...
Timestamp knownNextCreateTime = this.nextEntityCreateTxTime.get(modelEntity.getEntityName());
if (knownNextCreateTime != null && (knownNextCreateTime.equals(currentRunEndTime) || knownNextCreateTime.after(currentRunEndTime))) {
//Debug.logInfo("In assembleValuesToCreate found knownNextCreateTime [" + knownNextCreateTime + "] after currentRunEndTime [" + currentRunEndTime + "], so skipping time per period for entity [" + modelEntity.getEntityName() + "]", module);
entitiesSkippedForKnownNext++;
continue;
}
boolean beganTransaction = false;
try {
beganTransaction = TransactionUtil.begin(7200);
} catch (GenericTransactionException e) {
throw new SyncDataErrorException("Unable to begin JTA transaction", e);
}
try {
// get the values created within the current time range
EntityCondition findValCondition = EntityCondition.makeCondition(
EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunStartTime),
EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunEndTime));
EntityListIterator eli = EntityQuery.use(delegator)
.from(modelEntity.getEntityName())
.where(findValCondition)
.orderBy(ModelEntity.CREATE_STAMP_TX_FIELD, ModelEntity.CREATE_STAMP_FIELD)
.queryIterator();
GenericValue nextValue = null;
long valuesPerEntity = 0;
while ((nextValue = eli.next()) != null) {
// sort by the tx stamp and then the record stamp
// find first value in valuesToCreate list, starting with the current insertBefore value, that has a CREATE_STAMP_TX_FIELD after the nextValue.CREATE_STAMP_TX_FIELD, then do the same with CREATE_STAMP_FIELD
while (insertBefore < valuesToCreate.size() && valuesToCreate.get(insertBefore).getTimestamp(ModelEntity.CREATE_STAMP_TX_FIELD).before(nextValue.getTimestamp(ModelEntity.CREATE_STAMP_TX_FIELD))) {
insertBefore++;
}
while (insertBefore < valuesToCreate.size() && valuesToCreate.get(insertBefore).getTimestamp(ModelEntity.CREATE_STAMP_FIELD).before(nextValue.getTimestamp(ModelEntity.CREATE_STAMP_FIELD))) {
insertBefore++;
}
valuesToCreate.add(insertBefore, nextValue);
valuesPerEntity++;
}
eli.close();
// definately remove this message and related data gathering
//long preCount = delegator.findCountByCondition(modelEntity.getEntityName(), findValCondition, null);
//long entityTotalCount = delegator.findCountByCondition(modelEntity.getEntityName(), null, null);
//if (entityTotalCount > 0 || preCount > 0 || valuesPerEntity > 0) Debug.logInfo("Got " + valuesPerEntity + "/" + preCount + "/" + entityTotalCount + " values for entity " + modelEntity.getEntityName(), module);
// if we didn't find anything for this entity, find the next value's Timestamp and keep track of it
if (valuesPerEntity == 0) {
Timestamp startCheckStamp = new Timestamp(System.currentTimeMillis() - syncEndBufferMillis);
EntityCondition findNextCondition = EntityCondition.makeCondition(
EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.NOT_EQUAL, null),
EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunEndTime));
EntityListIterator eliNext = EntityQuery.use(delegator)
.from(modelEntity.getEntityName())
.where(findNextCondition)
.orderBy(ModelEntity.CREATE_STAMP_TX_FIELD)
.queryIterator();
// get the first element and it's tx time value...
GenericValue firstVal = eliNext.next();
eliNext.close();
Timestamp nextTxTime;
if (firstVal != null) {
nextTxTime = firstVal.getTimestamp(ModelEntity.CREATE_STAMP_TX_FIELD);
} else {
// no results? well, then it's safe to say that up to the pre-querytime (minus the buffer, as usual) we are okay
nextTxTime = startCheckStamp;
}
if (this.nextCreateTxTime == null || nextTxTime.before(this.nextCreateTxTime)) {
this.nextCreateTxTime = nextTxTime;
Debug.logInfo("EntitySync: Set nextCreateTxTime to [" + nextTxTime + "]", module);
}
Timestamp curEntityNextTxTime = this.nextEntityCreateTxTime.get(modelEntity.getEntityName());
if (curEntityNextTxTime == null || nextTxTime.before(curEntityNextTxTime)) {
this.nextEntityCreateTxTime.put(modelEntity.getEntityName(), nextTxTime);
Debug.logInfo("EntitySync: Set nextEntityCreateTxTime to [" + nextTxTime + "] for the entity [" + modelEntity.getEntityName() + "]", module);
}
}
} catch (GenericEntityException e) {
try {
TransactionUtil.rollback(beganTransaction, "Entity Engine error in assembleValuesToCreate", e);
} catch (GenericTransactionException e2) {
Debug.logWarning(e2, "Unable to call rollback()", module);
}
throw new SyncDataErrorException("Error getting values to create from the datasource", e);
} catch (Throwable t) {
try {
TransactionUtil.rollback(beganTransaction, "Throwable error in assembleValuesToCreate", t);
} catch (GenericTransactionException e2) {
Debug.logWarning(e2, "Unable to call rollback()", module);
}
throw new SyncDataErrorException("Caught runtime error while getting values to create", t);
}
try {
TransactionUtil.commit(beganTransaction);
} catch (GenericTransactionException e) {
throw new SyncDataErrorException("Commit transaction failed", e);
}
}
if (entitiesSkippedForKnownNext > 0) {
if (Debug.infoOn()) Debug.logInfo("In assembleValuesToCreate skipped [" + entitiesSkippedForKnownNext + "/" + entityModelToUseList + "] entities for the time period ending at [" + currentRunEndTime + "] because of next known create times", module);
}
// TEST SECTION: leave false for normal use
boolean logValues = false;
if (logValues && valuesToCreate.size() > 0) {
StringBuilder toCreateInfo = new StringBuilder();
for (GenericValue valueToCreate: valuesToCreate) {
toCreateInfo.append("\n-->[");
toCreateInfo.append(valueToCreate.get(ModelEntity.CREATE_STAMP_TX_FIELD));
toCreateInfo.append(":");
toCreateInfo.append(valueToCreate.get(ModelEntity.CREATE_STAMP_FIELD));
toCreateInfo.append("] ");
toCreateInfo.append(valueToCreate.getPrimaryKey());
}
Debug.logInfo(toCreateInfo.toString(), module);
}
// As the this.nextCreateTxTime calculation is only based on entities without values to create, if there at least one value to create returned
// this calculation is false, so it needs to be nullified
if (valuesToCreate.size() > 0) {
this.nextCreateTxTime = null;
}
return valuesToCreate;
}
public ArrayList<GenericValue> assembleValuesToStore() throws SyncDataErrorException {
// simulate two ordered lists and merge them on-the-fly for faster combined sorting
ArrayList<GenericValue> valuesToStore = new ArrayList<GenericValue>(); // make it an ArrayList to easily merge in sorted lists
if (this.nextUpdateTxTime != null && (this.nextUpdateTxTime.equals(currentRunEndTime) || this.nextUpdateTxTime.after(currentRunEndTime))) {
// this means that for all entities in this pack we found on the last pass that there would be nothing for this one, so just return nothing...
return valuesToStore;
}
// Debug.logInfo("Getting values to store; currentRunStartTime=" + currentRunStartTime + ", currentRunEndTime=" + currentRunEndTime, module);
int entitiesSkippedForKnownNext = 0;
// iterate through entities, get all records with tx stamp in the current time range, put all in a single list
for (ModelEntity modelEntity: entityModelToUseList) {
int insertBefore = 0;
// first test to see if we know that there are no records for this entity in this time period...
Timestamp knownNextUpdateTime = this.nextEntityUpdateTxTime.get(modelEntity.getEntityName());
if (knownNextUpdateTime != null && (knownNextUpdateTime.equals(currentRunEndTime) || knownNextUpdateTime.after(currentRunEndTime))) {
entitiesSkippedForKnownNext++;
continue;
}
boolean beganTransaction = false;
try {
beganTransaction = TransactionUtil.begin(7200);
} catch (GenericTransactionException e) {
throw new SyncDataErrorException("Unable to begin JTA transaction", e);
}
try {
// get all values that were updated, but NOT created in the current time range; if no info on created stamp, that's okay we'll include it here because it won't have been included in the valuesToCreate list
EntityCondition createdBeforeStartCond = EntityCondition.makeCondition(
EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.EQUALS, null),
EntityOperator.OR,
EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunStartTime));
EntityCondition findValCondition = EntityCondition.makeCondition(
EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunStartTime),
EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunEndTime),
createdBeforeStartCond);
EntityListIterator eli = EntityQuery.use(delegator)
.from(modelEntity.getEntityName())
.where(findValCondition)
.orderBy(ModelEntity.STAMP_TX_FIELD, ModelEntity.STAMP_FIELD)
.queryIterator();
GenericValue nextValue = null;
long valuesPerEntity = 0;
while ((nextValue = eli.next()) != null) {
// sort by the tx stamp and then the record stamp
// find first value in valuesToStore list, starting with the current insertBefore value, that has a STAMP_TX_FIELD after the nextValue.STAMP_TX_FIELD, then do the same with STAMP_FIELD
while (insertBefore < valuesToStore.size() && valuesToStore.get(insertBefore).getTimestamp(ModelEntity.STAMP_TX_FIELD).before(nextValue.getTimestamp(ModelEntity.STAMP_TX_FIELD))) {
insertBefore++;
}
while (insertBefore < valuesToStore.size() && valuesToStore.get(insertBefore).getTimestamp(ModelEntity.STAMP_FIELD).before(nextValue.getTimestamp(ModelEntity.STAMP_FIELD))) {
insertBefore++;
}
valuesToStore.add(insertBefore, nextValue);
valuesPerEntity++;
}
eli.close();
// definately remove this message and related data gathering
//long preCount = delegator.findCountByCondition(modelEntity.getEntityName(), findValCondition, null);
//long entityTotalCount = delegator.findCountByCondition(modelEntity.getEntityName(), null, null);
//if (entityTotalCount > 0 || preCount > 0 || valuesPerEntity > 0) Debug.logInfo("Got " + valuesPerEntity + "/" + preCount + "/" + entityTotalCount + " values for entity " + modelEntity.getEntityName(), module);
// if we didn't find anything for this entity, find the next value's Timestamp and keep track of it
if (valuesPerEntity == 0) {
Timestamp startCheckStamp = new Timestamp(System.currentTimeMillis() - syncEndBufferMillis);
EntityCondition findNextCondition = EntityCondition.makeCondition(
EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.NOT_EQUAL, null),
EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunEndTime),
EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.NOT_EQUAL, null),
EntityCondition.makeCondition(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunEndTime));
EntityListIterator eliNext = EntityQuery.use(delegator)
.from(modelEntity.getEntityName())
.where(findNextCondition)
.orderBy(ModelEntity.STAMP_TX_FIELD)
.queryIterator();
// get the first element and it's tx time value...
GenericValue firstVal = eliNext.next();
eliNext.close();
Timestamp nextTxTime;
if (firstVal != null) {
nextTxTime = firstVal.getTimestamp(ModelEntity.CREATE_STAMP_TX_FIELD);
} else {
// no results? well, then it's safe to say that up to the pre-querytime (minus the buffer, as usual) we are okay
nextTxTime = startCheckStamp;
}
if (this.nextUpdateTxTime == null || nextTxTime.before(this.nextUpdateTxTime)) {
this.nextUpdateTxTime = nextTxTime;
Debug.logInfo("EntitySync: Set nextUpdateTxTime to [" + nextTxTime + "]", module);
}
Timestamp curEntityNextTxTime = this.nextEntityUpdateTxTime.get(modelEntity.getEntityName());
if (curEntityNextTxTime == null || nextTxTime.before(curEntityNextTxTime)) {
this.nextEntityUpdateTxTime.put(modelEntity.getEntityName(), nextTxTime);
Debug.logInfo("EntitySync: Set nextEntityUpdateTxTime to [" + nextTxTime + "] for the entity [" + modelEntity.getEntityName() + "]", module);
}
}
} catch (GenericEntityException e) {
try {
TransactionUtil.rollback(beganTransaction, "Entity Engine error in assembleValuesToStore", e);
} catch (GenericTransactionException e2) {
Debug.logWarning(e2, "Unable to call rollback()", module);
}
throw new SyncDataErrorException("Error getting values to store from the datasource", e);
} catch (Throwable t) {
try {
TransactionUtil.rollback(beganTransaction, "General error in assembleValuesToStore", t);
} catch (GenericTransactionException e2) {
Debug.logWarning(e2, "Unable to call rollback()", module);
}
throw new SyncDataErrorException("Caught runtime error while getting values to store", t);
}
try {
TransactionUtil.commit(beganTransaction);
} catch (GenericTransactionException e) {
throw new SyncDataErrorException("Commit transaction failed", e);
}
}
if (entitiesSkippedForKnownNext > 0) {
if (Debug.infoOn()) Debug.logInfo("In assembleValuesToStore skipped [" + entitiesSkippedForKnownNext + "/" + entityModelToUseList + "] entities for the time period ending at [" + currentRunEndTime + "] because of next known update times", module);
}
// TEST SECTION: leave false for normal use
boolean logValues = false;
if (logValues && valuesToStore.size() > 0) {
StringBuilder toStoreInfo = new StringBuilder();
for (GenericValue valueToStore: valuesToStore) {
toStoreInfo.append("\n-->[");
toStoreInfo.append(valueToStore.get(ModelEntity.STAMP_TX_FIELD));
toStoreInfo.append(":");
toStoreInfo.append(valueToStore.get(ModelEntity.STAMP_FIELD));
toStoreInfo.append("] ");
toStoreInfo.append(valueToStore.getPrimaryKey());
}
Debug.logInfo(toStoreInfo.toString(), module);
}
// As the this.nextUpdateTxTime calculation is only based on entities without values to store, if there at least one value to store returned
// this calculation is false, so it needs to be nullified
if (valuesToStore.size() > 0) {
this.nextUpdateTxTime = null;
}
return valuesToStore;
}
public LinkedList<GenericEntity> assembleKeysToRemove() throws SyncDataErrorException {
// get all removed items from the given time range, add to list for those
LinkedList<GenericEntity> keysToRemove = new LinkedList<GenericEntity>();
if (this.nextRemoveTxTime != null && (this.nextRemoveTxTime.equals(currentRunEndTime) || this.nextRemoveTxTime.after(currentRunEndTime))) {
// this means that for all entities in this pack we found on the last pass that there would be nothing for this one, so just return nothing...
return keysToRemove;
}
//Debug.logInfo("Getting keys to remove; currentRunStartTime=" + currentRunStartTime + ", currentRunEndTime=" + currentRunEndTime, module);
boolean beganTransaction = false;
try {
beganTransaction = TransactionUtil.begin(7200);
} catch (GenericTransactionException e) {
throw new SyncDataErrorException("Unable to begin JTA transaction", e);
}
try {
// find all instances of this entity with the STAMP_TX_FIELD != null, sort ascending to get lowest/oldest value first, then grab first and consider as candidate currentRunStartTime
EntityCondition findValCondition = EntityCondition.makeCondition(
EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunStartTime),
EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunEndTime));
EntityListIterator removeEli = EntityQuery.use(delegator)
.from("EntitySyncRemove")
.where(findValCondition)
.orderBy(ModelEntity.STAMP_TX_FIELD, ModelEntity.STAMP_FIELD)
.queryIterator();
GenericValue entitySyncRemove = null;
while ((entitySyncRemove = removeEli.next()) != null) {
// pull the PK from the EntitySyncRemove in the primaryKeyRemoved field, de-XML-serialize it
String primaryKeyRemoved = entitySyncRemove.getString("primaryKeyRemoved");
GenericEntity pkToRemove = null;
try {
pkToRemove = (GenericEntity) XmlSerializer.deserialize(primaryKeyRemoved, delegator);
} catch (IOException e) {
String errorMsg = "Error deserializing GenericPK to remove in Entity Sync Data for entitySyncId [" + entitySyncId + "] and entitySyncRemoveId [" + entitySyncRemove.getString("entitySyncRemoveId") + "]: " + e.toString();
Debug.logError(e, errorMsg, module);
throw new SyncDataErrorException(errorMsg, e);
} catch (SAXException e) {
String errorMsg = "Error deserializing GenericPK to remove in Entity Sync Data for entitySyncId [" + entitySyncId + "] and entitySyncRemoveId [" + entitySyncRemove.getString("entitySyncRemoveId") + "]: " + e.toString();
Debug.logError(e, errorMsg, module);
throw new SyncDataErrorException(errorMsg, e);
} catch (ParserConfigurationException e) {
String errorMsg = "Error deserializing GenericPK to remove in Entity Sync Data for entitySyncId [" + entitySyncId + "] and entitySyncRemoveId [" + entitySyncRemove.getString("entitySyncRemoveId") + "]: " + e.toString();
Debug.logError(e, errorMsg, module);
throw new SyncDataErrorException(errorMsg, e);
} catch (SerializeException e) {
String errorMsg = "Error deserializing GenericPK to remove in Entity Sync Data for entitySyncId [" + entitySyncId + "] and entitySyncRemoveId [" + entitySyncRemove.getString("entitySyncRemoveId") + "]: " + e.toString();
Debug.logError(e, errorMsg, module);
throw new SyncDataErrorException(errorMsg, e);
}
// set the stamp fields for future reference
pkToRemove.set(ModelEntity.STAMP_TX_FIELD, entitySyncRemove.get(ModelEntity.STAMP_TX_FIELD));
pkToRemove.set(ModelEntity.STAMP_FIELD, entitySyncRemove.get(ModelEntity.STAMP_FIELD));
pkToRemove.set(ModelEntity.CREATE_STAMP_TX_FIELD, entitySyncRemove.get(ModelEntity.CREATE_STAMP_TX_FIELD));
pkToRemove.set(ModelEntity.CREATE_STAMP_FIELD, entitySyncRemove.get(ModelEntity.CREATE_STAMP_FIELD));
if (this.entityNameToUseSet.contains(pkToRemove.getEntityName())) {
keysToRemove.add(pkToRemove);
}
}
removeEli.close();
// if we didn't find anything for this entity, find the next value's Timestamp and keep track of it
if (keysToRemove.size() == 0) {
EntityCondition findNextCondition = EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunEndTime);
EntityListIterator eliNext = EntityQuery.use(delegator)
.from("EntitySyncRemove")
.where(findNextCondition)
.orderBy(ModelEntity.STAMP_TX_FIELD)
.queryIterator();
// get the first element and it's tx time value...
GenericValue firstVal = eliNext.next();
eliNext.close();
if (firstVal != null) {
Timestamp nextTxTime = firstVal.getTimestamp(ModelEntity.STAMP_TX_FIELD);
if (this.nextRemoveTxTime == null || nextTxTime.before(this.nextRemoveTxTime)) {
this.nextRemoveTxTime = nextTxTime;
}
}
}
} catch (GenericEntityException e) {
try {
TransactionUtil.rollback(beganTransaction, "Entity Engine error in assembleKeysToRemove", e);
} catch (GenericTransactionException e2) {
Debug.logWarning(e2, "Unable to call rollback()", module);
}
throw new SyncDataErrorException("Error getting keys to remove from the datasource", e);
} catch (Throwable t) {
try {
TransactionUtil.rollback(beganTransaction, "General error in assembleKeysToRemove", t);
} catch (GenericTransactionException e2) {
Debug.logWarning(e2, "Unable to call rollback()", module);
}
throw new SyncDataErrorException("Caught runtime error while getting keys to remove", t);
}
try {
TransactionUtil.commit(beganTransaction);
} catch (GenericTransactionException e) {
throw new SyncDataErrorException("Commit transaction failed", e);
}
// TEST SECTION: leave false for normal use
boolean logValues = false;
if (logValues && keysToRemove.size() > 0) {
StringBuilder toRemoveInfo = new StringBuilder();
for (GenericEntity keyToRemove: keysToRemove) {
toRemoveInfo.append("\n-->[");
toRemoveInfo.append(keyToRemove.get(ModelEntity.STAMP_TX_FIELD));
toRemoveInfo.append(":");
toRemoveInfo.append(keyToRemove.get(ModelEntity.STAMP_FIELD));
toRemoveInfo.append("] ");
toRemoveInfo.append(keyToRemove);
}
Debug.logInfo(toRemoveInfo.toString(), module);
}
// As this.nextRemoveTxTime calculation is only based on entities without keys to remove, if there at least one key to remove returned
// this calculation is false, so it needs to be nullified
if (keysToRemove.size() > 0) {
this.nextRemoveTxTime = null;
}
return keysToRemove;
}
public void saveResultsReportedFromDataStore() throws SyncDataErrorException, SyncServiceErrorException {
try {
long runningTimeMillis = System.currentTimeMillis() - startDate.getTime();
// get the total for this split
long splitTotalTime = System.currentTimeMillis() - this.splitStartTime;
if (splitTotalTime < this.perSplitMinMillis) {
this.perSplitMinMillis = splitTotalTime;
}
if (splitTotalTime > this.perSplitMaxMillis) {
this.perSplitMaxMillis = splitTotalTime;
}
// start the timer for the next split
setSplitStartTime();
// total the rows saved so far, and gather some info about them before saving
this.totalRowsPerSplit = this.toCreateInserted + this.toCreateNotUpdated + this.toCreateUpdated +
this.toStoreInserted + this.toStoreNotUpdated + this.toStoreUpdated +
this.toRemoveAlreadyDeleted + this.toRemoveDeleted;
if (this.totalRowsPerSplit < this.perSplitMinItems) {
this.perSplitMinItems = this.totalRowsPerSplit;
}
if (this.totalRowsPerSplit > this.perSplitMaxItems) {
this.perSplitMaxItems = this.totalRowsPerSplit;
}
this.totalRowsToCreate += this.toCreateInserted + this.toCreateNotUpdated + this.toCreateUpdated;
this.totalRowsToStore += this.toStoreInserted + this.toStoreNotUpdated + this.toStoreUpdated;
this.totalRowsToRemove += this.toRemoveAlreadyDeleted + this.toRemoveDeleted;
// store latest result on EntitySync, ie update lastSuccessfulSynchTime, should run in own tx
Map<String, Object> updateEsRunResult = dispatcher.runSync("updateEntitySyncRunning", UtilMisc.toMap("entitySyncId", entitySyncId, "lastSuccessfulSynchTime", this.currentRunEndTime, "userLogin", userLogin));
// store result of service call on history with results so far, should run in own tx
Map<String, Object> updateHistoryMap = UtilMisc.toMap("entitySyncId", entitySyncId, "startDate", startDate,
"lastSuccessfulSynchTime", this.currentRunEndTime, "lastCandidateEndTime", this.getNextRunEndTime(),
"lastSplitStartTime", Long.valueOf(this.splitStartTime));
updateHistoryMap.put("toCreateInserted", Long.valueOf(toCreateInserted));
updateHistoryMap.put("toCreateUpdated", Long.valueOf(toCreateUpdated));
updateHistoryMap.put("toCreateNotUpdated", Long.valueOf(toCreateNotUpdated));
updateHistoryMap.put("toStoreInserted", Long.valueOf(toStoreInserted));
updateHistoryMap.put("toStoreUpdated", Long.valueOf(toStoreUpdated));
updateHistoryMap.put("toStoreNotUpdated", Long.valueOf(toStoreNotUpdated));
updateHistoryMap.put("toRemoveDeleted", Long.valueOf(toRemoveDeleted));
updateHistoryMap.put("toRemoveAlreadyDeleted", Long.valueOf(toRemoveAlreadyDeleted));
updateHistoryMap.put("runningTimeMillis", Long.valueOf(runningTimeMillis));
updateHistoryMap.put("totalStoreCalls", Long.valueOf(totalStoreCalls));
updateHistoryMap.put("totalSplits", Long.valueOf(totalSplits));
updateHistoryMap.put("totalRowsExported", Long.valueOf(totalRowsExported));
updateHistoryMap.put("totalRowsToCreate", Long.valueOf(totalRowsToCreate));
updateHistoryMap.put("totalRowsToStore", Long.valueOf(totalRowsToStore));
updateHistoryMap.put("totalRowsToRemove", Long.valueOf(totalRowsToRemove));
updateHistoryMap.put("perSplitMinMillis", Long.valueOf(perSplitMinMillis));
updateHistoryMap.put("perSplitMaxMillis", Long.valueOf(perSplitMaxMillis));
updateHistoryMap.put("perSplitMinItems", Long.valueOf(perSplitMinItems));
updateHistoryMap.put("perSplitMaxItems", Long.valueOf(perSplitMaxItems));
updateHistoryMap.put("userLogin", userLogin);
Map<String, Object> updateEsHistRunResult = dispatcher.runSync("updateEntitySyncHistory", updateHistoryMap);
// now we have updated EntitySync and EntitySyncHistory, check both ops for errors...
if (ServiceUtil.isError(updateEsRunResult)) {
String errorMsg = "Error running EntitySync [" + entitySyncId + "], update of EntitySync record with lastSuccessfulSynchTime failed.";
throw new SyncDataErrorException(errorMsg, null, null, updateEsRunResult, null);
}
if (ServiceUtil.isError(updateEsHistRunResult)) {
String errorMsg = "Error running EntitySync [" + entitySyncId + "], update of EntitySyncHistory (startDate:[" + startDate + "]) record with lastSuccessfulSynchTime and result stats failed.";
throw new SyncDataErrorException(errorMsg, null, null, updateEsHistRunResult, null);
}
} catch (GenericServiceException e) {
throw new SyncServiceErrorException("Error saving results reported from data store", e);
}
}
public void saveFinalSyncResults() throws SyncDataErrorException, SyncServiceErrorException {
String newStatusId = "ESR_COMPLETE";
if (this.isOfflineSync && totalRowsExported > 0) {
newStatusId = "ESR_PENDING";
}
// the lastSuccessfulSynchTime on EntitySync will already be set, so just set status as completed
String esErrMsg = "Could not mark Entity Sync as complete, but all synchronization was successful";
try {
Map<String, Object> completeEntitySyncRes = dispatcher.runSync("updateEntitySyncRunning", UtilMisc.toMap("entitySyncId", entitySyncId, "runStatusId", newStatusId, "userLogin", userLogin));
if (ServiceUtil.isError(completeEntitySyncRes)) {
// what to do here? try again?
throw new SyncDataErrorException(esErrMsg, null, null, completeEntitySyncRes, null);
}
} catch (GenericServiceException e) {
throw new SyncServiceErrorException(esErrMsg, e);
}
// if nothing moved over, remove the history record, otherwise store status
long totalRows = totalRowsToCreate + totalRowsToStore + totalRowsToRemove;
if (totalRows == 0) {
String eshRemoveErrMsg = "Could not remove Entity Sync History (done because nothing was synced in this call), but all synchronization was successful";
try {
Map<String, Object> deleteEntitySyncHistRes = dispatcher.runSync("deleteEntitySyncHistory", UtilMisc.toMap("entitySyncId", entitySyncId, "startDate", startDate, "userLogin", userLogin));
if (ServiceUtil.isError(deleteEntitySyncHistRes)) {
throw new SyncDataErrorException(eshRemoveErrMsg, null, null, deleteEntitySyncHistRes, null);
}
} catch (GenericServiceException e) {
throw new SyncServiceErrorException(eshRemoveErrMsg, e);
}
} else {
// the lastSuccessfulSynchTime on EntitySync will already be set, so just set status as completed
String eshCompleteErrMsg = "Could not mark Entity Sync History as complete, but all synchronization was successful";
try {
Map<String, Object> completeEntitySyncHistRes = dispatcher.runSync("updateEntitySyncHistory", UtilMisc.toMap("entitySyncId", entitySyncId, "startDate", startDate, "runStatusId", "ESR_COMPLETE", "userLogin", userLogin));
if (ServiceUtil.isError(completeEntitySyncHistRes)) {
// what to do here? try again?
throw new SyncDataErrorException(eshCompleteErrMsg, null, null, completeEntitySyncHistRes, null);
}
} catch (GenericServiceException e) {
throw new SyncServiceErrorException(eshCompleteErrMsg, e);
}
}
if (Debug.infoOn()) Debug.logInfo("Finished saveFinalSyncResults [" + entitySyncId + "]: totalRows=" + totalRows + ", totalRowsToCreate=" + totalRowsToCreate + ", totalRowsToStore=" + totalRowsToStore + ", totalRowsToRemove=" + totalRowsToRemove, module);
}
public Set<String> makeEntityNameToUseSet() {
Set<String> entityNameToUseSet = new HashSet<String>();
for (ModelEntity modelEntity: this.entityModelToUseList) {
entityNameToUseSet.add(modelEntity.getEntityName());
}
return entityNameToUseSet;
}
/** prepare a list of all entities we want to synchronize: remove all view-entities and all entities that don't match the patterns attached to this EntitySync */
protected List<ModelEntity> makeEntityModelToUseList() throws GenericEntityException {
List<GenericValue> entitySyncIncludes = entitySync.getRelated("EntitySyncInclude", null, null, false);
// get these ones as well, and just add them to the main list, it will have an extra field but that shouldn't hurt anything in the code below
List<GenericValue> entitySyncGroupIncludes = entitySync.getRelated("EntitySyncInclGrpDetailView", null, null, false);
entitySyncIncludes.addAll(entitySyncGroupIncludes);
List<ModelEntity> entityModelToUseList = EntityGroupUtil.getModelEntitiesFromRecords(entitySyncIncludes, delegator, true);
if (Debug.infoOn()) Debug.logInfo("In makeEntityModelToUseList for EntitySync with ID [" + entitySync.get("entitySyncId") + "] syncing " + entityModelToUseList.size() + " entities", module);
return entityModelToUseList;
}
protected static Timestamp getCurrentRunStartTime(Timestamp lastSuccessfulSynchTime, List<ModelEntity> entityModelToUseList, Delegator delegator) throws GenericEntityException {
// if currentRunStartTime is null, what to do? I guess iterate through all entities and find earliest tx stamp
if (lastSuccessfulSynchTime == null) {
Timestamp currentRunStartTime = null;
for (ModelEntity modelEntity: entityModelToUseList) {
// fields to select will be PK and the STAMP_TX_FIELD, slimmed down so we don't get a ton of data back
Set<String> fieldsToSelect = UtilMisc.toSet(modelEntity.getPkFieldNames());
// find all instances of this entity with the STAMP_TX_FIELD != null, sort ascending to get lowest/oldest value first, then grab first and consider as candidate currentRunStartTime
fieldsToSelect.add(ModelEntity.STAMP_TX_FIELD);
EntityListIterator eli = EntityQuery.use(delegator)
.select(fieldsToSelect)
.from(modelEntity.getEntityName())
.where(EntityCondition.makeCondition(ModelEntity.STAMP_TX_FIELD, EntityOperator.NOT_EQUAL, null))
.orderBy(ModelEntity.STAMP_TX_FIELD)
.queryIterator();
GenericValue nextValue = eli.next();
eli.close();
if (nextValue != null) {
Timestamp candidateTime = nextValue.getTimestamp(ModelEntity.STAMP_TX_FIELD);
if (currentRunStartTime == null || candidateTime.before(currentRunStartTime)) {
currentRunStartTime = candidateTime;
}
}
}
if (Debug.infoOn()) Debug.logInfo("No currentRunStartTime was stored on the EntitySync record, so searched for the earliest value and got: " + currentRunStartTime, module);
return currentRunStartTime;
} else {
return lastSuccessfulSynchTime;
}
}
public void saveSyncErrorInfo(String runStatusId, List<Object> errorMessages) {
// set error statuses on the EntitySync and EntitySyncHistory entities
try {
Map<String, Object> errorEntitySyncRes = dispatcher.runSync("updateEntitySyncRunning", UtilMisc.toMap("entitySyncId", entitySyncId, "runStatusId", runStatusId, "userLogin", userLogin));
if (ServiceUtil.isError(errorEntitySyncRes)) {
errorMessages.add("Could not save error run status [" + runStatusId + "] on EntitySync with ID [" + entitySyncId + "]: " + errorEntitySyncRes.get(ModelService.ERROR_MESSAGE));
}
} catch (GenericServiceException e) {
errorMessages.add("Could not save error run status [" + runStatusId + "] on EntitySync with ID [" + entitySyncId + "]: " + e.toString());
}
if (startDate != null) {
try {
Map<String, Object> errorEntitySyncHistoryRes = dispatcher.runSync("updateEntitySyncHistory", UtilMisc.toMap("entitySyncId", entitySyncId, "startDate", startDate, "runStatusId", runStatusId, "userLogin", userLogin));
if (ServiceUtil.isError(errorEntitySyncHistoryRes)) {
errorMessages.add("Could not save error run status [" + runStatusId + "] on EntitySyncHistory with ID [" + entitySyncId + "]: " + errorEntitySyncHistoryRes.get(ModelService.ERROR_MESSAGE));
}
} catch (GenericServiceException e) {
errorMessages.add("Could not save error run status [" + runStatusId + "] on EntitySyncHistory with ID [" + entitySyncId + ":" + startDate + "]: " + e.toString());
}
}
}
// ======================== PUSH Methods ========================
public void runPushStartRunning() throws SyncDataErrorException, SyncServiceErrorException, SyncAbortException {
if (UtilValidate.isEmpty(targetServiceName)) {
throw new SyncAbortException("Not running EntitySync [" + entitySyncId + "], no targetServiceName is specified, where do we send the data?");
}
// check to see if this sync is already running, if so return error
if (this.isEntitySyncRunning()) {
throw new SyncAbortException("Not running EntitySync [" + entitySyncId + "], an instance is already running.");
}
String markErrorMsg = "Could not start Entity Sync service, could not mark as running";
try {
// not running, get started NOW
// set running status on entity sync, run in its own tx
Map<String, Object> startEntitySyncRes = dispatcher.runSync("updateEntitySyncRunning", UtilMisc.toMap("entitySyncId", entitySyncId, "runStatusId", "ESR_RUNNING", "userLogin", userLogin));
if (ModelService.RESPOND_ERROR.equals(startEntitySyncRes.get(ModelService.RESPONSE_MESSAGE))) {
throw new SyncDataErrorException(markErrorMsg, null, null, startEntitySyncRes, null);
}
} catch (GenericServiceException e) {
throw new SyncServiceErrorException(markErrorMsg, e);
}
// finally create the initial history record
this.createInitialHistory();
}
public long setTotalRowCounts(ArrayList<GenericValue> valuesToCreate, ArrayList<GenericValue> valuesToStore, List<GenericEntity> keysToRemove) {
this.totalRowsToCreate = valuesToCreate.size();
this.totalRowsToStore = valuesToStore.size();
this.totalRowsToRemove = keysToRemove.size();
this.totalRowsPerSplit = this.totalRowsToCreate + this.totalRowsToStore + this.totalRowsToRemove;
return this.totalRowsPerSplit;
}
public void runPushSendData(ArrayList<GenericValue> valuesToCreate, ArrayList<GenericValue> valuesToStore, List<GenericEntity> keysToRemove) throws SyncOtherErrorException, SyncServiceErrorException {
// grab the totals for this data
this.setTotalRowCounts(valuesToCreate, valuesToStore, keysToRemove);
// call service named on EntitySync, IFF there is actually data to send over
if (this.totalRowsPerSplit > 0) {
Map<String, Object> targetServiceMap = UtilMisc.toMap("entitySyncId", entitySyncId, "valuesToCreate", valuesToCreate, "valuesToStore", valuesToStore, "keysToRemove", keysToRemove, "userLogin", userLogin);
if (UtilValidate.isNotEmpty(targetDelegatorName)) {
targetServiceMap.put("delegatorName", targetDelegatorName);
}
String serviceErrorMsg = "Error running EntitySync [" + entitySyncId + "], call to store service [" + targetServiceName + "] failed.";
try {
Map<String, Object> remoteStoreResult = dispatcher.runSync(targetServiceName, targetServiceMap);
if (ServiceUtil.isError(remoteStoreResult)) {
throw new SyncOtherErrorException(serviceErrorMsg, null, null, remoteStoreResult, null);
}
this.totalStoreCalls++;
long toCreateInsertedCur = remoteStoreResult.get("toCreateInserted") == null ? 0 : ((Long) remoteStoreResult.get("toCreateInserted")).longValue();
long toCreateUpdatedCur = remoteStoreResult.get("toCreateUpdated") == null ? 0 : ((Long) remoteStoreResult.get("toCreateUpdated")).longValue();
long toCreateNotUpdatedCur = remoteStoreResult.get("toCreateNotUpdated") == null ? 0 : ((Long) remoteStoreResult.get("toCreateNotUpdated")).longValue();
long toStoreInsertedCur = remoteStoreResult.get("toStoreInserted") == null ? 0 : ((Long) remoteStoreResult.get("toStoreInserted")).longValue();
long toStoreUpdatedCur = remoteStoreResult.get("toStoreUpdated") == null ? 0 : ((Long) remoteStoreResult.get("toStoreUpdated")).longValue();
long toStoreNotUpdatedCur = remoteStoreResult.get("toStoreNotUpdated") == null ? 0 : ((Long) remoteStoreResult.get("toStoreNotUpdated")).longValue();
long toRemoveDeletedCur = remoteStoreResult.get("toRemoveDeleted") == null ? 0 : ((Long) remoteStoreResult.get("toRemoveDeleted")).longValue();
long toRemoveAlreadyDeletedCur = remoteStoreResult.get("toRemoveAlreadyDeleted") == null ? 0 : ((Long) remoteStoreResult.get("toRemoveAlreadyDeleted")).longValue();
this.toCreateInserted += toCreateInsertedCur;
this.toCreateUpdated += toCreateUpdatedCur;
this.toCreateNotUpdated += toCreateNotUpdatedCur;
this.toStoreInserted += toStoreInsertedCur;
this.toStoreUpdated += toStoreUpdatedCur;
this.toStoreNotUpdated += toStoreNotUpdatedCur;
this.toRemoveDeleted += toRemoveDeletedCur;
this.toRemoveAlreadyDeleted += toRemoveAlreadyDeletedCur;
} catch (GenericServiceException e) {
throw new SyncServiceErrorException(serviceErrorMsg, e);
}
}
}
// ======================== PULL Methods ========================
public void runPullStartOrRestoreSavedResults() throws SyncDataErrorException, SyncServiceErrorException, SyncAbortException {
// if EntitySync.statusId is ESR_RUNNING, make sure startDate matches EntitySync.lastHistoryStartDate; or return error
if (isEntitySyncRunning() && this.startDate == null) {
throw new SyncAbortException("Not running EntitySync [" + entitySyncId + "], an instance is already running and no startDate for the current run was passed.");
}
if (this.startDate == null) {
// get it started!
String markErrorMsg = "Could not start Entity Sync service, could not mark as running";
try {
// not running, get started NOW
// set running status on entity sync, run in its own tx
Map<String, Object> startEntitySyncRes = dispatcher.runSync("updateEntitySyncRunning", UtilMisc.toMap("entitySyncId", entitySyncId, "runStatusId", "ESR_RUNNING", "userLogin", userLogin));
if (ModelService.RESPOND_ERROR.equals(startEntitySyncRes.get(ModelService.RESPONSE_MESSAGE))) {
throw new SyncDataErrorException(markErrorMsg, null, null, startEntitySyncRes, null);
}
} catch (GenericServiceException e) {
throw new SyncServiceErrorException(markErrorMsg, e);
}
// finally create the initial history record
this.createInitialHistory();
this.setSplitStartTime();
} else {
try {
// set the latest values from the EntitySyncHistory, based on the values on the EntitySync
GenericValue entitySyncHistory = EntityQuery.use(delegator).from("EntitySyncHistory").where("entitySyncId", entitySyncId, "startDate", startDate).queryOne();
this.toCreateInserted = UtilMisc.toLong(entitySyncHistory.getLong("toCreateInserted"));
this.toCreateUpdated = UtilMisc.toLong(entitySyncHistory.getLong("toCreateUpdated"));
this.toCreateNotUpdated = UtilMisc.toLong(entitySyncHistory.getLong("toCreateNotUpdated"));
this.toStoreInserted = UtilMisc.toLong(entitySyncHistory.getLong("toStoreInserted"));
this.toStoreUpdated = UtilMisc.toLong(entitySyncHistory.getLong("toStoreUpdated"));
this.toStoreNotUpdated = UtilMisc.toLong(entitySyncHistory.getLong("toStoreNotUpdated"));
this.toRemoveDeleted = UtilMisc.toLong(entitySyncHistory.getLong("toRemoveDeleted"));
this.toRemoveAlreadyDeleted = UtilMisc.toLong(entitySyncHistory.getLong("toRemoveAlreadyDeleted"));
this.totalStoreCalls = UtilMisc.toLong(entitySyncHistory.getLong("totalStoreCalls"));
this.totalSplits = UtilMisc.toLong(entitySyncHistory.getLong("totalSplits"));
this.totalRowsToCreate = UtilMisc.toLong(entitySyncHistory.getLong("totalRowsToCreate"));
this.totalRowsToStore = UtilMisc.toLong(entitySyncHistory.getLong("totalRowsToStore"));
this.totalRowsToRemove = UtilMisc.toLong(entitySyncHistory.getLong("totalRowsToRemove"));
this.perSplitMinMillis = UtilMisc.toLong(entitySyncHistory.getLong("perSplitMinMillis"));
this.perSplitMaxMillis = UtilMisc.toLong(entitySyncHistory.getLong("perSplitMaxMillis"));
this.perSplitMinItems = UtilMisc.toLong(entitySyncHistory.getLong("perSplitMinItems"));
this.perSplitMaxItems = UtilMisc.toLong(entitySyncHistory.getLong("perSplitMaxItems"));
this.splitStartTime = UtilMisc.toLong(entitySyncHistory.getLong("lastSplitStartTime"));
} catch (GenericEntityException e) {
throw new SyncDataErrorException("Error getting existing EntitySyncHistory values", e);
}
// got the previous values, now add to them with the values from the context...
this.toCreateInserted += UtilMisc.toLong(this.context.get("toCreateInserted"));
this.toCreateUpdated += UtilMisc.toLong(this.context.get("toCreateUpdated"));
this.toCreateNotUpdated += UtilMisc.toLong(this.context.get("toCreateNotUpdated"));
this.toStoreInserted += UtilMisc.toLong(this.context.get("toStoreInserted"));
this.toStoreUpdated += UtilMisc.toLong(this.context.get("toStoreUpdated"));
this.toStoreNotUpdated += UtilMisc.toLong(this.context.get("toStoreNotUpdated"));
this.toRemoveDeleted += UtilMisc.toLong(this.context.get("toRemoveDeleted"));
this.toRemoveAlreadyDeleted += UtilMisc.toLong(this.context.get("toRemoveAlreadyDeleted"));
this.totalStoreCalls++;
this.saveResultsReportedFromDataStore();
}
}
// ======================== OFFLINE Methods ========================
public void runOfflineStartRunning() throws SyncDataErrorException, SyncServiceErrorException, SyncAbortException {
// check to see if this sync is already running, if so return error
if (this.isEntitySyncRunning()) {
throw new SyncAbortException("Not running EntitySync [" + entitySyncId + "], an instance is already running.");
}
// flag this context as offline
this.isOfflineSync = true;
String markErrorMsg = "Could not start Entity Sync service, could not mark as running";
try {
// not running, get started NOW
// set running status on entity sync, run in its own tx
Map<String, Object> startEntitySyncRes = dispatcher.runSync("updateEntitySyncRunning", UtilMisc.toMap("entitySyncId", entitySyncId, "runStatusId", "ESR_RUNNING", "preOfflineSynchTime", this.lastSuccessfulSynchTime, "userLogin", userLogin));
if (ModelService.RESPOND_ERROR.equals(startEntitySyncRes.get(ModelService.RESPONSE_MESSAGE))) {
throw new SyncDataErrorException(markErrorMsg, null, null, startEntitySyncRes, null);
}
} catch (GenericServiceException e) {
throw new SyncServiceErrorException(markErrorMsg, e);
}
// finally create the initial history record
this.createInitialHistory();
}
public void runSaveOfflineSyncInfo(long rowsInSplit) throws SyncDataErrorException, SyncServiceErrorException, SyncAbortException {
this.totalRowsExported += rowsInSplit;
this.saveResultsReportedFromDataStore();
}
/**
* Static method to obtain a list of entity names which will be synchronized
*/
public static Set<String> getEntitySyncModelNamesToUse(LocalDispatcher dispatcher, String entitySyncId) throws SyncDataErrorException, SyncAbortException {
DispatchContext dctx = dispatcher.getDispatchContext();
EntitySyncContext ctx = new EntitySyncContext(dctx, UtilMisc.toMap("entitySyncId", entitySyncId));
return ctx.makeEntityNameToUseSet();
}
/** This class signifies an abort condition, so the state and such of the EntitySync value in the datasource should not be changed */
@SuppressWarnings("serial")
public static class SyncAbortException extends GeneralServiceException {
public SyncAbortException() {
super();
}
public SyncAbortException(String str) {
super(str);
}
public SyncAbortException(String str, Throwable nested) {
super(str, nested);
}
public SyncAbortException(Throwable nested) {
super(nested);
}
public SyncAbortException(String str, List<Object> errorMsgList, Map<String, Object> errorMsgMap, Map<String, Object> nestedServiceResult, Throwable nested) {
super(str, errorMsgList, errorMsgMap, nestedServiceResult, nested);
}
}
@SuppressWarnings("serial")
public static abstract class SyncErrorException extends GeneralServiceException {
public SyncErrorException() { super(); }
public SyncErrorException(String str) { super(str); }
public SyncErrorException(String str, Throwable nested) { super(str, nested); }
public SyncErrorException(Throwable nested) { super(nested); }
public SyncErrorException(String str, List<Object> errorMsgList, Map<String, Object> errorMsgMap, Map<String, Object> nestedServiceResult, Throwable nested) { super(str, errorMsgList, errorMsgMap, nestedServiceResult, nested); }
public abstract void saveSyncErrorInfo(EntitySyncContext esc);
}
/** This class signifies an error condition, so the state of the EntitySync value and the EntitySyncHistory value in the datasource should be changed to reflect the error */
@SuppressWarnings("serial")
public static class SyncOtherErrorException extends SyncErrorException {
public SyncOtherErrorException() { super(); }
public SyncOtherErrorException(String str) { super(str); }
public SyncOtherErrorException(String str, Throwable nested) { super(str, nested); }
public SyncOtherErrorException(Throwable nested) { super(nested); }
public SyncOtherErrorException(String str, List<Object> errorMsgList, Map<String, Object> errorMsgMap, Map<String, Object> nestedServiceResult, Throwable nested) { super(str, errorMsgList, errorMsgMap, nestedServiceResult, nested); }
@Override
public void saveSyncErrorInfo(EntitySyncContext esc) {
if (esc != null) {
List<Object> errorList = new LinkedList<Object>();
esc.saveSyncErrorInfo("ESR_OTHER_ERROR", errorList);
this.addErrorMessages(errorList);
}
}
}
/** This class signifies an error condition, so the state of the EntitySync value and the EntitySyncHistory value in the datasource should be changed to reflect the error */
@SuppressWarnings("serial")
public static class SyncDataErrorException extends SyncErrorException {
public SyncDataErrorException() { super(); }
public SyncDataErrorException(String str) { super(str); }
public SyncDataErrorException(String str, Throwable nested) { super(str, nested); }
public SyncDataErrorException(Throwable nested) { super(nested); }
public SyncDataErrorException(String str, List<Object> errorMsgList, Map<String, Object> errorMsgMap, Map<String, Object> nestedServiceResult, Throwable nested) { super(str, errorMsgList, errorMsgMap, nestedServiceResult, nested); }
@Override
public void saveSyncErrorInfo(EntitySyncContext esc) {
if (esc != null) {
List<Object> errorList = new LinkedList<Object>();
esc.saveSyncErrorInfo("ESR_DATA_ERROR", errorList);
this.addErrorMessages(errorList);
}
}
}
/** This class signifies an error condition, so the state of the EntitySync value and the EntitySyncHistory value in the datasource should be changed to reflect the error */
@SuppressWarnings("serial")
public static class SyncServiceErrorException extends SyncErrorException {
public SyncServiceErrorException() { super(); }
public SyncServiceErrorException(String str) { super(str); }
public SyncServiceErrorException(String str, Throwable nested) { super(str, nested); }
public SyncServiceErrorException(Throwable nested) { super(nested); }
public SyncServiceErrorException(String str, List<Object> errorMsgList, Map<String, Object> errorMsgMap, Map<String, Object> nestedServiceResult, Throwable nested) { super(str, errorMsgList, errorMsgMap, nestedServiceResult, nested); }
@Override
public void saveSyncErrorInfo(EntitySyncContext esc) {
if (esc != null) {
List<Object> errorList = new LinkedList<Object>();
esc.saveSyncErrorInfo("ESR_SERVICE_ERROR", errorList);
this.addErrorMessages(errorList);
}
}
}
}