blob: 26183987101101d03d91ff937cf84da1b6696329 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.syncope.common.keymaster.client.zookeeper;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.syncope.common.keymaster.client.api.KeymasterException;
import org.apache.syncope.common.keymaster.client.api.ServiceOps;
import org.apache.syncope.common.keymaster.client.api.model.NetworkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
/**
* Implements {@link ServiceOps} via Apache Curator / Zookeeper via Curator's {@link ServiceDiscovery}.
*/
public class ZookeeperServiceDiscoveryOps implements ServiceOps, InitializingBean {
private static final Logger LOG = LoggerFactory.getLogger(ServiceOps.class);
private static final String SERVICE_PATH = "/services";
private final Map<NetworkService.Type, ServiceProvider<Void>> providers = new ConcurrentHashMap<>();
@Autowired
private CuratorFramework client;
private ServiceDiscovery<Void> discovery;
@Override
public void afterPropertiesSet() throws Exception {
discovery = ServiceDiscoveryBuilder.builder(Void.class).
client(client).
basePath(SERVICE_PATH).
build();
discovery.start();
}
private ServiceProvider<Void> getProvider(final NetworkService.Type type) {
return providers.computeIfAbsent(type, t -> {
try {
ServiceProvider<Void> provider = discovery.
serviceProviderBuilder().
serviceName(t.name()).build();
provider.start();
return provider;
} catch (KeymasterException e) {
throw e;
} catch (Exception e) {
throw new KeymasterException("While preparing ServiceProvider for " + type, e);
}
});
}
@Override
public void register(final NetworkService service) {
try {
unregister(service);
ServiceInstance<Void> instance = ServiceInstance.<Void>builder().
name(service.getType().name()).
address(service.getAddress()).
build();
discovery.registerService(instance);
} catch (KeymasterException e) {
throw e;
} catch (Exception e) {
LOG.error("While registering {}", service, e);
throw new KeymasterException(e);
}
}
@Override
public void unregister(final NetworkService service) {
try {
discovery.queryForInstances(service.getType().name()).stream().
filter(instance -> instance.getName().equals(service.getType().name())
&& instance.getAddress().equals(service.getAddress())).findFirst().
ifPresent(instance -> {
try {
discovery.unregisterService(instance);
} catch (Exception e) {
LOG.error("While deregistering {}", service, e);
throw new KeymasterException(e);
}
});
} catch (KeymasterException e) {
throw e;
} catch (Exception e) {
LOG.error("While registering {}", service, e);
throw new KeymasterException(e);
}
}
private static NetworkService toNetworkService(
final NetworkService.Type serviceType,
final ServiceInstance<Void> serviceInstance) {
NetworkService ns = new NetworkService();
ns.setType(serviceType);
ns.setAddress(serviceInstance.getAddress());
return ns;
}
@Override
public List<NetworkService> list(final NetworkService.Type serviceType) {
try {
return discovery.queryForInstances(serviceType.name()).stream().
map(serviceInstance -> toNetworkService(serviceType, serviceInstance)).
collect(Collectors.toList());
} catch (KeymasterException e) {
throw e;
} catch (Exception e) {
throw new KeymasterException(e);
}
}
@Override
public NetworkService get(final NetworkService.Type serviceType) {
ServiceInstance<Void> serviceInstance = null;
try {
if (!discovery.queryForInstances(serviceType.name()).isEmpty()) {
serviceInstance = getProvider(serviceType).getInstance();
}
} catch (KeymasterException e) {
throw e;
} catch (Exception e) {
throw new KeymasterException(e);
}
if (serviceInstance == null) {
throw new KeymasterException("No services found for " + serviceType);
}
return toNetworkService(serviceType, serviceInstance);
}
}