blob: 8fb5b57ca507289f43e4f816f99efcfc813a4781 [file] [log] [blame]
package org.apache.sling.mailarchiveserver.impl;
import static org.apache.james.mime4j.dom.field.FieldName.SUBJECT;
import static org.apache.sling.mailarchiveserver.util.MessageFieldName.CONTENT;
import static org.apache.sling.mailarchiveserver.util.MessageFieldName.HTML_BODY;
import static org.apache.sling.mailarchiveserver.util.MessageFieldName.LIST_ID;
import static org.apache.sling.mailarchiveserver.util.MessageFieldName.NAME;
import static org.apache.sling.mailarchiveserver.util.MessageFieldName.PLAIN_BODY;
import static org.apache.sling.mailarchiveserver.util.MessageFieldName.X_ORIGINAL_HEADER;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.apache.jackrabbit.util.Text;
import org.apache.james.mime4j.dom.BinaryBody;
import org.apache.james.mime4j.dom.Body;
import org.apache.james.mime4j.dom.Entity;
import org.apache.james.mime4j.dom.Header;
import org.apache.james.mime4j.dom.Message;
import org.apache.james.mime4j.dom.Multipart;
import org.apache.james.mime4j.dom.TextBody;
import org.apache.james.mime4j.dom.field.FieldName;
import org.apache.james.mime4j.message.BodyPart;
import org.apache.james.mime4j.message.DefaultMessageWriter;
import org.apache.james.mime4j.stream.Field;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.mailarchiveserver.api.AttachmentFilter;
import org.apache.sling.mailarchiveserver.api.MessageProcessor;
import org.apache.sling.mailarchiveserver.api.MessageStore;
import org.apache.sling.mailarchiveserver.api.ThreadKeyGenerator;
import org.apache.sling.mailarchiveserver.util.MailArchiveServerConstants;
import org.apache.sling.mailarchiveserver.util.MessageFieldName;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component
@Service(MessageStore.class)
public class MessageStoreImpl implements MessageStore {
public static final String PLAIN_MIMETYPE = "text/plain";
public static final String HTML_MIMETYPE = "text/html";
@Reference
private ResourceResolverFactory resourceResolverFactory;
@Reference
ThreadKeyGenerator threadKeyGen;
@Reference
AttachmentFilter attachmentFilter;
@Reference(
cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
policy=ReferencePolicy.DYNAMIC,
referenceInterface=MessageProcessor.class)
private SortedSet<ServiceReference> messageProcessorRefs = new TreeSet<ServiceReference>();
private List<MessageProcessor> messageProcessors = new ArrayList<MessageProcessor>();
private boolean processorsUpdated;
private BundleContext bundleContext = null;
static final String FIELD_SEPARATOR = " : ";
private static final String[] RE_PREFIXES = { "re:", "aw:", "fw:", "re ", "aw ", "fw ", "答复"};
// for testing
String archivePath = MailArchiveServerConstants.ARCHIVE_PATH;
String resourceTypeKey = MailArchiveServerConstants.RT_KEY;
private static final Logger logger = LoggerFactory.getLogger(MessageStoreImpl.class);
@Activate
private void activate(BundleContext bc) {
bundleContext = bc;
}
protected ResourceResolver getResourceResolver() throws LoginException {
return resourceResolverFactory.getAdministrativeResourceResolver(null);
}
public void save(Message msg) throws IOException {
ResourceResolver resolver = null;
try {
resolver = getResourceResolver();
save(resolver, msg);
} catch (LoginException e) {
throw new RuntimeException("LoginException", e);
} finally {
if(resolver != null) {
resolver.close();
}
}
}
private void save(ResourceResolver resolver, Message msg) throws IOException, LoginException {
// apply message processors
for(MessageProcessor processor : getSortedMessageProcessors()) {
logger.debug("Calling {}", processor);
processor.processMessage(msg);
}
// into path: archive/domain/list/thread/message
final Map<String, Object> msgProps = new HashMap<String, Object>();
final List<BodyPart> attachments = new ArrayList<BodyPart>();
msgProps.put(resourceTypeKey, MailArchiveServerConstants.MESSAGE_RT);
StringBuilder plainBody = new StringBuilder();
StringBuilder htmlBody = new StringBuilder();
Boolean hasBody = false;
if (!msg.isMultipart()) {
plainBody = new StringBuilder(getTextPart(msg));
} else {
Multipart multipart = (Multipart) msg.getBody();
recursiveMultipartProcessing(multipart, plainBody, htmlBody, hasBody, attachments);
}
msgProps.put(PLAIN_BODY, plainBody.toString().replaceAll("\r\n", "\n"));
if (htmlBody.length() > 0) {
msgProps.put(HTML_BODY, htmlBody.toString());
}
msgProps.putAll(getMessagePropertiesFromHeader(msg.getHeader()));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
new DefaultMessageWriter().writeHeader(msg.getHeader(), baos);
String origHdr = baos.toString(MailArchiveServerConstants.DEFAULT_ENCODER.charset().name());
msgProps.put(X_ORIGINAL_HEADER, origHdr);
final Header hdr = msg.getHeader();
final String listIdRaw = hdr.getField(LIST_ID).getBody();
final String listId = listIdRaw.substring(1, listIdRaw.length()-1); // remove < and >
final String list = getListNodeName(listId);
final String domain = getDomainNodeName(listId);
final String subject = (String) msgProps.get(SUBJECT);
final String threadPath = threadKeyGen.getThreadKey(subject);
final String threadName = removeRe(subject);
Resource parentResource = assertEachNode(resolver, archivePath, domain, list, threadPath, threadName);
String msgNodeName = makeJcrFriendly((String) msgProps.get(NAME));
boolean isMsgNodeCreated = assertResource(resolver, parentResource, msgNodeName, msgProps);
if (isMsgNodeCreated) {
Resource msgResource = resolver.getResource(parentResource, msgNodeName);
for (BodyPart att : attachments) {
if (!attachmentFilter.isEligible(att)) {
continue;
}
final Map<String, Object> attProps = new HashMap<String, Object>();
parseHeaderToProps(att.getHeader(), attProps);
Body body = att.getBody();
if (body instanceof BinaryBody) {
attProps.put(CONTENT, ((BinaryBody) body).getInputStream());
} else if (body instanceof TextBody) {
attProps.put(CONTENT, ((TextBody) body).getInputStream());
}
String attNodeName = Text.escapeIllegalJcrChars(att.getFilename());
assertResource(resolver, msgResource, attNodeName, attProps);
}
updateThread(resolver, parentResource, msgProps);
}
}
static void recursiveMultipartProcessing(Multipart multipart, StringBuilder plainBody, StringBuilder htmlBody, Boolean hasBody, List<BodyPart> attachments) throws IOException {
for (Entity enitiy : multipart.getBodyParts()) {
BodyPart part = (BodyPart) enitiy;
if (part.getDispositionType() != null && !part.getDispositionType().equals("")) {
// if DispositionType is null or empty, it means that it's multipart, not attached file
attachments.add(part);
} else {
if (part.isMimeType(PLAIN_MIMETYPE) && !hasBody) {
plainBody.append(getTextPart(part));
hasBody = true;
} else if (part.isMimeType(HTML_MIMETYPE) && !hasBody) {
htmlBody.append(getTextPart(part));
} else if (part.isMultipart()) {
recursiveMultipartProcessing((Multipart) part.getBody(), plainBody, htmlBody, hasBody, attachments);
}
}
}
}
public void saveAll(Iterator<Message> iterator) throws IOException {
ResourceResolver resolver = null;
try {
resolver = getResourceResolver();
int mcount = 0;
while (iterator.hasNext()) {
Message msg = iterator.next();
save(resolver, msg);
mcount++;
if (mcount % 100 == 0) {
logger.debug(mcount+" messages processed.");
}
}
logger.info(mcount+" messages processed.");
} catch(LoginException e) {
throw new RuntimeException("LoginException", e);
} finally {
if(resolver != null) {
resolver.close();
}
}
}
/**
* code taken from http://www.mozgoweb.com/posts/how-to-parse-mime-message-using-mime4j-library/
*/
static String getTextPart(Entity part) throws IOException {
TextBody tb = (TextBody) part.getBody();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
tb.writeTo(baos);
return baos.toString(MailArchiveServerConstants.DEFAULT_ENCODER.charset().name());
}
static Map<String, Object> getMessagePropertiesFromHeader(Header hdr) {
Map<String, Object> props = new HashMap<String, Object>();
parseHeaderToProps(hdr, props);
// message name
String name;
if (hdr.getField("Message-ID") != null) {
name = hdr.getField("Message-ID").getBody();
name = name.substring(1, name.length()-1); // remove < and >
} else {
name = Integer.toHexString(hdr.getField("Date").hashCode());
}
props.put(MessageFieldName.NAME, name);
return props;
}
private static void parseHeaderToProps(Header hdr, Map<String, Object> props) {
Set<String> processed = new HashSet<String>();
for (Field f : hdr.getFields()) {
String name = f.getName();
if (!processed.contains(name)) {
processed.add(name);
String value = "";
List<Field> fields = hdr.getFields(name);
for (Field fl : fields) {
value += fl.getBody()+FIELD_SEPARATOR;
}
props.put(name, value.substring(0, value.length()-FIELD_SEPARATOR.length()));
}
}
}
static String getListNodeName(String listId) {
String list = "";
String[] split = listId.split("\\.");
int splitL = split.length;
if (splitL >= 4) {
for (int i = 0; i < splitL-2; i++) {
list += split[i] + ".";
}
list = list.substring(0, list.length() - 1);
} else if (splitL == 3) {
list = split[0];
} else {
throw new RuntimeException("List-Id is invalid: minimum 2 separatory dots required.");
}
return list;
}
static String getDomainNodeName(String listId) {
String[] split = listId.split("\\.");
int splitL = split.length;
if (splitL >= 3) {
return split[splitL-2] + "." + split[splitL-1];
} else {
throw new RuntimeException("List-Id is invalid: minimum 2 separatory dots required.");
}
}
private Resource assertEachNode(ResourceResolver resolver, String archive, String domain, String list,
String threadPath, String threadName) throws PersistenceException, LoginException {
final String pathToMessage = archive+domain+"/"+list+"/"+threadPath;
String path = pathToMessage;
Resource resource = resolver.getResource(path);
int cnt = 0;
while (resource == null) {
cnt++;
path = path.substring(0, path.lastIndexOf("/"));
resource = resolver.getResource(path);
}
if (cnt > 0) {
int threadNodesNumber = threadPath.split("/").length;
// bind paths
List<String> nodePaths = new ArrayList<String>();
nodePaths.add(domain);
nodePaths.add(list);
for (String node : threadPath.split("/")) {
nodePaths.add(node);
}
// bind props
List<Map<String, Object>> nodeProps = new ArrayList<Map<String, Object>>();
nodeProps.add(setProperties(MailArchiveServerConstants.DOMAIN_RT, domain));
nodeProps.add(setProperties(MailArchiveServerConstants.LIST_RT, list));
for (int i = 0; i < threadNodesNumber-1; i++) {
nodeProps.add(null);
}
nodeProps.add(setProperties(MailArchiveServerConstants.THREAD_RT, threadName));
// checking
for (int i = nodePaths.size()-cnt; i < nodePaths.size(); i++) {
String name = nodePaths.get(i);
assertResource(resolver, resource, name, nodeProps.get(i));
resource = resolver.getResource(resource.getPath()+"/"+name);
}
}
resource = resolver.getResource(pathToMessage);
if (resource == null) {
throw new RuntimeException("Parent resource cannot be null.");
} else {
return resource;
}
}
private Map<String, Object> setProperties(String resourceType, String name) {
Map<String, Object> props = new HashMap<String, Object>();
props.put(resourceTypeKey, resourceType);
props.put(MessageFieldName.NAME, name);
return props;
}
private boolean assertResource(ResourceResolver resolver, Resource parent, String name, Map<String, Object> newProps)
throws LoginException, PersistenceException {
String checkPath = parent.getPath()+"/"+name;
final Resource checkResource = resolver.getResource(checkPath);
if (checkResource == null) {
final Resource newResource = resolver.create(parent, name, newProps);
resolver.commit();
logger.debug(String.format("Resource created at %s .", newResource.getPath()));
return true;
} else {
logger.debug(String.format("Resource at %s already exists.", checkResource.getPath()));
return false;
}
}
private static void updateThread(ResourceResolver resolver, Resource thread, Map<String, Object> msgProps) throws PersistenceException {
final ModifiableValueMap thrdProps = thread.adaptTo(ModifiableValueMap.class);
Long prop = (Long) thrdProps.get(MessageFieldName.LAST_UPDATE);
Date updatedDate = null;
if (prop != null) {
updatedDate = new Date(prop);
}
final String msgProp = (String) msgProps.get(FieldName.DATE);
SimpleDateFormat sdf = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z");
final Date msgDate = sdf.parse(msgProp, new ParsePosition(0));
if (updatedDate == null || msgDate.after(updatedDate)) {
thrdProps.put(MessageFieldName.LAST_UPDATE, msgDate.getTime());
resolver.commit();
}
}
static String makeJcrFriendly(String s) {
return s.replaceAll("[\\s\\.-]+", "_").replaceAll("\\W", "").replaceAll("\\_", " ").trim().replaceAll("[ ]+", "_");
}
static String removeRe(String s) {
s = s.trim();
boolean flag = true;
while (flag) {
flag = false;
for (String prefix : RE_PREFIXES) {
if (s.toLowerCase().startsWith(prefix)) {
s = s.substring(3).trim();
flag = true;
}
}
}
return s.trim();
}
public synchronized void bindMessageProcessor(ServiceReference ref) {
synchronized (messageProcessorRefs) {
messageProcessorRefs.add(ref);
}
processorsUpdated = true;
logger.info("Message processor {} added to pool.", ref);
}
public void unbindMessageProcessor(ServiceReference ref) {
synchronized (messageProcessorRefs) {
messageProcessorRefs.remove(ref);
}
processorsUpdated = true;
logger.info("Message processor {} removed from pool.", ref);
}
private Collection<MessageProcessor> getSortedMessageProcessors() {
if(processorsUpdated) {
synchronized (messageProcessorRefs) {
processorsUpdated = false;
messageProcessors.clear();
for(ServiceReference ref : messageProcessorRefs) {
messageProcessors.add((MessageProcessor)bundleContext.getService(ref));
}
}
logger.debug("Updated sorted list of MessageProcessor: {}", messageProcessors);
}
return messageProcessors;
}
}