blob: 45047323152ff2d5b54d72f75e83cc0d93a4fe1b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.fuseki.server;
import static java.lang.String.format;
import static org.apache.jena.fuseki.server.DataServiceStatus.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.jena.ext.com.google.common.collect.ArrayListMultimap;
import org.apache.jena.ext.com.google.common.collect.ListMultimap;
import org.apache.jena.ext.com.google.common.collect.Multimaps;
import org.apache.jena.fuseki.Fuseki;
import org.apache.jena.fuseki.FusekiException;
import org.apache.jena.fuseki.auth.AuthPolicy;
import org.apache.jena.fuseki.servlets.ActionService;
import org.apache.jena.query.TxnType;
import org.apache.jena.query.text.DatasetGraphText;
import org.apache.jena.sparql.core.DatasetGraph;
public class DataService {
private DatasetGraph dataset;
private Map<String, EndpointSet> endpoints = new ConcurrentHashMap<>();
private ListMultimap<Operation, Endpoint> operationsMap = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
// Dataset-level authorization policy.
private AuthPolicy authPolicy = null;
/**
* Record which {@link DataAccessPoint DataAccessPoints} this {@code DataService} is
* associated with. This is mainly for checking and development.
* Usually, one {@code DataService} is associated with one {@link DataAccessPoint}.
*/
private List<DataAccessPoint> dataAccessPoints = new ArrayList<>(1);
private volatile DataServiceStatus state = UNINITIALIZED;
// DataService-level counters.
private final CounterSet counters = new CounterSet();
private final AtomicBoolean offlineInProgress = new AtomicBoolean(false);
private final AtomicBoolean acceptingRequests = new AtomicBoolean(true);
/** Create a {@code DataService} for the given dataset. */
public DataService(DatasetGraph dataset) {
this.dataset = dataset;
counters.add(CounterName.Requests);
counters.add(CounterName.RequestsGood);
counters.add(CounterName.RequestsBad);
// Need to call goActive(). Not automatic.
//goActive();
}
/*package*/ void noteDataAccessPoint(DataAccessPoint dap) {
this.dataAccessPoints.add(dap);
}
private String label() {
StringJoiner sj = new StringJoiner(", ", "[", "]");
dataAccessPoints.stream()
.map(DataAccessPoint::getName)
.filter(x->!x.isEmpty())
.forEach(sj::add);
return sj.toString();
}
public DatasetGraph getDataset() {
return dataset;
}
// Convenience
public void addEndpoint(Operation operation) {
addEndpoint(operation, null, null);
}
public void addEndpoint(Operation operation, AuthPolicy authPolicy) {
addEndpoint(operation, null, authPolicy);
}
public void addEndpoint(Operation operation, String endpointName) {
addEndpoint(operation, endpointName, null);
}
public void addEndpoint(Operation operation, String endpointName, AuthPolicy authPolicy) {
Endpoint endpoint = Endpoint.create(operation, endpointName, authPolicy);
addEndpoint(endpoint);
}
public void addEndpoint(Endpoint endpoint) {
addEndpoint$(endpoint);
}
private void addEndpoint$(Endpoint endpoint) {
EndpointSet eps = endpoints.computeIfAbsent(endpoint.getName(), (k)->new EndpointSet(k));
eps.put(endpoint);
// Cleaner not to have duplicates. But nice to have a (short) list that keeps the create order.
if ( ! operationsMap.containsEntry(endpoint.getOperation(), endpoint) )
operationsMap.put(endpoint.getOperation(), endpoint);
}
private void removeEndpoint$(Endpoint endpoint) {
EndpointSet eps = endpoints.get(endpoint.getName());
if ( eps == null )
return;
eps.remove(endpoint);
operationsMap.remove(endpoint.getOperation(), endpoint);
}
/** Return the {@linkplain EndpointSet} for the operations for named use. */
public EndpointSet getEndpointSet(String endpointName) {
return endpoints.get(endpointName);
}
/** Return the {@linkplain EndpointSet} for the operations for unnamed use. */
public EndpointSet getEndpointSet() {
return endpoints.get("");
}
/** Return a collection of all endpoints for this {@linkplain DataService}. */
public Collection<Endpoint> getEndpoints() {
// A copy :-(
Set<Endpoint> x = new HashSet<>();
endpoints.forEach((k,eps)->{
eps.forEach((op,ep)->x.add(ep));
});
return x;
}
/** Execute an action for each {@link Endpoint}. */
public void forEachEndpoint(Consumer<Endpoint> action) {
endpoints.forEach((k,eps)->{
eps.forEach((op,ep)->action.accept(ep));
});
Set<Endpoint> x = new HashSet<>();
endpoints.forEach((k,eps)->{
eps.forEach((op,ep)->x.add(ep));
});
}
public List<Endpoint> getEndpoints(Operation operation) {
List<Endpoint> x = operationsMap.get(operation);
return x;
}
/** Return the operations available here.
* @see #getEndpoints(Operation) to get the endpoint list
*/
public Collection<Operation> getOperations() {
return operationsMap.keySet();
}
/** Return the operations available here.
* @see #getEndpoints(Operation) to get the endpoint list
*/
public boolean hasOperation(Operation operation) {
return operationsMap.keySet().contains(operation);
}
//@Override
public boolean allowUpdate() { return true; }
public void goOffline() {
offlineInProgress.set(true);
acceptingRequests.set(false);
state = OFFLINE;
}
/** Set any {@link ActionService} processors is currently unset. */
public void setEndpointProcessors(OperationRegistry operationRegistry) {
// Make sure the processor is set for each endpoint.
forEachEndpoint(ep->{
if ( ep.getProcessor() == null )
ep.setProcessor(operationRegistry.findHandler(ep.getOperation()));
});
}
private void ensureEnpointProcessors() {
// Better is to havee then set purposefully.
forEachEndpoint(ep->{
if ( ep.getProcessor() == null )
Fuseki.configLog.warn("No processor for "+ep.getName());
});
}
public void goActive() {
ensureEnpointProcessors();
offlineInProgress.set(false);
acceptingRequests.set(true);
state = ACTIVE;
}
// Due to concurrency, call isAcceptingRequests().
// public boolean isActive() {
// return state != DatasetStatus.ACTIVE;
// }
public boolean isAcceptingRequests() {
return acceptingRequests.get();
}
//@Override
public CounterSet getCounters() { return counters; }
//@Override
public long getRequests() {
return counters.value(CounterName.Requests);
}
//@Override
public long getRequestsGood() {
return counters.value(CounterName.RequestsGood);
}
//@Override
public long getRequestsBad() {
return counters.value(CounterName.RequestsBad);
}
/** Counter of active transactions */
public AtomicLong activeTxn = new AtomicLong(0);
/** Cumulative counter of transactions */
public AtomicLong totalTxn = new AtomicLong(0);
public void startTxn(TxnType mode) {
check(DataServiceStatus.ACTIVE);
activeTxn.getAndIncrement();
totalTxn.getAndIncrement();
}
private void check(DataServiceStatus status) {
if ( state != status ) {
String msg = format("DataService %s: Expected=%s, Actual=%s", label(), status, state);
throw new FusekiException(msg);
}
}
public void finishTxn() {
activeTxn.decrementAndGet();
}
/** Shutdown and never use again. */
public synchronized void shutdown() {
if ( state == CLOSING )
return;
expel(dataset);
dataset = null;
state = CLOSED;
}
private void expel(DatasetGraph database) {
// Text databases.
// Close the in-JVM objects for Lucene index and databases.
if ( database instanceof DatasetGraphText ) {
DatasetGraphText dbtext = (DatasetGraphText)database;
database = dbtext.getBase();
dbtext.getTextIndex().close();
}
boolean isTDB1 = org.apache.jena.tdb.sys.TDBInternal.isTDB1(database);
boolean isTDB2 = org.apache.jena.tdb2.sys.TDBInternal.isTDB2(database);
if ( ( isTDB1 || isTDB2 ) ) {
// JENA-1586: Remove database from the process.
if ( isTDB1 )
org.apache.jena.tdb.sys.TDBInternal.expel(database);
if ( isTDB2 )
org.apache.jena.tdb2.sys.TDBInternal.expel(database);
} else
dataset.close();
}
public void setAuthPolicy(AuthPolicy authPolicy) { this.authPolicy = authPolicy; }
/** Returning null implies no authorization control */
public AuthPolicy authPolicy() { return authPolicy; }
}