blob: 000887b8159ebeee61456e33e339935c39b18599 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.syncope.core.provisioning.java.pushpull;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.syncope.common.lib.SyncopeConstants;
import org.apache.syncope.common.lib.to.Provision;
import org.apache.syncope.common.lib.types.ConflictResolutionAction;
import org.apache.syncope.core.persistence.api.dao.AnyDAO;
import org.apache.syncope.core.persistence.api.dao.AnySearchDAO;
import org.apache.syncope.core.persistence.api.dao.NotFoundException;
import org.apache.syncope.core.persistence.api.dao.RealmSearchDAO;
import org.apache.syncope.core.persistence.api.dao.search.SearchCond;
import org.apache.syncope.core.persistence.api.entity.Any;
import org.apache.syncope.core.persistence.api.entity.AnyType;
import org.apache.syncope.core.persistence.api.entity.AnyUtilsFactory;
import org.apache.syncope.core.persistence.api.entity.ExternalResource;
import org.apache.syncope.core.persistence.api.entity.Implementation;
import org.apache.syncope.core.persistence.api.entity.Realm;
import org.apache.syncope.core.persistence.api.entity.policy.PushPolicy;
import org.apache.syncope.core.persistence.api.entity.task.PushTask;
import org.apache.syncope.core.persistence.api.search.SearchCondConverter;
import org.apache.syncope.core.persistence.api.search.SearchCondVisitor;
import org.apache.syncope.core.provisioning.api.Connector;
import org.apache.syncope.core.provisioning.api.ProvisionSorter;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobExecutionException;
import org.apache.syncope.core.provisioning.api.pushpull.AnyObjectPushResultHandler;
import org.apache.syncope.core.provisioning.api.pushpull.GroupPushResultHandler;
import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningProfile;
import org.apache.syncope.core.provisioning.api.pushpull.PushActions;
import org.apache.syncope.core.provisioning.api.pushpull.RealmPushResultHandler;
import org.apache.syncope.core.provisioning.api.pushpull.SyncopePushExecutor;
import org.apache.syncope.core.provisioning.api.pushpull.SyncopePushResultHandler;
import org.apache.syncope.core.provisioning.api.pushpull.UserPushResultHandler;
import org.apache.syncope.core.spring.ApplicationContextProvider;
import org.apache.syncope.core.spring.implementation.ImplementationManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
/**
 * Job delegate executing a {@link PushTask}.
 * <p>
 * Realms below the task's source realm are pushed first (only when the task's resource defines an org unit), then,
 * for each provision of the resource having a mapping, the anys found by searching under the source realm with the
 * task's filter for that any type (or with the match-all condition when no filter is set).
 * <p>
 * Handlers report back through {@link #reportHandled(String, String)}, which keeps the job status up to date.
 */
