| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.usergrid.persistence.collection.uniquevalues; |
| |
| import akka.actor.ActorRef; |
| import akka.actor.ActorSystem; |
| import akka.actor.PoisonPill; |
| import akka.actor.Props; |
| import akka.cluster.client.ClusterClient; |
| import akka.cluster.singleton.ClusterSingletonManager; |
| import akka.cluster.singleton.ClusterSingletonManagerSettings; |
| import akka.cluster.singleton.ClusterSingletonProxy; |
| import akka.cluster.singleton.ClusterSingletonProxySettings; |
| import akka.pattern.Patterns; |
| import akka.util.Timeout; |
| import com.google.inject.Inject; |
| import com.google.inject.Injector; |
| import com.google.inject.Singleton; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; |
| import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; |
| import org.apache.usergrid.persistence.core.scope.ApplicationScope; |
| import org.apache.usergrid.persistence.model.entity.Entity; |
| import org.apache.usergrid.persistence.model.entity.Id; |
| import org.apache.usergrid.persistence.model.field.Field; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import scala.concurrent.Await; |
| import scala.concurrent.Future; |
| |
| import java.util.*; |
| import java.util.concurrent.TimeUnit; |
| |
| |
| @Singleton |
| public class UniqueValuesServiceImpl implements UniqueValuesService { |
| private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class ); |
| |
| static Injector injector; |
| UniqueValuesFig uniqueValuesFig; |
| ActorSystemManager actorSystemManager; |
| UniqueValuesTable table; |
| private ReservationCache reservationCache; |
| |
| |
| @Inject |
| public UniqueValuesServiceImpl( |
| Injector inj, |
| UniqueValuesFig uniqueValuesFig, |
| ActorSystemManager actorSystemManager, |
| UniqueValuesTable table ) { |
| |
| injector = inj; |
| this.actorSystemManager = actorSystemManager; |
| this.uniqueValuesFig = uniqueValuesFig; |
| this.table = table; |
| |
| ReservationCache.init( uniqueValuesFig.getUniqueValueCacheTtl() ); |
| this.reservationCache = ReservationCache.getInstance(); |
| } |
| |
| |
| @Override |
| public String getRouterPath() { |
| return "/user/uvProxy"; |
| } |
| |
| |
| private void subscribeToReservations( ActorSystem localSystem ) { |
| logger.info("Starting ReservationCacheUpdater"); |
| localSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber"); |
| } |
| |
| |
| @Override |
| public void reserveUniqueValues( |
| ApplicationScope scope, Entity entity, UUID version, String region ) throws UniqueValueException { |
| |
| ready(); |
| |
| try { |
| for (Field field : entity.getFields()) { |
| if (field.isUnique()) { |
| reserveUniqueField( scope, entity, version, field, region ); |
| } |
| } |
| |
| } catch ( UniqueValueException e ) { |
| |
| for (Field field : entity.getFields()) { |
| if (field.isUnique()) { |
| try { |
| cancelUniqueField( scope, entity, version, field, region ); |
| } catch (Throwable ignored) { |
| logger.error( "Error canceling unique field", ignored ); |
| } |
| } |
| } |
| throw e; |
| } |
| } |
| |
| |
| @Override |
| public void confirmUniqueValues( |
| ApplicationScope scope, Entity entity, UUID version, String region ) throws UniqueValueException { |
| |
| ready(); |
| |
| try { |
| for (Field field : entity.getFields()) { |
| if (field.isUnique()) { |
| confirmUniqueField( scope, entity, version, field, region ); |
| } |
| } |
| |
| } catch ( UniqueValueException e ) { |
| |
| for (Field field : entity.getFields()) { |
| if (field.isUnique()) { |
| try { |
| cancelUniqueField( scope, entity, version, field, region ); |
| } catch (Throwable ex ) { |
| logger.error( "Error canceling unique field", ex ); |
| } |
| } |
| } |
| throw e; |
| } |
| |
| } |
| |
| |
| private void reserveUniqueField( |
| ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException { |
| |
| UniqueValueActor.Request request = new UniqueValueActor.Reservation( |
| scope, entity.getId(), version, field ); |
| |
| UniqueValueActor.Reservation res = reservationCache.get( request.getConsistentHashKey() ); |
| // if ( res != null ) { |
| // getCacheCounter().inc(); |
| // } |
| if ( res != null && !res.getOwner().equals( request.getOwner() )) { |
| throw new UniqueValueException( "Error property not unique (cache)", field); |
| } |
| |
| sendUniqueValueRequest( entity, region, request ); |
| } |
| |
| |
| private void confirmUniqueField( |
| ApplicationScope scope, Entity entity, UUID version, Field field, String region) throws UniqueValueException { |
| |
| UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation( |
| scope, entity.getId(), version, field ); |
| |
| sendUniqueValueRequest( entity, region, request ); |
| } |
| |
| |
| private void cancelUniqueField( ApplicationScope scope, |
| Entity entity, UUID version, Field field, String region ) throws UniqueValueException { |
| |
| cancelUniqueField( scope, entity.getId(), version, field, region ); |
| } |
| |
| |
| private void cancelUniqueField( ApplicationScope scope, |
| Id entityId, UUID version, Field field, String region ) throws UniqueValueException { |
| |
| UniqueValueActor.Cancellation request = new UniqueValueActor.Cancellation( |
| scope, entityId, version, field ); |
| |
| if ( actorSystemManager.getCurrentRegion().equals( region ) ) { |
| |
| // sending to current region, use local clientActor |
| ActorRef clientActor = actorSystemManager.getClientActor(); |
| clientActor.tell( request, null ); |
| |
| } else { |
| |
| // sending to remote region, send via cluster client for that region |
| ActorRef clusterClient = actorSystemManager.getClusterClient( region ); |
| clusterClient.tell( new ClusterClient.Send("/user/clientActor", request), null ); |
| } |
| |
| } |
| |
| |
| private void ready() { |
| if ( !actorSystemManager.isReady() ) { |
| throw new RuntimeException("Unique values service not initialized, no request actors ready"); |
| } |
| |
| if ( !StringUtils.isEmpty( uniqueValuesFig.getAuthoritativeRegion() )) { |
| if ( !actorSystemManager.getRegions().contains( uniqueValuesFig.getAuthoritativeRegion() ) ) { |
| throw new RuntimeException( "Authoritative region not in region list" ); |
| } |
| } |
| } |
| |
| |
| private void sendUniqueValueRequest( |
| Entity entity, String region, UniqueValueActor.Request request ) throws UniqueValueException { |
| |
| int maxRetries = uniqueValuesFig.getRequestRetryCount(); |
| int retries = 0; |
| |
| UniqueValueActor.Response response = null; |
| while ( retries++ < maxRetries ) { |
| try { |
| Timeout t = new Timeout( uniqueValuesFig.getRequestTimeout(), TimeUnit.MILLISECONDS ); |
| |
| Future<Object> fut; |
| |
| if ( actorSystemManager.getCurrentRegion().equals( region ) ) { |
| |
| // sending to current region, use local clientActor |
| ActorRef clientActor = actorSystemManager.getClientActor(); |
| fut = Patterns.ask( clientActor, request, t ); |
| |
| } else { |
| |
| // sending to remote region, send via cluster client for that region |
| ActorRef clusterClient = actorSystemManager.getClusterClient( region ); |
| fut = Patterns.ask( clusterClient, new ClusterClient.Send("/user/clientActor", request), t ); |
| } |
| |
| // wait (up to timeout) for response |
| response = (UniqueValueActor.Response) Await.result( fut, t.duration() ); |
| |
| if ( response != null && ( |
| response.getStatus().equals( UniqueValueActor.Response.Status.IS_UNIQUE ) |
| || response.getStatus().equals( UniqueValueActor.Response.Status.NOT_UNIQUE ))) { |
| if ( retries > 1 ) { |
| logger.debug("IS_UNIQUE after retrying {} for entity {} rowkey {}", |
| retries, entity.getId().getUuid(), request.getConsistentHashKey()); |
| } |
| break; |
| |
| } else if ( response != null ) { |
| logger.warn("ERROR status retrying {} entity {} rowkey {}", |
| retries, entity.getId().getUuid(), request.getConsistentHashKey()); |
| } else { |
| logger.warn("Timed-out retrying {} entity {} rowkey", |
| retries, entity.getId().getUuid(), request.getConsistentHashKey()); |
| } |
| |
| } catch ( Exception e ) { |
| logger.error("{} caused retry {} for entity {} rowkey {}", |
| e.getClass().getSimpleName(), retries, entity.getId().getUuid(), request.getConsistentHashKey()); |
| } |
| } |
| |
| if ( response == null || response.getStatus().equals( UniqueValueActor.Response.Status.ERROR )) { |
| logger.debug("ERROR after retrying {} for entity {} rowkey {}", |
| retries, entity.getId().getUuid(), request.getConsistentHashKey()); |
| |
| // should result in an HTTP 503 |
| throw new RuntimeException( "Error verifying unique value after " + retries + " retries"); |
| } |
| |
| if ( response.getStatus().equals( UniqueValueActor.Response.Status.NOT_UNIQUE )) { |
| |
| // should result in an HTTP 409 (conflict) |
| throw new UniqueValueException( "Error property not unique", request.getField() ); |
| } |
| } |
| |
| |
| @Override |
| public void produceRouter( ActorSystem system, String role ) { |
| |
| ClusterSingletonManagerSettings settings = |
| ClusterSingletonManagerSettings.create( system ).withRole("io"); |
| |
| system.actorOf( ClusterSingletonManager.props( |
| Props.create( GuiceActorProducer.class, injector, UniqueValuesRouter.class ), |
| PoisonPill.getInstance(), settings ), "uvRouter" ); |
| |
| ClusterSingletonProxySettings proxySettings = |
| ClusterSingletonProxySettings.create( system ).withRole( role ); |
| |
| system.actorOf( ClusterSingletonProxy.props( "/user/uvRouter", proxySettings ), "uvProxy" ); |
| |
| subscribeToReservations( system ); |
| } |
| |
| |
| @Override |
| public void addConfiguration( Map<String, Object> configMap ) { |
| |
| int numInstancesPerNode = uniqueValuesFig.getUniqueValueInstancesPerNode(); |
| |
| // TODO: replace this configuration stuff with equivalent Java code in the above "create" methods? |
| |
| // be careful not to overwrite configurations that other router producers may have added |
| |
| Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" ); |
| final Map<String, Object> deploymentMap; |
| |
| if ( akka.get( "actor" ) == null ) { |
| |
| // nobody has created anything under "actor" yet, so create it now |
| deploymentMap = new HashMap<>(); |
| akka.put( "actor", new HashMap<String, Object>() {{ |
| put( "deployment", deploymentMap ); |
| }} ); |
| |
| } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) { |
| |
| // nobody has created anything under "actor/deployment" yet, so create it now |
| deploymentMap = new HashMap<>(); |
| ((Map) akka.get( "actor" )).put( "deployment", deploymentMap ); |
| |
| } else { |
| |
| // somebody else already created "actor/deployment" config so use it |
| deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" ); |
| } |
| |
| deploymentMap.put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{ |
| put( "router", "consistent-hashing-pool" ); |
| put( "cluster", new HashMap<String, Object>() {{ |
| put( "enabled", "on" ); |
| put( "allow-local-routees", "on" ); |
| put( "use-role", "io" ); |
| put( "max-nr-of-instances-per-node", numInstancesPerNode ); |
| put( "failure-detector", new HashMap<String, Object>() {{ |
| put( "threshold", "10" ); |
| put( "acceptable-heartbeat-pause", "3 s" ); |
| put( "heartbeat-interval", "1 s" ); |
| put( "heartbeat-request", new HashMap<String, Object>() {{ |
| put( "expected-response-after", "3 s" ); |
| }} ); |
| }} ); |
| }} ); |
| }} ); |
| |
| } |
| |
| |
| @Override |
| public Collection<Class> getMessageTypes() { |
| List<Class> messageTypes = new ArrayList<>(); |
| messageTypes.add( UniqueValueActor.Request.class); |
| messageTypes.add( UniqueValueActor.Reservation.class); |
| messageTypes.add( UniqueValueActor.Cancellation.class); |
| messageTypes.add( UniqueValueActor.Confirmation.class); |
| return messageTypes; |
| } |
| } |