| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.catalina.ha.tcp; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.LongAdder; |
| import java.util.regex.Pattern; |
| import java.util.regex.PatternSyntaxException; |
| |
| import jakarta.servlet.ServletException; |
| |
| import org.apache.catalina.Cluster; |
| import org.apache.catalina.Context; |
| import org.apache.catalina.LifecycleException; |
| import org.apache.catalina.Manager; |
| import org.apache.catalina.Session; |
| import org.apache.catalina.connector.Request; |
| import org.apache.catalina.connector.Response; |
| import org.apache.catalina.core.StandardContext; |
| import org.apache.catalina.ha.CatalinaCluster; |
| import org.apache.catalina.ha.ClusterManager; |
| import org.apache.catalina.ha.ClusterMessage; |
| import org.apache.catalina.ha.ClusterSession; |
| import org.apache.catalina.ha.ClusterValve; |
| import org.apache.catalina.ha.session.DeltaManager; |
| import org.apache.catalina.ha.session.DeltaSession; |
| import org.apache.catalina.valves.ValveBase; |
| import org.apache.juli.logging.Log; |
| import org.apache.juli.logging.LogFactory; |
| import org.apache.tomcat.util.res.StringManager; |
| |
| /** |
| * <p> |
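| * Implementation of a Valve that triggers session replication in a cluster. After the request has been processed by |
| * the following valves, replication messages for the invalidated sessions, the session of the request and, for cross |
| * context applications, all registered cross context sessions are sent to the cluster. Requests whose URI matches the |
| * configured <code>filter</code> do not trigger a replication message for the request session. |
| * </p> |
| * <p> |
| * If no cluster has been set explicitly, the Valve uses the Cluster of the Container it is attached to when it is |
| * started, provided that Cluster is a <code>CatalinaCluster</code>. |
| * </p> |
| * <p> |
| * If <code>primaryIndicator</code> is <code>true</code>, the request attribute |
| * <i>org.apache.catalina.ha.tcp.isPrimarySession</i> (the name is configurable via <code>primaryIndicatorName</code>) |
| * is set before the request is processed and tells whether this node is the primary node of the requested session. |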
| * </p> |
| * @author Peter Rossbach |
| */ |
| public class ReplicationValve extends ValveBase implements ClusterValve { |
| |
| private static final Log log = LogFactory.getLog(ReplicationValve.class); |
| |
| // ----------------------------------------------------- Instance Variables |
| |
| /** |
| * The StringManager for this package. |
| */ |
| protected static final StringManager sm = StringManager.getManager(Constants.Package); |
| |
| private CatalinaCluster cluster = null; |
| |
| /** |
| * Filter expression |
| */ |
| protected Pattern filter = null; |
| |
| /** |
| * crossContext session container |
| */ |
| protected final ThreadLocal<ArrayList<DeltaSession>> crossContextSessions = new ThreadLocal<>(); |
| |
| /** |
| * doProcessingStats (default = off) |
| */ |
| protected boolean doProcessingStats = false; |
| |
| protected LongAdder totalRequestTime = new LongAdder(); |
| protected LongAdder totalSendTime = new LongAdder(); |
| protected LongAdder nrOfRequests = new LongAdder(); |
| protected AtomicLong lastSendTime = new AtomicLong(); |
| protected LongAdder nrOfFilterRequests = new LongAdder(); |
| protected LongAdder nrOfSendRequests = new LongAdder(); |
| protected LongAdder nrOfCrossContextSendRequests = new LongAdder(); |
| |
| /** |
| * Must set primary change indicator. |
| */ |
| protected boolean primaryIndicator = false; |
| |
| /** |
| * Name of primary change indicator as request attribute |
| */ |
| protected String primaryIndicatorName = "org.apache.catalina.ha.tcp.isPrimarySession"; |
| |
| // ------------------------------------------------------------- Properties |
| |
/**
 * Creates a new replication valve. The value {@code true} is handed to the {@link ValveBase} constructor
 * (presumably its async-supported flag - confirm against {@code ValveBase}).
 */
public ReplicationValve() {
    super(true);
}
| |
| /** |
| * @return the cluster. |
| */ |
| @Override |
| public CatalinaCluster getCluster() { |
| return cluster; |
| } |
| |
| /** |
| * @param cluster The cluster to set. |
| */ |
| @Override |
| public void setCluster(CatalinaCluster cluster) { |
| this.cluster = cluster; |
| } |
| |
| /** |
| * @return the filter |
| */ |
| public String getFilter() { |
| if (filter == null) { |
| return null; |
| } |
| return filter.toString(); |
| } |
| |
| /** |
| * compile filter string to regular expression |
| * |
| * @see Pattern#compile(String) |
| * |
| * @param filter The filter to set. |
| */ |
| public void setFilter(String filter) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("ReplicationValve.filter.loading", filter)); |
| } |
| |
| if (filter == null || filter.isEmpty()) { |
| this.filter = null; |
| } else { |
| try { |
| this.filter = Pattern.compile(filter); |
| } catch (PatternSyntaxException pse) { |
| log.error(sm.getString("ReplicationValve.filter.failure", filter), pse); |
| } |
| } |
| } |
| |
| /** |
| * @return the primaryIndicator. |
| */ |
| public boolean isPrimaryIndicator() { |
| return primaryIndicator; |
| } |
| |
| /** |
| * @param primaryIndicator The primaryIndicator to set. |
| */ |
| public void setPrimaryIndicator(boolean primaryIndicator) { |
| this.primaryIndicator = primaryIndicator; |
| } |
| |
| /** |
| * @return the primaryIndicatorName. |
| */ |
| public String getPrimaryIndicatorName() { |
| return primaryIndicatorName; |
| } |
| |
| /** |
| * @param primaryIndicatorName The primaryIndicatorName to set. |
| */ |
| public void setPrimaryIndicatorName(String primaryIndicatorName) { |
| this.primaryIndicatorName = primaryIndicatorName; |
| } |
| |
| /** |
| * Calc processing stats |
| * |
| * @return <code>true</code> if statistics are enabled |
| */ |
| public boolean doStatistics() { |
| return doProcessingStats; |
| } |
| |
| /** |
| * Set Calc processing stats |
| * |
| * @param doProcessingStats New flag value |
| * |
| * @see #resetStatistics() |
| */ |
| public void setStatistics(boolean doProcessingStats) { |
| this.doProcessingStats = doProcessingStats; |
| } |
| |
| /** |
| * @return the lastSendTime. |
| */ |
| public long getLastSendTime() { |
| return lastSendTime.longValue(); |
| } |
| |
| /** |
| * @return the nrOfRequests. |
| */ |
| public long getNrOfRequests() { |
| return nrOfRequests.longValue(); |
| } |
| |
| /** |
| * @return the nrOfFilterRequests. |
| */ |
| public long getNrOfFilterRequests() { |
| return nrOfFilterRequests.longValue(); |
| } |
| |
| /** |
| * @return the nrOfCrossContextSendRequests. |
| */ |
| public long getNrOfCrossContextSendRequests() { |
| return nrOfCrossContextSendRequests.longValue(); |
| } |
| |
| /** |
| * @return the nrOfSendRequests. |
| */ |
| public long getNrOfSendRequests() { |
| return nrOfSendRequests.longValue(); |
| } |
| |
| /** |
| * @return the totalRequestTime. |
| */ |
| public long getTotalRequestTime() { |
| return totalRequestTime.longValue(); |
| } |
| |
| /** |
| * @return the totalSendTime. |
| */ |
| public long getTotalSendTime() { |
| return totalSendTime.longValue(); |
| } |
| |
| // --------------------------------------------------------- Public Methods |
| |
| /** |
| * Register all cross context sessions inside endAccess. Use a list with contains check, that the Portlet API can |
| * include a lot of fragments from same or different applications with session changes. |
| * |
| * @param session cross context session |
| */ |
| public void registerReplicationSession(DeltaSession session) { |
| List<DeltaSession> sessions = crossContextSessions.get(); |
| if (sessions != null) { |
| if (!sessions.contains(session)) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("ReplicationValve.crossContext.registerSession", session.getIdInternal(), |
| session.getManager().getContext().getName())); |
| } |
| sessions.add(session); |
| } |
| } |
| } |
| |
| @Override |
| public void invoke(Request request, Response response) throws IOException, ServletException { |
| long totalstart = 0; |
| |
| // this happens before the request |
| if (doStatistics()) { |
| totalstart = System.currentTimeMillis(); |
| } |
| if (primaryIndicator) { |
| createPrimaryIndicator(request); |
| } |
| Context context = request.getContext(); |
| boolean isCrossContext = context instanceof StandardContext && context.getCrossContext(); |
| boolean isAsync = request.getAsyncContextInternal() != null; |
| try { |
| if (isCrossContext) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("ReplicationValve.crossContext.add")); |
| } |
| crossContextSessions.set(new ArrayList<>()); |
| } |
| getNext().invoke(request, response); |
| if (context != null && cluster != null && context.getManager() instanceof ClusterManager clusterManager) { |
| |
| // valve cluster can access manager - other cluster handle replication |
| // at host level - hopefully! |
| if (cluster.getManager(clusterManager.getName()) == null) { |
| return; |
| } |
| if (cluster.hasMembers()) { |
| sendReplicationMessage(request, totalstart, isCrossContext, isAsync, clusterManager); |
| } else { |
| resetReplicationRequest(request, isCrossContext); |
| } |
| } |
| } finally { |
| // Array must be removed: Current master request send endAccess at recycle. |
| // Don't register this request session again! |
| if (isCrossContext) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("ReplicationValve.crossContext.remove")); |
| } |
| crossContextSessions.remove(); |
| } |
| } |
| } |
| |
| |
| /** |
| * reset the active statistics |
| */ |
| public void resetStatistics() { |
| totalRequestTime.reset(); |
| totalSendTime.reset(); |
| lastSendTime.set(0); |
| nrOfFilterRequests.reset(); |
| nrOfRequests.reset(); |
| nrOfSendRequests.reset(); |
| nrOfCrossContextSendRequests.reset(); |
| } |
| |
| @Override |
| protected void startInternal() throws LifecycleException { |
| if (cluster == null) { |
| Cluster containerCluster = getContainer().getCluster(); |
| if (containerCluster instanceof CatalinaCluster) { |
| setCluster((CatalinaCluster) containerCluster); |
| } else { |
| if (log.isWarnEnabled()) { |
| log.warn(sm.getString("ReplicationValve.nocluster")); |
| } |
| } |
| } |
| super.startInternal(); |
| } |
| |
| |
| // --------------------------------------------------------- Protected Methods |
| |
| protected void sendReplicationMessage(Request request, long totalstart, boolean isCrossContext, boolean isAsync, |
| ClusterManager clusterManager) { |
| // this happens after the request |
| long start = 0; |
| if (doStatistics()) { |
| start = System.currentTimeMillis(); |
| } |
| try { |
| // send invalid sessions |
| sendInvalidSessions(clusterManager); |
| // send replication |
| sendSessionReplicationMessage(request, clusterManager); |
| if (isCrossContext) { |
| sendCrossContextSession(); |
| } |
| } catch (Exception e) { |
| // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes! |
| log.error(sm.getString("ReplicationValve.send.failure"), e); |
| } finally { |
| if (doStatistics()) { |
| updateStats(totalstart, start, isAsync); |
| } |
| } |
| } |
| |
| /** |
| * Send all changed cross context sessions to backups |
| */ |
| protected void sendCrossContextSession() { |
| List<DeltaSession> sessions = crossContextSessions.get(); |
| if (sessions != null && !sessions.isEmpty()) { |
| for (DeltaSession session : sessions) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("ReplicationValve.crossContext.sendDelta", |
| session.getManager().getContext().getName())); |
| } |
| sendMessage(session, (ClusterManager) session.getManager()); |
| if (doStatistics()) { |
| nrOfCrossContextSendRequests.increment(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Fix memory leak for long sessions with many changes, when no backup member exists! |
| * |
| * @param request current request after response is generated |
| * @param isCrossContext check crosscontext threadlocal |
| */ |
| protected void resetReplicationRequest(Request request, boolean isCrossContext) { |
| Session contextSession = request.getSessionInternal(false); |
| if (contextSession instanceof DeltaSession) { |
| resetDeltaRequest(contextSession); |
| ((DeltaSession) contextSession).setPrimarySession(true); |
| } |
| if (isCrossContext) { |
| List<DeltaSession> sessions = crossContextSessions.get(); |
| if (sessions != null) { |
| for (DeltaSession session : sessions) { |
| resetDeltaRequest(session); |
| session.setPrimarySession(true); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Reset DeltaRequest from session |
| * |
| * @param session HttpSession from current request or cross context session |
| */ |
| protected void resetDeltaRequest(Session session) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("ReplicationValve.resetDeltaRequest", session.getManager().getContext().getName())); |
| } |
| ((DeltaSession) session).resetDeltaRequest(); |
| } |
| |
| /** |
| * Send Cluster Replication Request |
| * |
| * @param request current request |
| * @param manager session manager |
| */ |
| protected void sendSessionReplicationMessage(Request request, ClusterManager manager) { |
| Session session = request.getSessionInternal(false); |
| if (session != null) { |
| String uri = request.getDecodedRequestURI(); |
| // request without session change |
| if (!isRequestWithoutSessionChange(uri)) { |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("ReplicationValve.invoke.uri", uri)); |
| } |
| sendMessage(session, manager); |
| } else if (doStatistics()) { |
| nrOfFilterRequests.increment(); |
| } |
| } |
| |
| } |
| |
| /** |
| * Send message delta message from request session |
| * |
| * @param session current session |
| * @param manager session manager |
| */ |
| protected void sendMessage(Session session, ClusterManager manager) { |
| String id = session.getIdInternal(); |
| if (id != null) { |
| send(manager, id); |
| } |
| } |
| |
| /** |
| * send manager requestCompleted message to cluster |
| * |
| * @param manager SessionManager |
| * @param sessionId sessionid from the manager |
| * |
| * @see DeltaManager#requestCompleted(String) |
| * @see SimpleTcpCluster#send(ClusterMessage) |
| */ |
| protected void send(ClusterManager manager, String sessionId) { |
| ClusterMessage msg = manager.requestCompleted(sessionId); |
| if (msg != null && cluster != null) { |
| cluster.send(msg); |
| if (doStatistics()) { |
| nrOfSendRequests.increment(); |
| } |
| } |
| } |
| |
| /** |
| * check for session invalidations |
| * |
| * @param manager Associated manager |
| */ |
| protected void sendInvalidSessions(ClusterManager manager) { |
| String[] invalidIds = manager.getInvalidatedSessions(); |
| for (String invalidId : invalidIds) { |
| try { |
| send(manager, invalidId); |
| } catch (Exception e) { |
| log.error(sm.getString("ReplicationValve.send.invalid.failure", invalidId), e); |
| } |
| } |
| } |
| |
| /** |
| * is request without possible session change |
| * |
| * @param uri The request uri |
| * |
| * @return True if no session change |
| */ |
| protected boolean isRequestWithoutSessionChange(String uri) { |
| Pattern f = filter; |
| return f != null && f.matcher(uri).matches(); |
| } |
| |
| /** |
| * Protocol cluster replications stats |
| * |
| * @param requestTime Request time |
| * @param clusterTime Cluster time |
| * @param isAsync if the request was in async mode |
| */ |
| protected void updateStats(long requestTime, long clusterTime, boolean isAsync) { |
| long currentTime = System.currentTimeMillis(); |
| lastSendTime.set(currentTime); |
| totalSendTime.add(currentTime - clusterTime); |
| totalRequestTime.add(currentTime - requestTime); |
| if (!isAsync) { |
| nrOfRequests.increment(); |
| if (log.isDebugEnabled()) { |
| if ((nrOfRequests.longValue() % 100) == 0) { |
| log.debug(sm.getString("ReplicationValve.stats", |
| Long.valueOf(totalRequestTime.longValue() / nrOfRequests.longValue()), |
| Long.valueOf(totalSendTime.longValue() / nrOfRequests.longValue()), |
| Long.valueOf(nrOfRequests.longValue()), Long.valueOf(nrOfSendRequests.longValue()), |
| Long.valueOf(nrOfCrossContextSendRequests.longValue()), |
| Long.valueOf(nrOfFilterRequests.longValue()), Long.valueOf(totalRequestTime.longValue()), |
| Long.valueOf(totalSendTime.longValue()))); |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * Mark Request that processed at primary node with attribute primaryIndicatorName |
| * |
| * @param request The Servlet request |
| * |
| * @throws IOException IO error finding session |
| */ |
| protected void createPrimaryIndicator(Request request) throws IOException { |
| String id = request.getRequestedSessionId(); |
| if ((id != null) && (!id.isEmpty())) { |
| Manager manager = request.getContext().getManager(); |
| Session session = manager.findSession(id); |
| if (session instanceof ClusterSession cses) { |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("ReplicationValve.session.indicator", request.getContext().getName(), id, |
| primaryIndicatorName, Boolean.valueOf(cses.isPrimarySession()))); |
| } |
| request.setAttribute(primaryIndicatorName, cses.isPrimarySession() ? Boolean.TRUE : Boolean.FALSE); |
| } else { |
| if (log.isDebugEnabled()) { |
| if (session != null) { |
| log.debug(sm.getString("ReplicationValve.session.found", request.getContext().getName(), id)); |
| } else { |
| log.debug(sm.getString("ReplicationValve.session.invalid", request.getContext().getName(), id)); |
| } |
| } |
| } |
| } |
| } |
| |
| } |