blob: f8448a4d2c40596c5535cee461b46e23ec0fba76 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.serialization.impl.kryo;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.SyntheticResource;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.api.wrappers.ValueMapDecorator;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.serialization.DistributionContentSerializer;
import org.apache.sling.distribution.serialization.DistributionExportFilter;
import org.apache.sling.distribution.serialization.DistributionExportOptions;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Kryo based {@link DistributionContentSerializer}
*/
public class KryoContentSerializer implements DistributionContentSerializer {
private final Logger log = LoggerFactory.getLogger(getClass());
private final String name;
public KryoContentSerializer(String name) {
this.name = name;
}
@Override
public void exportToStream(ResourceResolver resourceResolver, DistributionExportOptions options, OutputStream outputStream) throws DistributionException {
DistributionExportFilter filter = options.getFilter();
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.addDefaultSerializer(Resource.class, new ResourceSerializer(filter.getPropertyFilter()));
kryo.addDefaultSerializer(InputStream.class, new InputStreamSerializer());
Output output = new Output(outputStream);
LinkedList<Resource> resources = new LinkedList<Resource>();
for (DistributionExportFilter.TreeFilter nodeFilter : filter.getNodeFilters()) {
Resource resource = resourceResolver.getResource(nodeFilter.getPath());
if (resource != null) {
addResource(nodeFilter, resources, resource);
}
}
kryo.writeObject(output, resources);
output.flush();
}
@Override
public void importFromStream(ResourceResolver resourceResolver, InputStream stream) throws DistributionException {
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.addDefaultSerializer(Resource.class, new ResourceSerializer(null));
kryo.addDefaultSerializer(InputStream.class, new InputStreamSerializer());
try {
Input input = new Input(stream);
@SuppressWarnings("unchecked") LinkedList<Resource> resources = (LinkedList<Resource>) kryo.readObject(input, LinkedList.class);
input.close();
for (Resource resource : resources) {
persistResource(resourceResolver, resource);
}
resourceResolver.commit();
} catch (Exception e) {
throw new DistributionException(e);
}
}
@Override
public String getName() {
return name;
}
@Override
public boolean isRequestFiltering() {
return false;
}
private void persistResource(@NotNull ResourceResolver resourceResolver, Resource resource) throws PersistenceException {
String path = resource.getPath().trim();
String name = path.substring(path.lastIndexOf('/') + 1);
String substring = path.substring(0, path.lastIndexOf('/'));
String parentPath = substring.length() == 0 ? "/" : substring;
Resource existingResource = resourceResolver.getResource(path);
if (existingResource != null) {
resourceResolver.delete(existingResource);
}
Resource parent = resourceResolver.getResource(parentPath);
if (parent == null) {
parent = createParent(resourceResolver, parentPath);
}
Resource createdResource = resourceResolver.create(parent, name, resource.getValueMap());
log.debug("created resource {}", createdResource);
}
private Resource createParent(ResourceResolver resourceResolver, String path) throws PersistenceException {
String parentPath = path.substring(0, path.lastIndexOf('/'));
if (parentPath.length() == 0) {
parentPath = "/";
}
String name = path.substring(path.lastIndexOf('/') + 1);
Resource parentResource = resourceResolver.getResource(parentPath);
if (parentResource == null) {
parentResource = createParent(resourceResolver, parentPath);
}
Map<String, Object> properties = new HashMap<String, Object>();
return resourceResolver.create(parentResource, name, properties);
}
private class ResourceSerializer extends Serializer<Resource> {
private final DistributionExportFilter.TreeFilter propertyFilter;
private ResourceSerializer(@Nullable DistributionExportFilter.TreeFilter propertyFilter) {
this.propertyFilter = propertyFilter;
}
@Override
public void write(Kryo kryo, Output output, Resource resource) {
ValueMap valueMap = resource.getValueMap();
output.writeString(resource.getPath());
output.writeString(resource.getResourceType());
HashMap<String, Object> map = new HashMap<String, Object>();
for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
if (propertyFilter == null || propertyFilter.matches(entry.getKey())) {
map.put(entry.getKey(), entry.getValue());
}
}
kryo.writeObjectOrNull(output, map, HashMap.class);
}
@Override
public Resource read(Kryo kryo, Input input, Class<Resource> type) {
String path = input.readString();
String resourceType = input.readString();
@SuppressWarnings("unchecked") final HashMap<String, Object> map = kryo.readObjectOrNull(input, HashMap.class);
return new SyntheticResource(null, path, resourceType) {
@Override
public ValueMap getValueMap() {
return new ValueMapDecorator(map);
}
};
}
}
private class ValueMapSerializer extends Serializer<ValueMap> {
@Override
public void write(Kryo kryo, Output output, ValueMap valueMap) {
for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
output.writeString(entry.getKey());
output.writeString(entry.getValue().toString());
}
}
@Override
public ValueMap read(Kryo kryo, Input input, Class<ValueMap> type) {
final Map<String, Object> map = new HashMap<String, Object>();
String key;
while ((key = input.readString()) != null) {
String value = input.readString();
map.put(key, value);
}
return new ValueMapDecorator(map);
}
}
private class InputStreamSerializer extends Serializer<InputStream> {
@Override
public void write(Kryo kryo, Output output, InputStream stream) {
try {
byte[] bytes = IOUtils.toByteArray(stream);
output.writeInt(bytes.length);
output.write(bytes);
} catch (IOException e) {
log.warn("could not serialize input stream", e);
}
}
@Override
public InputStream read(Kryo kryo, Input input, Class<InputStream> type) {
int size = input.readInt();
byte[] bytes = new byte[size];
input.readBytes(bytes);
return new ByteArrayInputStream(bytes);
}
}
private void addResource(DistributionExportFilter.TreeFilter nodeFilter, LinkedList<Resource> resources, Resource resource) {
resources.add(resource);
for (Resource child : resource.getChildren()) {
if (nodeFilter.matches(child.getPath())) {
addResource(nodeFilter, resources, child);
}
}
}
}