public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> implements SyncopePushExecutor {

    @Autowired
    protected AnySearchDAO searchDAO;

    @Autowired
    protected RealmSearchDAO realmSearchDAO;

    @Autowired
    protected AnyUtilsFactory anyUtilsFactory;

    @Autowired
    protected SearchCondVisitor searchCondVisitor;

    /**
     * Profile of the current execution; (re)assigned at each {@code doExecuteProvisioning} invocation.
     */
    protected ProvisioningProfile<PushTask, PushActions> profile;

    /**
     * For each any type reported so far: how many entities were handled and the key of the latest one.
     */
    protected final Map<String, MutablePair<Integer, String>> handled = new ConcurrentHashMap<>();

    /**
     * {@link PushActions} instances by {@link Implementation} key, read and written by the getter / setter
     * callbacks passed to {@link ImplementationManager} when building actions.
     */
    protected final Map<String, PushActions> perContextActions = new ConcurrentHashMap<>();

    /**
     * Records that the entity with the given key, of the given any type, was handled and refreshes the job status
     * with one line per any type reported so far (counter and latest key).
     *
     * @param anyType any type (or realm pseudo-type) of the handled entity
     * @param key key of the handled entity
     */
    @Override
    public void reportHandled(final String anyType, final String key) {
        // compute() is atomic per key: the pair is created at most once and no increment gets lost when
        // several callers report the same any type at the same time
        handled.compute(anyType, (type, pair) -> {
            MutablePair<Integer, String> current = pair;
            if (current == null) {
                current = MutablePair.of(0, null);
            }
            current.setLeft(current.getLeft() + 1);
            current.setRight(key);
            return current;
        });

        // at this point the map holds at least the entry for anyType, hence there is always something to report
        StringBuilder builder = new StringBuilder("Processed:\n");
        handled.forEach((type, pair) -> builder.append(' ').append(pair.getLeft()).append('\t').
                append(type).
                append(" / latest: ").append(pair.getRight()).
                append('\n'));
        setStatus(builder.toString());
    }

    /**
     * Dispatches the given anys, in list order, stopping as soon as the dispatcher returns {@code false}.
     *
     * @param anys anys to push
     * @param dispatcher dispatcher to hand each any over to
     * @param resource resource being pushed to, only used for log and error messages
     * @return {@code false} if the dispatcher returned {@code false} for some any, {@code true} otherwise
     * (including when the list is empty)
     * @throws JobExecutionException wrapping any exception raised while dispatching an any
     */
    protected boolean doHandle(
            final List<? extends Any<?>> anys,
            final PushResultHandlerDispatcher dispatcher,
            final ExternalResource resource)
            throws JobExecutionException {

        for (Any<?> any : anys) {
            boolean proceed;
            try {
                proceed = dispatcher.handle(any.getType().getKey(), any.getKey());
            } catch (Exception e) {
                LOG.warn("Failure pushing '{}' on '{}'", any, resource, e);
                throw new JobExecutionException("While pushing " + any + " on " + resource, e);
            }

            if (!proceed) {
                return false;
            }
        }

        return true;
    }

    /**
     * Creates the handler used for realms.
     *
     * @return new {@link DefaultRealmPushResultHandler} bean, created via the application context's bean factory
     */
    protected RealmPushResultHandler buildRealmHandler() {
        return ApplicationContextProvider.getBeanFactory().createBean(DefaultRealmPushResultHandler.class);
    }

    /**
     * Creates the handler used for any objects.
     *
     * @return new {@link DefaultAnyObjectPushResultHandler} bean, created via the application context's bean
     * factory
     */
    protected AnyObjectPushResultHandler buildAnyObjectHandler() {
        return ApplicationContextProvider.getBeanFactory().createBean(DefaultAnyObjectPushResultHandler.class);
    }

    /**
     * Creates the handler used for users.
     *
     * @return new {@link DefaultUserPushResultHandler} bean, created via the application context's bean factory
     */
    protected UserPushResultHandler buildUserHandler() {
        return ApplicationContextProvider.getBeanFactory().createBean(DefaultUserPushResultHandler.class);
    }

    /**
     * Creates the handler used for groups.
     *
     * @return new {@link DefaultGroupPushResultHandler} bean, created via the application context's bean factory
     */
    protected GroupPushResultHandler buildGroupHandler() {
        return ApplicationContextProvider.getBeanFactory().createBean(DefaultGroupPushResultHandler.class);
    }

    /**
     * Builds the {@link PushActions} instance for each of the given implementations, in order.
     * Implementations that cannot be built are logged and left out of the result.
     *
     * @param impls implementations to build
     * @return successfully built actions
     */
    protected List<PushActions> getPushActions(final List<? extends Implementation> impls) {
        List<PushActions> result = new ArrayList<>();

        for (Implementation impl : impls) {
            try {
                result.add(ImplementationManager.build(
                        impl,
                        () -> perContextActions.get(impl.getKey()),
                        instance -> perContextActions.put(impl.getKey(), instance)));
            } catch (Exception e) {
                // best-effort: a broken implementation must not prevent the other ones from being used
                LOG.warn("While building {}", impl, e);
            }
        }

        return result;
    }

    /**
     * Runs the given push task: sets up the profile, invokes {@code beforeAll} on the actions (unless dry-run),
     * pushes realms and then anys, shuts the dispatcher down, invokes {@code afterAll} on the actions (unless
     * dry-run) and finally builds the report.
     *
     * @param pushTask task to execute
     * @param connector connector to put in the provisioning profile
     * @param dryRun whether this is a dry-run: if so, {@code beforeAll} / {@code afterAll} are not invoked
     * @param executor executor to set in the provisioning profile
     * @param context job execution context, not used by this implementation
     * @return report built from the profile results
     * @throws JobExecutionException if pushing fails
     */
    @Override
    protected String doExecuteProvisioning(
            final PushTask pushTask,
            final Connector connector,
            final boolean dryRun,
            final String executor,
            final JobExecutionContext context) throws JobExecutionException {

        LOG.debug("Executing push on {}", pushTask.getResource());

        profile = new ProvisioningProfile<>(connector, pushTask);
        profile.getActions().addAll(getPushActions(pushTask.getActions()));
        profile.setDryRun(dryRun);
        profile.setConflictResolutionAction(
                Optional.ofNullable(pushTask.getResource().getPushPolicy()).
                        map(PushPolicy::getConflictResolutionAction).
                        orElse(ConflictResolutionAction.IGNORE));
        profile.setExecutor(executor);

        PushResultHandlerDispatcher dispatcher = new PushResultHandlerDispatcher(profile, this);
        try {
            if (!profile.isDryRun()) {
                for (PushActions action : profile.getActions()) {
                    action.beforeAll(profile);
                }
            }

            setStatus("Initialization completed");

            // First realms...
            if (pushTask.getResource().getOrgUnit() != null) {
                pushRealms(pushTask, dispatcher);
            }

            // ...then provisions for any types
            ProvisionSorter provisionSorter = getProvisionSorter(pushTask);
            List<Provision> provisions = pushTask.getResource().getProvisions().stream().
                    filter(candidate -> candidate.getMapping() != null).sorted(provisionSorter).
                    toList();
            for (Provision provision : provisions) {
                pushAnys(pushTask, provision, dispatcher);
            }
        } finally {
            // in finally so that the shutdown is not skipped when pushing fails midway
            dispatcher.shutdown();
        }

        if (!profile.isDryRun()) {
            for (PushActions action : profile.getActions()) {
                action.afterAll(profile);
            }
        }

        setStatus("Push done");

        String result = createReport(profile.getResults(), pushTask.getResource(), dryRun);
        LOG.debug("Push result: {}", result);
        return result;
    }

    // Pushes all non-root realms found below the task's source realm, until the dispatcher returns false.
    private void pushRealms(final PushTask pushTask, final PushResultHandlerDispatcher dispatcher)
            throws JobExecutionException {

        setStatus("Pushing realms");

        dispatcher.addHandlerSupplier(SyncopeConstants.REALM_ANYTYPE, () -> {
            RealmPushResultHandler handler = buildRealmHandler();
            handler.setProfile(profile);
            return handler;
        });

        // Never push the root realm
        List<Realm> realms = realmSearchDAO.findDescendants(
                profile.getTask().getSourceRealm().getFullPath(), null, Pageable.unpaged()).stream().
                filter(realm -> realm.getParent() != null).toList();

        for (Realm realm : realms) {
            boolean proceed;
            try {
                proceed = dispatcher.handle(SyncopeConstants.REALM_ANYTYPE, realm.getKey());
            } catch (Exception e) {
                LOG.warn("Failure pushing '{}' on '{}'", realm, pushTask.getResource(), e);
                throw new JobExecutionException(
                        "While pushing " + realm + " on " + pushTask.getResource(), e);
            }

            if (!proceed) {
                break;
            }
        }
    }

    // Pushes, page by page, the anys of the given provision's type matching the task's filter for that type.
    private void pushAnys(
            final PushTask pushTask,
            final Provision provision,
            final PushResultHandlerDispatcher dispatcher) throws JobExecutionException {

        setStatus("Pushing " + provision.getAnyType());

        AnyType anyType = anyTypeDAO.findById(provision.getAnyType()).
                orElseThrow(() -> new NotFoundException("AnyType " + provision.getAnyType()));

        AnyDAO<?> anyDAO = anyUtilsFactory.getInstance(anyType.getKind()).dao();

        dispatcher.addHandlerSupplier(provision.getAnyType(), () -> {
            // any kind other than USER and GROUP is served by the any object handler
            SyncopePushResultHandler handler = switch (anyType.getKind()) {
                case USER -> buildUserHandler();
                case GROUP -> buildGroupHandler();
                default -> buildAnyObjectHandler();
            };
            handler.setProfile(profile);
            return handler;
        });

        // no filter defined for this any type: push everything
        String filter = pushTask.getFilter(anyType.getKey()).orElse(null);
        SearchCond cond = StringUtils.isBlank(filter)
                ? anyDAO.getAllMatchingCond()
                : SearchCondConverter.convert(searchCondVisitor, filter);

        long count = searchDAO.count(
                profile.getTask().getSourceRealm(),
                true,
                Set.of(profile.getTask().getSourceRealm().getFullPath()),
                cond,
                anyType.getKind());

        // page indexes are 0-based: '<=' ensures that the last, partially filled page is fetched too (at the
        // price of one additional, empty search when count is an exact multiple of the page size)
        boolean proceed = true;
        for (int page = 0; page <= (count / AnyDAO.DEFAULT_PAGE_SIZE) && proceed; page++) {
            List<? extends Any<?>> anys = searchDAO.search(
                    profile.getTask().getSourceRealm(),
                    true,
                    Set.of(profile.getTask().getSourceRealm().getFullPath()),
                    cond,
                    PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE),
                    anyType.getKind());
            proceed = doHandle(anys, dispatcher, pushTask.getResource());
        }
    }
}