blob: 371aa1fa5998839f82ee522facbac355cbc48c9d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.segment.standby.store;
import static org.osgi.service.component.annotations.ReferencePolicy.STATIC;
import static org.osgi.service.component.annotations.ReferencePolicyOption.GREEDY;
import java.io.File;
import java.util.Dictionary;
import java.util.Hashtable;
import org.apache.jackrabbit.guava.common.base.StandardSystemProperty;
import org.apache.jackrabbit.guava.common.io.Closer;
import org.apache.jackrabbit.oak.segment.SegmentStore;
import org.apache.jackrabbit.oak.segment.SegmentStoreProvider;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync;
import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.AttributeType;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.osgi.service.metatype.annotations.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(configurationPolicy = ConfigurationPolicy.REQUIRE)
@Designate(ocd = StandbyStoreService.Configuration.class)
public class StandbyStoreService {
private static final Logger log = LoggerFactory.getLogger(StandbyStoreService.class);
private static final int BLOB_CHUNK_SIZE = Integer.getInteger("oak.standby.blob.chunkSize", 1024 * 1024);
@ObjectClassDefinition(
name = "Apache Jackrabbit Oak Segment Tar Cold Standby Service",
description = "Provides continuous backups of repositories based on Segment Tar"
)
@interface Configuration {
@AttributeDefinition(
name = "Persist configuration",
description = "Must be always disabled to avoid storing the configuration in the repository"
)
boolean org_apache_sling_installer_configuration_persist() default false;
@AttributeDefinition(
name = "Mode",
description = "TarMK Cold Standby mode (primary or standby)",
options = {
@Option(label = "primary", value = "primary"),
@Option(label = "standby", value = "standby")}
)
String mode() default "primary";
@AttributeDefinition(
name = "Port",
description = "TCP/IP port to use"
)
int port() default 8023;
@AttributeDefinition(
name = "Primary Host",
description = "Primary host (standby mode only)"
)
String primary_host() default "127.0.0.1";
@AttributeDefinition(
name = "Sync interval (seconds)",
description = "Sync interval in seconds (standby mode only)"
)
long interval() default 5;
@AttributeDefinition(
name = "Allowed IP-Ranges",
description = "Accept incoming requests for these host names and IP-ranges only (primary mode only)",
cardinality = Integer.MAX_VALUE
)
String[] primary_allowed$_$client$_$ip$_$ranges() default {};
@AttributeDefinition(
name = "Secure",
description = "Use secure connections"
)
boolean secure() default false;
@AttributeDefinition(
name = "Standby Read Timeout",
description = "Timeout for requests issued from the standby instance in milliseconds"
)
int standby_readtimeout() default 60000;
@AttributeDefinition(
name = "Standby Automatic Cleanup",
description = "Call the cleanup method when the root segment Garbage Collector (GC) generation number increases"
)
boolean standby_autoclean() default true;
@AttributeDefinition(
name = "SSL Key File",
description = "The file name which contains the SSL key. If this is empty, a key will be generated on-the-fly."
)
String sslKeyFile();
@AttributeDefinition(
name = "SSL Key Password",
description = "Password for the SSL key. If this is empty, an unencrypted key is expected.",
type = AttributeType.PASSWORD)
String sslKeyPassword() default "";
@AttributeDefinition(
name = "SSL Certificate Chain File",
description = "The file name which contains the SSL certificate chain."
)
String sslChainFile();
@AttributeDefinition(
name = "SSL Validate Client",
description = "Validate the client's SSL certificate."
)
boolean sslValidateClient() default false;
@AttributeDefinition(
name = "SSL Certificate Subject Pattern",
description = "The peer certificate's subject must match this pattern in order to be accepted."
)
String sslSubjectPattern();
}
@Reference(policy = STATIC, policyOption = GREEDY)
private SegmentStoreProvider storeProvider = null;
private final Closer closer = Closer.create();
@Activate
private void activate(ComponentContext context, Configuration config) {
SegmentStore segmentStore = storeProvider.getSegmentStore();
if (!(segmentStore instanceof FileStore)) {
throw new IllegalArgumentException("Unexpected SegmentStore implementation");
}
FileStore fileStore = (FileStore) segmentStore;
String mode = config.mode();
if (mode.equals("primary")) {
bootstrapPrimary(config, fileStore);
return;
}
if (mode.equals("standby")) {
bootstrapSecondary(context, config, fileStore);
return;
}
throw new IllegalArgumentException(String.format("Unexpected mode property, got '%s'", mode));
}
@Deactivate
public void deactivate() throws Exception {
closer.close();
}
private void bootstrapPrimary(Configuration config, FileStore fileStore) {
int port = config.port();
String[] ranges = config.primary_allowed$_$client$_$ip$_$ranges();
boolean secure = config.secure();
String sslKeyFile = config.sslKeyFile();
String sslChainFile = config.sslChainFile();
boolean sslValidateClient = config.sslValidateClient();
String sslSubjectPattern = config.sslSubjectPattern();
StandbyServerSync.Builder builder = StandbyServerSync.builder()
.withPort(port)
.withFileStore(fileStore)
.withBlobChunkSize(BLOB_CHUNK_SIZE)
.withAllowedClientIPRanges(ranges)
.withSecureConnection(secure)
.withSSLKeyFile(sslKeyFile)
.withSSLChainFile(sslChainFile)
.withSSLClientValidation(sslValidateClient)
.withSSLSubjectPattern(sslSubjectPattern);
if (!"".equals(config.sslKeyPassword())) {
builder.withSSLKeyPassword(config.sslKeyPassword());
}
StandbyServerSync standbyServerSync = builder.build();
closer.register(standbyServerSync);
standbyServerSync.start();
log.info("Started primary on port {} with allowed IP ranges {}", port, ranges);
}
private void bootstrapSecondary(ComponentContext context, Configuration config, FileStore fileStore) {
StandbyClientSync.Builder builder = StandbyClientSync.builder()
.withHost(config.primary_host())
.withPort(config.port())
.withFileStore(fileStore)
.withSecureConnection(config.secure())
.withReadTimeoutMs(config.standby_readtimeout())
.withAutoClean(config.standby_autoclean())
.withSpoolFolder(new File(StandardSystemProperty.JAVA_IO_TMPDIR.value()))
.withSecureConnection(config.secure())
.withSSLKeyFile(config.sslKeyFile())
.withSSLChainFile(config.sslChainFile())
.withSSLSubjectPattern(config.sslSubjectPattern());
if (!"".equals(config.sslKeyPassword())) {
builder.withSSLKeyPassword(config.sslKeyPassword());
}
StandbyClientSync standbyClientSync = builder.build();
closer.register(standbyClientSync);
Dictionary<String, Object> dictionary = new Hashtable<>();
dictionary.put("scheduler.period", config.interval());
dictionary.put("scheduler.concurrent", false);
ServiceRegistration registration = context.getBundleContext().registerService(Runnable.class, standbyClientSync, dictionary);
closer.register(registration::unregister);
log.info("Started standby on port {} with {}s sync frequency", config.port(), config.interval());
}
}