blob: 17a6e594b3eb57c1112d253cae196c765c281a6c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.transport;
import java.net.InetAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Snapshot;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir;
import org.apache.cassandra.net.AbstractMessageHandler;
import org.apache.cassandra.net.ResourceLimits;
public class ClientResourceLimits
{
private static final Logger logger = LoggerFactory.getLogger(ClientResourceLimits.class);
private static final ResourceLimits.Concurrent GLOBAL_LIMIT = new ResourceLimits.Concurrent(getGlobalLimit());
private static final AbstractMessageHandler.WaitQueue GLOBAL_QUEUE = AbstractMessageHandler.WaitQueue.global(GLOBAL_LIMIT);
private static final ConcurrentMap<InetAddress, Allocator> PER_ENDPOINT_ALLOCATORS = new ConcurrentHashMap<>();
public static Allocator getAllocatorForEndpoint(InetAddress endpoint)
{
while (true)
{
Allocator result = PER_ENDPOINT_ALLOCATORS.computeIfAbsent(endpoint, Allocator::new);
if (result.acquire())
return result;
PER_ENDPOINT_ALLOCATORS.remove(endpoint, result);
}
}
public static long getGlobalLimit()
{
return DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes();
}
public static void setGlobalLimit(long newLimit)
{
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(newLimit);
long existingLimit = GLOBAL_LIMIT.setLimit(getGlobalLimit());
logger.info("Changed native_max_transport_requests_in_bytes from {} to {}", existingLimit, newLimit);
}
public static long getCurrentGlobalUsage()
{
return GLOBAL_LIMIT.using();
}
public static long getEndpointLimit()
{
return DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp();
}
public static void setEndpointLimit(long newLimit)
{
long existingLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp();
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(newLimit); // ensure new instances get the new limit
for (Allocator allocator : PER_ENDPOINT_ALLOCATORS.values())
existingLimit = allocator.endpointAndGlobal.endpoint().setLimit(newLimit);
logger.info("Changed native_max_transport_requests_in_bytes_per_ip from {} to {}", existingLimit, newLimit);
}
public static Snapshot getCurrentIpUsage()
{
DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir();
for (Allocator allocator : PER_ENDPOINT_ALLOCATORS.values())
{
histogram.update(allocator.endpointAndGlobal.endpoint().using());
}
return histogram.getSnapshot();
}
/**
* This will recompute the ip usage histo on each query of the snapshot when requested instead of trying to keep
* a histogram up to date with each request
*/
public static Reservoir ipUsageReservoir()
{
return new Reservoir()
{
public int size()
{
return PER_ENDPOINT_ALLOCATORS.size();
}
public void update(long l)
{
throw new IllegalStateException();
}
public Snapshot getSnapshot()
{
return getCurrentIpUsage();
}
};
}
/**
* Used during protocol negotiation in {@link InitialConnectionHandler} and then if protocol V4 or earlier is
* selected. V5 connections convert this to a {@link ResourceProvider} and use the global/endpoint
* limits and queues directly.
*/
static class Allocator
{
private final AtomicInteger refCount = new AtomicInteger(0);
private final InetAddress endpoint;
private final ResourceLimits.EndpointAndGlobal endpointAndGlobal;
// This is not used or exposed by instances of this class, but it is used for V5+
// client connections so it's initialized here as the same queue must be shared
// across all clients from the same remote address.
private final AbstractMessageHandler.WaitQueue waitQueue;
private Allocator(InetAddress endpoint)
{
this.endpoint = endpoint;
ResourceLimits.Concurrent limit = new ResourceLimits.Concurrent(getEndpointLimit());
endpointAndGlobal = new ResourceLimits.EndpointAndGlobal(limit, GLOBAL_LIMIT);
waitQueue = AbstractMessageHandler.WaitQueue.endpoint(limit);
}
private boolean acquire()
{
return 0 < refCount.updateAndGet(i -> i < 0 ? i : i + 1);
}
/**
* Decrement the reference count, possibly removing the instance from the cache
* if this is its final reference
*/
void release()
{
if (-1 == refCount.updateAndGet(i -> i == 1 ? -1 : i - 1))
PER_ENDPOINT_ALLOCATORS.remove(endpoint, this);
}
/**
* Attempt to allocate a number of permits representing bytes towards the inflight
* limits. To succeed, it must be possible to allocate from both the per-endpoint
* and global reserves.
*
* @param amount number permits to allocate
* @return outcome SUCCESS if the allocation was successful. In the case of failure,
* either INSUFFICIENT_GLOBAL or INSUFFICIENT_ENPOINT to indicate which reserve rejected
* the allocation request.
*/
ResourceLimits.Outcome tryAllocate(long amount)
{
return endpointAndGlobal.tryAllocate(amount);
}
/**
* Force an allocation of a number of permits representing bytes from the inflight
* limits. Permits will be acquired from both the per-endpoint and global reserves
* which may lead to either or both reserves going over their limits.
* @param amount number permits to allocate
*/
void allocate(long amount)
{
endpointAndGlobal.allocate(amount);
}
/**
* Release a number of permits representing bytes back to the both the per-endpoint and
* global limits for inflight requests.
*
* @param amount number of permits to release
* @return outcome, ABOVE_LIMIT if either reserve is above its configured limit after
* the operation completes or, BELOW_LIMIT if neither is.
* rejected the allocation request.
*/
ResourceLimits.Outcome release(long amount)
{
return endpointAndGlobal.release(amount);
}
@VisibleForTesting
long endpointUsing()
{
return endpointAndGlobal.endpoint().using();
}
@VisibleForTesting
long globallyUsing()
{
return endpointAndGlobal.global().using();
}
public String toString()
{
return String.format("InflightEndpointRequestPayload: %d/%d, InflightOverallRequestPayload: %d/%d",
endpointAndGlobal.endpoint().using(),
endpointAndGlobal.endpoint().limit(),
endpointAndGlobal.global().using(),
endpointAndGlobal.global().limit());
}
}
/**
* Used in protocol V5 and later by the AbstractMessageHandler/CQLMessageHandler hierarchy.
* This hides the allocate/tryAllocate/release methods from EndpointResourceLimits and exposes
* the endpoint and global limits, along with their corresponding
* {@link org.apache.cassandra.net.AbstractMessageHandler.WaitQueue} directly.
* Provided as an interface and single implementation for testing (see CQLConnectionTest)
*/
interface ResourceProvider
{
ResourceLimits.Limit globalLimit();
AbstractMessageHandler.WaitQueue globalWaitQueue();
ResourceLimits.Limit endpointLimit();
AbstractMessageHandler.WaitQueue endpointWaitQueue();
void release();
static class Default implements ResourceProvider
{
private final Allocator limits;
Default(Allocator limits)
{
this.limits = limits;
}
public ResourceLimits.Limit globalLimit()
{
return limits.endpointAndGlobal.global();
}
public AbstractMessageHandler.WaitQueue globalWaitQueue()
{
return GLOBAL_QUEUE;
}
public ResourceLimits.Limit endpointLimit()
{
return limits.endpointAndGlobal.endpoint();
}
public AbstractMessageHandler.WaitQueue endpointWaitQueue()
{
return limits.waitQueue;
}
public void release()
{
limits.release();
}
}
}
}