Update source files with ASF license headers; add ASF license badge to README. (#1)

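Replace the IBM copyright headers in the Java sources with the standard ASF
license header, and add the short-form ASF header to js/batchW.js. The README
gains an Apache 2.0 license badge (plus a commented-out Travis CI badge). The
Java sources are also reindented from tabs to four spaces and trailing
whitespace is trimmed, which accounts for the bulk of the diff; no code
behavior changes.
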
diff --git a/README.md b/README.md
index 75e668c..f9ee367 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,11 @@
-## *MATOS*: Serverless **M**essage **A**rchiver **T**o **O**bject **S**torage
+## Serverless **M**essage **A**rchiver **T**o **O**bject **S**torage (MATOS) Sample Application
+
+[![License](https://img.shields.io/badge/license-Apache--2.0-blue.svg)](http://www.apache.org/licenses/LICENSE-2.0)
+<!-- [![Build Status](https://travis-ci.org/apache/incubator-openwhisk-sample-matos.svg?branch=master)](https://travis-ci.org/apache/incubator-openwhisk-sample-matos) -->
+
 ### Motivation
-***Matos*** demonstrates Bluemix-based serverless implementation of a simple pipeline (hosted on OpenWhisk) that reads messages from a Message Hub topic and archives them in batches into an Object Storage folder. 
+
+***Matos*** demonstrates a Bluemix-based serverless implementation of a simple pipeline (hosted on OpenWhisk) that reads messages from a Message Hub topic and archives them in batches into an Object Storage folder.
 
 The serverless architecture introduces multiple advantages. First, by leveraging OpenWhisk and given the persistent nature of Message Hub, it is possible to apply the archiving in batches, and pay only for the short periods of execution time (typically seconds) of each batch. Moreover, the architecture can seamlessly accommodate spikes in load due to inherent elasticity of OpenWhisk. The combination of the two can dramatically reduce the overall cost, and increase the elasticity of the solution.
 
@@ -140,7 +145,7 @@
 ```sh
 :~/matos$ wsk action create matosMB --sequence mymatos/monitor,mymatos/batchW
 :~/matos$ wsk trigger create everyFiveMinutes --feed /whisk.system/alarms/alarm -p cron '*/5 * * * *'
-:~/matos$ wsk rule create --enable matosEvery5min everyFiveMinutes matosMB 
+:~/matos$ wsk rule create --enable matosEvery5min everyFiveMinutes matosMB
 ```
 
 As a result, subsequent invocations of `load` will be handled in batches every 5 minutes, while the `batch` action will be invoked only if new data is available.
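+
+As a quick sanity check (assuming the trigger, rule, and action names used above), you can fire the trigger manually and inspect the resulting activations:
+
+```sh
+:~/matos$ wsk trigger fire everyFiveMinutes
+:~/matos$ wsk activation list matosMB
+```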
diff --git a/js/batchW.js b/js/batchW.js
index 77353fd..a11ae87 100644
--- a/js/batchW.js
+++ b/js/batchW.js
@@ -1,3 +1,6 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
 function main(params) {
     if(!params.owPath || !params.last || !params.committed) {
         return whisk.error();
@@ -18,4 +21,4 @@
         return whisk.done();
     }
 }
-	
+
diff --git a/src/com/ibm/matos/Batch.java b/src/com/ibm/matos/Batch.java
index e7ccc20..80d2b7b 100644
--- a/src/com/ibm/matos/Batch.java
+++ b/src/com/ibm/matos/Batch.java
@@ -1,22 +1,20 @@
-/**
- * Copyright 2015 IBM
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-/**
- * Licensed Materials - Property of IBM
- * (c) Copyright IBM Corp. 2015
- */
+
 package com.ibm.matos;
 
 import java.io.IOException;
@@ -40,169 +38,169 @@
 
 public class Batch {
 
-	private static Logger logger = Logger.getLogger(Batch.class);
-	private static KafkaConsumer<byte[], byte[]> kafkaConsumer;
-	// private Collection<TopicPartition> myPartitions = null;
-	private static TopicPartition tp;
-	private static boolean done;
-	private static final String KAFKA_CONSUMER_ID_KEY = "group.id";
-	private static final long FETCH_TIMEOUT_SEC = 60;
-	private static long startOffset, endOffset;
-	private static BatchAppender processor;
-	private static Config config;
+    private static Logger logger = Logger.getLogger(Batch.class);
+    private static KafkaConsumer<byte[], byte[]> kafkaConsumer;
+    // private Collection<TopicPartition> myPartitions = null;
+    private static TopicPartition tp;
+    private static boolean done;
+    private static final String KAFKA_CONSUMER_ID_KEY = "group.id";
+    private static final long FETCH_TIMEOUT_SEC = 60;
+    private static long startOffset, endOffset;
+    private static BatchAppender processor;
+    private static Config config;
 
-	// main method for Whisk action
-	public static JsonObject main(JsonObject args) {
+    // main method for Whisk action
+    public static JsonObject main(JsonObject args) {
 
-		try {
-			Utils.initDirs();
-			Utils.extractResourcesToFilesystem(false);
+        try {
+            Utils.initDirs();
+            Utils.extractResourcesToFilesystem(false);
 
-			config = new Config();
-			config.overrideProperties(args);
+            config = new Config();
+            config.overrideProperties(args);
 
-			// invoke the "real" main method, shared with Java main
-			doMain();
-			
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
+            // invoke the "real" main method, shared with Java main
+            doMain();
 
-		JsonObject response = new JsonObject();
-		response.addProperty("offsets", getOffsets());
-		return response;
-	}
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
 
-	// Java main class, reading arguments from command line and invoking doMain() 
-	public static void main(String args[]) throws InterruptedException, ExecutionException, IOException {
+        JsonObject response = new JsonObject();
+        response.addProperty("offsets", getOffsets());
+        return response;
+    }
 
-		Utils.initDirs();
-		Utils.extractResourcesToFilesystem(false);
+    // Java main method, reading arguments from the command line and invoking doMain()
+    public static void main(String args[]) throws InterruptedException, ExecutionException, IOException {
 
-		if (args.length >= 5 && args.length <=8) {
-			config = new Config(args[0]);
-			HashMap<String,String> amap = new HashMap<String,String>();
-			amap.put(Config.KAFKA_API_KEY_PROP, args[1]);
-			amap.put(Config.SWIFT_TENANT_ID_PROP, args[2]);
-			amap.put(Config.SWIFT_USER_ID_PROP, args[3]);
-			amap.put(Config.SWIFT_PASSWORD_PROP, args[4]);
-			if (args.length >= 6)
-				amap.put(Config.KAFKA_PARTITION_PROP, args[5]);
-			if (args.length >= 7)
-				amap.put(Config.KAFKA_START_OFFSET_PROP, args[6]);
-			if (args.length >= 8)
-				amap.put(Config.KAFKA_END_OFFSET_PROP, args[7]);
-			config.overrideProperties(amap);
-		} else {
-			logger.log(Level.ERROR, "Usage:\n\n" +
-					"java -jar <name_of_jar>.jar <config-json-file-name> " +
-					"<kafka_api_key> <swift_tenant_id> <swift_user_id> <swift_password> " +
-					"[<kafka_partition> [<kafka_start_offset> [<kafka_end_offset>]]]");
-			return;
-		}
-		// invoke the "real" main method, shared with Whisk's main
-		doMain();
-	}
-	
-	private static void doMain() throws IOException, InterruptedException {
-		String consumerGroup;
-		logger.log(Level.INFO, "Starting " + Batch.class.getSimpleName() + "; CONFIG:");
-		logger.log(Level.INFO, config);
+        Utils.initDirs();
+        Utils.extractResourcesToFilesystem(false);
 
-		Utils.setJaasLocation();
+        if (args.length >= 5 && args.length <= 8) {
+            config = new Config(args[0]);
+            HashMap<String,String> amap = new HashMap<String,String>();
+            amap.put(Config.KAFKA_API_KEY_PROP, args[1]);
+            amap.put(Config.SWIFT_TENANT_ID_PROP, args[2]);
+            amap.put(Config.SWIFT_USER_ID_PROP, args[3]);
+            amap.put(Config.SWIFT_PASSWORD_PROP, args[4]);
+            if (args.length >= 6)
+                amap.put(Config.KAFKA_PARTITION_PROP, args[5]);
+            if (args.length >= 7)
+                amap.put(Config.KAFKA_START_OFFSET_PROP, args[6]);
+            if (args.length >= 8)
+                amap.put(Config.KAFKA_END_OFFSET_PROP, args[7]);
+            config.overrideProperties(amap);
+        } else {
+            logger.log(Level.ERROR, "Usage:\n\n" +
+                    "java -jar <name_of_jar>.jar <config-json-file-name> " +
+                    "<kafka_api_key> <swift_tenant_id> <swift_user_id> <swift_password> " +
+                    "[<kafka_partition> [<kafka_start_offset> [<kafka_end_offset>]]]");
+            return;
+        }
+        // invoke the "real" main method, shared with Whisk's main
+        doMain();
+    }
 
-		String apiKey = config.get(Config.KAFKA_API_KEY_PROP);
-		Utils.updateJaasConfiguration(apiKey.substring(0, 16), apiKey.substring(16));
+    private static void doMain() throws IOException, InterruptedException {
+        String consumerGroup;
+        logger.log(Level.INFO, "Starting " + Batch.class.getSimpleName() + "; CONFIG:");
+        logger.log(Level.INFO, config);
 
-		consumerGroup = config.get(Config.KAFKA_CONSUMER_ID_PROP);
-		startOffset = Integer.parseInt(config.get(Config.KAFKA_START_OFFSET_PROP));
-		endOffset = Integer.parseInt(config.get(Config.KAFKA_END_OFFSET_PROP));
-		
-		String broker = config.get(Config.KAFKA_BROKER_PROP);
-		String topic = config.get(Config.KAFKA_TOPIC_PROP);
-		int partition = Integer.parseInt(config.get(Config.KAFKA_PARTITION_PROP));
+        Utils.setJaasLocation();
 
-		Properties props = Utils.getClientConfiguration(broker, false);
-		props.put(KAFKA_CONSUMER_ID_KEY, consumerGroup);
+        String apiKey = config.get(Config.KAFKA_API_KEY_PROP);
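+        // the Message Hub API key packs the SASL credentials: the first 16 characters
+        // are used as the user name, the remainder as the password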
+        Utils.updateJaasConfiguration(apiKey.substring(0, 16), apiKey.substring(16));
 
-		// initialize Kafka consumer
-		kafkaConsumer = new KafkaConsumer<byte[], byte[]>(props, new ByteArrayDeserializer(),
-				new ByteArrayDeserializer());
+        consumerGroup = config.get(Config.KAFKA_CONSUMER_ID_PROP);
+        startOffset = Integer.parseInt(config.get(Config.KAFKA_START_OFFSET_PROP));
+        endOffset = Integer.parseInt(config.get(Config.KAFKA_END_OFFSET_PROP));
 
-		tp = new TopicPartition(topic, partition);
-		
-		logger.log(Level.INFO, "Assigning topic-partition: " + tp);
-		kafkaConsumer.assign(Collections.singletonList(tp));
-		
-		if (startOffset >= 0) {
-			logger.log(Level.INFO, "Rewinding " + tp + " to " + startOffset);
-			kafkaConsumer.seek(tp, startOffset);
-		} else {
-			logger.log(Level.INFO, "Starting with current offset");
-		}
-		
-		processor = new BatchAppender();
-		done = false;
-		int received = 0;
-		long start_time_sec = System.currentTimeMillis() / 1000;
-		
-		kafkaConsumer.poll(0);	// TODO: not needed?
-		
-		startOffset = kafkaConsumer.position(tp);
-		if (endOffset < 0) {	// get last offset
-			// rewind to end, get offset, then rewind back
-			kafkaConsumer.seekToEnd(Collections.singletonList(tp));
-			endOffset = kafkaConsumer.position(tp);	// returns the 'next after last' offset
-			kafkaConsumer.seek(tp, startOffset);
-		}
-		logger.log(Level.INFO, "Offsets to read: [" + startOffset + "," + endOffset +"]");
-		while (!done) {
-			// Poll on the Kafka consumer every second.
-			ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
-			logger.log(Level.INFO, "Retrieved " + records.count() + " records");
-			if (records.count() > 0) {
-				// we might need less than we have in the buffer
-				// considering also records received in previous iterations 
-				int last = Math.min(records.count(), 
-						(int) (endOffset - startOffset - received));
-				processor.processRecords(records.records(tp).subList(0, last), tp);
-			}
-			received += records.count();
-			if (startOffset + received >= endOffset) {
-				logger.log(Level.INFO, "Setting offset of " + tp + "(group=" + consumerGroup + ") to " + endOffset);
-				kafkaConsumer.seek(tp, endOffset);
-				kafkaConsumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(endOffset)));
-				done = true;
-			} else {
-				if (System.currentTimeMillis() / 1000 - start_time_sec > FETCH_TIMEOUT_SEC) {
-					String err = "TIMEOUT fetching from " + tp + ": expected " + (endOffset - startOffset)
-							+ " messages with offsets [" + startOffset + ".." + endOffset + "], "
-							+ " and received only " + received + " messages";
-					logger.log(Level.ERROR, err);
-					shutdown();
-				}
-			}
-		}
-		
-		kafkaConsumer.close();
+        String broker = config.get(Config.KAFKA_BROKER_PROP);
+        String topic = config.get(Config.KAFKA_TOPIC_PROP);
+        int partition = Integer.parseInt(config.get(Config.KAFKA_PARTITION_PROP));
 
-		// Store the retrieved messages into Object Storage
-		if (processor.getLast() > processor.getFirst()) {
-			BatchObStor obstor = new BatchObStor(config);
-			final String obstor_path = 
-				"matos/" + System.currentTimeMillis() + "_" 
-					+ processor.getFirst() + "-" + processor.getLast() + ".txt";
-			obstor.uploadFile(obstor_path, processor.getBytes());
-		}
-		
-		System.out.println("Offsets: " + getOffsets());
-	}
-	
-	private static String getOffsets() {
-		return "[" + processor.getFirst() + ".." + processor.getLast() + "]";
-	}
+        Properties props = Utils.getClientConfiguration(broker, false);
+        props.put(KAFKA_CONSUMER_ID_KEY, consumerGroup);
 
-	private static void shutdown() {
-		done = true;
-	}
+        // initialize Kafka consumer
+        kafkaConsumer = new KafkaConsumer<byte[], byte[]>(props, new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+
+        tp = new TopicPartition(topic, partition);
+
+        logger.log(Level.INFO, "Assigning topic-partition: " + tp);
+        kafkaConsumer.assign(Collections.singletonList(tp));
+
+        if (startOffset >= 0) {
+            logger.log(Level.INFO, "Rewinding " + tp + " to " + startOffset);
+            kafkaConsumer.seek(tp, startOffset);
+        } else {
+            logger.log(Level.INFO, "Starting with current offset");
+        }
+
+        processor = new BatchAppender();
+        done = false;
+        int received = 0;
+        long start_time_sec = System.currentTimeMillis() / 1000;
+
+        kafkaConsumer.poll(0);  // TODO: not needed?
+
+        startOffset = kafkaConsumer.position(tp);
+        if (endOffset < 0) {    // get last offset
+            // rewind to end, get offset, then rewind back
+            kafkaConsumer.seekToEnd(Collections.singletonList(tp));
+            endOffset = kafkaConsumer.position(tp); // returns the 'next after last' offset
+            kafkaConsumer.seek(tp, startOffset);
+        }
+        logger.log(Level.INFO, "Offsets to read: [" + startOffset + "," + endOffset +"]");
+        while (!done) {
+            // Poll the Kafka consumer, waiting up to 100 ms for records.
+            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
+            logger.log(Level.INFO, "Retrieved " + records.count() + " records");
+            if (records.count() > 0) {
+                // we might need less than we have in the buffer
+                // considering also records received in previous iterations
+                int last = Math.min(records.count(),
+                        (int) (endOffset - startOffset - received));
+                processor.processRecords(records.records(tp).subList(0, last), tp);
+            }
+            received += records.count();
+            if (startOffset + received >= endOffset) {
+                logger.log(Level.INFO, "Setting offset of " + tp + "(group=" + consumerGroup + ") to " + endOffset);
+                kafkaConsumer.seek(tp, endOffset);
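+                // commit endOffset so the consumer group's committed position
+                // (the value Monitor reports) reflects the archived batch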
+                kafkaConsumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(endOffset)));
+                done = true;
+            } else {
+                if (System.currentTimeMillis() / 1000 - start_time_sec > FETCH_TIMEOUT_SEC) {
+                    String err = "TIMEOUT fetching from " + tp + ": expected " + (endOffset - startOffset)
+                            + " messages with offsets [" + startOffset + ".." + endOffset + "], "
+                            + " and received only " + received + " messages";
+                    logger.log(Level.ERROR, err);
+                    shutdown();
+                }
+            }
+        }
+
+        kafkaConsumer.close();
+
+        // Store the retrieved messages into Object Storage
+        if (processor.getLast() > processor.getFirst()) {
+            BatchObStor obstor = new BatchObStor(config);
+            final String obstor_path =
+                "matos/" + System.currentTimeMillis() + "_"
+                    + processor.getFirst() + "-" + processor.getLast() + ".txt";
+            obstor.uploadFile(obstor_path, processor.getBytes());
+        }
+
+        System.out.println("Offsets: " + getOffsets());
+    }
+
+    private static String getOffsets() {
+        return "[" + processor.getFirst() + ".." + processor.getLast() + "]";
+    }
+
+    private static void shutdown() {
+        done = true;
+    }
 }
diff --git a/src/com/ibm/matos/BatchAppender.java b/src/com/ibm/matos/BatchAppender.java
index 5a60f31..ee6fbb4 100644
--- a/src/com/ibm/matos/BatchAppender.java
+++ b/src/com/ibm/matos/BatchAppender.java
@@ -1,22 +1,20 @@
-/**
- * Copyright 2015 IBM
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-/**
- * Licensed Materials - Property of IBM
- * (c) Copyright IBM Corp. 2015
- */
+
 package com.ibm.matos;
 
 import java.util.List;
@@ -27,45 +25,45 @@
 import org.apache.log4j.Logger;
 
 public class BatchAppender implements RecordsProcessor {
-	private static final Logger logger = Logger.getLogger(RecordsProcessor.class);
-	private long first, last;
-	private StringBuilder buffer;
+    private static final Logger logger = Logger.getLogger(RecordsProcessor.class);
+    private long first, last;
+    private StringBuilder buffer;
 
-	public BatchAppender() {
-		first = -1;
-		last = -1;
-	}
-	
-	@Override
-	public void processRecords(List<ConsumerRecord<byte[], byte[]>> records, TopicPartition tp) {
-		if (buffer == null) {
-			buffer = new StringBuilder();
-		}
-		if (first == -1) { // update 'first' only the first time
-			first = records.get(0).offset();	
-		}
-		last = records.get(0).offset() + records.size();
-		
-		logger.log(Level.INFO, "Processing " + records.size() + " records from " + tp + " starting from offset "
-				+ records.get(0).offset() + ":");
-		for (ConsumerRecord<byte[], byte[]> record : records) {
-			// processing goes here
-			String msg = "[" + record.offset() + "] " + new String(record.value());
-			buffer.append(msg + System.getProperty("line.separator"));
-//			logger.log(Level.DEBUG, "   " + msg);
-		}
-	}
-	
-	public byte[] getBytes() {
-		return buffer.toString().getBytes();		
-	}
-	
-	public long getFirst() {
-		return first;
-	}
+    public BatchAppender() {
+        first = -1;
+        last = -1;
+    }
 
-	public long getLast() {
-		return last;
-	}
+    @Override
+    public void processRecords(List<ConsumerRecord<byte[], byte[]>> records, TopicPartition tp) {
+        if (buffer == null) {
+            buffer = new StringBuilder();
+        }
+        if (first == -1) { // update 'first' only the first time
+            first = records.get(0).offset();
+        }
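+        // 'last' is recomputed from the newest batch: one past the final offset
+        // appended, assuming contiguous offsets within the partition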
+        last = records.get(0).offset() + records.size();
+
+        logger.log(Level.INFO, "Processing " + records.size() + " records from " + tp + " starting from offset "
+                + records.get(0).offset() + ":");
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            // processing goes here
+            String msg = "[" + record.offset() + "] " + new String(record.value());
+            buffer.append(msg + System.getProperty("line.separator"));
+//          logger.log(Level.DEBUG, "   " + msg);
+        }
+    }
+
+    public byte[] getBytes() {
+        return buffer.toString().getBytes();
+    }
+
+    public long getFirst() {
+        return first;
+    }
+
+    public long getLast() {
+        return last;
+    }
 
 }
diff --git a/src/com/ibm/matos/BatchObStor.java b/src/com/ibm/matos/BatchObStor.java
index cedccb7..0d35624 100644
--- a/src/com/ibm/matos/BatchObStor.java
+++ b/src/com/ibm/matos/BatchObStor.java
@@ -1,22 +1,20 @@
-/**
- * Copyright 2015 IBM
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-/**
- * Licensed Materials - Property of IBM
- * (c) Copyright IBM Corp. 2015
- */
+
 package com.ibm.matos;
 
 import org.javaswift.joss.client.factory.AccountConfig;
@@ -28,36 +26,36 @@
 import com.ibm.stocator.fs.swift.auth.PasswordScopeAccessProvider;
 
 public class BatchObStor {
-	private Container container;
+    private Container container;
 
-	public BatchObStor (Config config) {
-		String auth_url = config.get(Config.SWIFT_AUTH_URL_PROP);
-		String userid = config.get(Config.SWIFT_USER_ID_PROP);
-		String passwd = config.get(Config.SWIFT_PASSWORD_PROP);
-		String tenantid = config.get(Config.SWIFT_TENANT_ID_PROP);
-		String region = config.get(Config.SWIFT_REGION_PROP);
-		String folder = config.get(Config.SWIFT_CONTAINER_PROP);
-	
-		AccountConfig ac = new AccountConfig();
-		ac.setAuthUrl(auth_url);
-		ac.setAuthenticationMethod(AuthenticationMethod.EXTERNAL);
-		PasswordScopeAccessProvider psap = new PasswordScopeAccessProvider(userid, passwd, tenantid, auth_url, region);
-		ac.setAccessProvider(psap);
-		
-		JossAccount acct = new JossAccount(ac, region, false);
-		acct.createAccount();
-		acct.authenticate();
-	
-	    container = acct.getAccount().getContainer(folder);
-	    if (!container.exists()) {
-	        container.create();
-	      }
-	}
-	
-	public void uploadFile(String filename, byte[] bytes) {
-	    StoredObject object = container.getObject(filename);
-	    object.uploadObject(bytes);
-		
-	}
+    public BatchObStor (Config config) {
+        String auth_url = config.get(Config.SWIFT_AUTH_URL_PROP);
+        String userid = config.get(Config.SWIFT_USER_ID_PROP);
+        String passwd = config.get(Config.SWIFT_PASSWORD_PROP);
+        String tenantid = config.get(Config.SWIFT_TENANT_ID_PROP);
+        String region = config.get(Config.SWIFT_REGION_PROP);
+        String folder = config.get(Config.SWIFT_CONTAINER_PROP);
+
+        AccountConfig ac = new AccountConfig();
+        ac.setAuthUrl(auth_url);
+        ac.setAuthenticationMethod(AuthenticationMethod.EXTERNAL);
+        PasswordScopeAccessProvider psap = new PasswordScopeAccessProvider(userid, passwd, tenantid, auth_url, region);
+        ac.setAccessProvider(psap);
+
+        JossAccount acct = new JossAccount(ac, region, false);
+        acct.createAccount();
+        acct.authenticate();
+
+        container = acct.getAccount().getContainer(folder);
+        if (!container.exists()) {
+            container.create();
+        }
+    }
+
+    public void uploadFile(String filename, byte[] bytes) {
+        StoredObject object = container.getObject(filename);
+        object.uploadObject(bytes);
+
+    }
 
 }
diff --git a/src/com/ibm/matos/Config.java b/src/com/ibm/matos/Config.java
index c2e1a65..516b03f 100644
--- a/src/com/ibm/matos/Config.java
+++ b/src/com/ibm/matos/Config.java
@@ -1,22 +1,20 @@
-/**
- * Copyright 2015 IBM
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-/**
- * Licensed Materials - Property of IBM
- * (c) Copyright IBM Corp. 2015
- */
+
 package com.ibm.matos;
 
 import java.io.File;
@@ -34,86 +32,86 @@
 
 public class Config {
 
-	private static Logger logger = Logger.getLogger(Config.class);
-	
-	// static Kafka properties
-	final static String KAFKA_BROKER_PROP = "kafkaBroker";
-	final static String KAFKA_REST_PROP = "kafkaRest";
-	final static String KAFKA_API_KEY_PROP = "kafkaApiKey";
-	final static String KAFKA_TOPIC_PROP = "kafkaTopic";
-	final static String KAFKA_PARTITION_PROP = "kafkaPartition";
-	final static String KAFKA_CONSUMER_ID_PROP = "kafkaConsumerId";
-	// variable Kafka properties
-	final static String KAFKA_START_OFFSET_PROP = "kafkaStartOffset";
-	final static String KAFKA_END_OFFSET_PROP = "kafkaEndOffset";
-	final static String KAFKA_NUM_RECORDS_PROP = "kafkaNumRecords";
-	// static Swift properties
-	final static String SWIFT_AUTH_URL_PROP = "swiftAuthUrl";
-	final static String SWIFT_REGION_PROP = "swiftRegion";
-	final static String SWIFT_TENANT_ID_PROP = "swiftTenantId";
-	final static String SWIFT_USER_ID_PROP = "swiftUserId";
-	final static String SWIFT_PASSWORD_PROP = "swiftPassword";
-	final static String SWIFT_CONTAINER_PROP = "swiftContainer";
-	final static String[] ALL_PROPS = {
-			KAFKA_BROKER_PROP,
-			KAFKA_REST_PROP,
-			KAFKA_API_KEY_PROP,
-			KAFKA_TOPIC_PROP,
-			KAFKA_PARTITION_PROP,
-			KAFKA_CONSUMER_ID_PROP,
-			KAFKA_START_OFFSET_PROP,
-			KAFKA_END_OFFSET_PROP,
-			KAFKA_NUM_RECORDS_PROP,
-			SWIFT_AUTH_URL_PROP,
-			SWIFT_REGION_PROP,
-			SWIFT_TENANT_ID_PROP,
-			SWIFT_USER_ID_PROP,
-			SWIFT_PASSWORD_PROP,
-			SWIFT_CONTAINER_PROP
-	};
-	
-	private final static String PROP_FILE_NAME = "matos.json";
-	private JsonObject matos;
-	
-	public Config() throws IOException {
-		this(PROP_FILE_NAME);
-	}
-	
-	public Config(String filename) throws IOException {
-		matos = loadProperties(filename); 
-	}
+    private static Logger logger = Logger.getLogger(Config.class);
 
-	private JsonObject loadProperties(String filename) throws IOException {
-		JsonObject matos = null;
-		InputStream is = new FileInputStream(Utils.resourceDir + File.separator + filename);
-		matos = new JsonParser().parse(new InputStreamReader(is)).getAsJsonObject();
-		logger.log(Level.INFO, "Matos Properties:" + matos);
-		return matos;
-	}
+    // static Kafka properties
+    final static String KAFKA_BROKER_PROP = "kafkaBroker";
+    final static String KAFKA_REST_PROP = "kafkaRest";
+    final static String KAFKA_API_KEY_PROP = "kafkaApiKey";
+    final static String KAFKA_TOPIC_PROP = "kafkaTopic";
+    final static String KAFKA_PARTITION_PROP = "kafkaPartition";
+    final static String KAFKA_CONSUMER_ID_PROP = "kafkaConsumerId";
+    // variable Kafka properties
+    final static String KAFKA_START_OFFSET_PROP = "kafkaStartOffset";
+    final static String KAFKA_END_OFFSET_PROP = "kafkaEndOffset";
+    final static String KAFKA_NUM_RECORDS_PROP = "kafkaNumRecords";
+    // static Swift properties
+    final static String SWIFT_AUTH_URL_PROP = "swiftAuthUrl";
+    final static String SWIFT_REGION_PROP = "swiftRegion";
+    final static String SWIFT_TENANT_ID_PROP = "swiftTenantId";
+    final static String SWIFT_USER_ID_PROP = "swiftUserId";
+    final static String SWIFT_PASSWORD_PROP = "swiftPassword";
+    final static String SWIFT_CONTAINER_PROP = "swiftContainer";
+    final static String[] ALL_PROPS = {
+            KAFKA_BROKER_PROP,
+            KAFKA_REST_PROP,
+            KAFKA_API_KEY_PROP,
+            KAFKA_TOPIC_PROP,
+            KAFKA_PARTITION_PROP,
+            KAFKA_CONSUMER_ID_PROP,
+            KAFKA_START_OFFSET_PROP,
+            KAFKA_END_OFFSET_PROP,
+            KAFKA_NUM_RECORDS_PROP,
+            SWIFT_AUTH_URL_PROP,
+            SWIFT_REGION_PROP,
+            SWIFT_TENANT_ID_PROP,
+            SWIFT_USER_ID_PROP,
+            SWIFT_PASSWORD_PROP,
+            SWIFT_CONTAINER_PROP
+    };
+
+    private final static String PROP_FILE_NAME = "matos.json";
+    private JsonObject matos;
+
+    public Config() throws IOException {
+        this(PROP_FILE_NAME);
+    }
+
+    public Config(String filename) throws IOException {
+        matos = loadProperties(filename);
+    }
+
+    private JsonObject loadProperties(String filename) throws IOException {
+        JsonObject matos = null;
+        InputStream is = new FileInputStream(Utils.resourceDir + File.separator + filename);
+        matos = new JsonParser().parse(new InputStreamReader(is)).getAsJsonObject();
+        logger.log(Level.INFO, "Matos Properties:" + matos);
+        return matos;
+    }
 
 
-	public void overrideProperties(JsonObject props) {
-		// override config properties with those passed in params
-		for (String prop : Config.ALL_PROPS) {
-			if (props.has(prop))
-				matos.addProperty(prop, props.getAsJsonPrimitive(prop).getAsString());
-		}
-	}
+    public void overrideProperties(JsonObject props) {
+        // override config properties with those passed in params
+        for (String prop : Config.ALL_PROPS) {
+            if (props.has(prop))
+                matos.addProperty(prop, props.getAsJsonPrimitive(prop).getAsString());
+        }
+    }
 
-	public void overrideProperties(Map<String,String> props) {
-		// override config properties with those passed in params
-		for (String prop : Config.ALL_PROPS) {
-			if (props.containsKey(prop))
-				matos.addProperty(prop, props.get(prop));
-		}
-	}
-	
-	public String get(String key) {
-		return matos.getAsJsonPrimitive(key).getAsString();
-	}
-	
-	public String toString() {
-		return matos.toString();
-	}
+    public void overrideProperties(Map<String,String> props) {
+        // override config properties with those passed in params
+        for (String prop : Config.ALL_PROPS) {
+            if (props.containsKey(prop))
+                matos.addProperty(prop, props.get(prop));
+        }
+    }
+
+    public String get(String key) {
+        return matos.getAsJsonPrimitive(key).getAsString();
+    }
+
+    public String toString() {
+        return matos.toString();
+    }
 
 }
diff --git a/src/com/ibm/matos/Load.java b/src/com/ibm/matos/Load.java
index 0135085..6ee2143 100644
--- a/src/com/ibm/matos/Load.java
+++ b/src/com/ibm/matos/Load.java
@@ -1,22 +1,20 @@
-/**
- * Copyright 2015 IBM
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-/**
- * Licensed Materials - Property of IBM
- * (c) Copyright IBM Corp. 2015
- */
+
 package com.ibm.matos;
 
 import java.io.IOException;
@@ -35,126 +33,126 @@
 
 public class Load {
 
-	private static Logger logger = Logger.getLogger(Load.class);
-	private static boolean done = false;
-	private static long lastOffset;
-	private static Config config;
+    private static Logger logger = Logger.getLogger(Load.class);
+    private static boolean done = false;
+    private static long lastOffset;
+    private static Config config;
 
 
-	// main method for Whisk action
-	public static JsonObject main(JsonObject args) {
+    // main method for Whisk action
+    public static JsonObject main(JsonObject args) {
 
-		try {
-			Utils.initDirs();
-			Utils.extractResourcesToFilesystem(true);
+        try {
+            Utils.initDirs();
+            Utils.extractResourcesToFilesystem(true);
 
-			config = new Config();
-			config.overrideProperties(args);
-			
-			// invoke the "real" main method, shared with Java main
-			doMain();
+            config = new Config();
+            config.overrideProperties(args);
 
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
+            // invoke the "real" main method, shared with Java main
+            doMain();
 
-		JsonObject response = new JsonObject();
-		response.addProperty("last", getLastOffset());
-		return response;
-	}
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
 
-	public static void main(String args[]) throws InterruptedException, ExecutionException, IOException {
+        JsonObject response = new JsonObject();
+        response.addProperty("last", getLastOffset());
+        return response;
+    }
 
-		Utils.initDirs();
-		Utils.extractResourcesToFilesystem(true);
+    public static void main(String args[]) throws InterruptedException, ExecutionException, IOException {
 
-		if (args.length == 3) {
-			config = new Config(args[0]);
-			HashMap<String,String> amap = new HashMap<String,String>();
-			amap.put(Config.KAFKA_API_KEY_PROP, args[1]);
-			amap.put(Config.KAFKA_NUM_RECORDS_PROP, args[2]);
-			config.overrideProperties(amap);
-		} else {
-			logger.log(Level.ERROR, "Usage:\n\n" +
-					"java -jar <name_of_jar>.jar <config-json-file-name> " +
-					"<kafka_api_key> <kafka_num_records>");
-			return;
-		}
-		// invoke the "real" main method, shared with Whisk's main
-		doMain();
-	}
-	
-	public static void doMain() throws InterruptedException, ExecutionException {
-		
-		KafkaProducer<byte[], byte[]> kafkaProducer;
-		String kafkaHost;
-		String apiKey;
-		String topic;
-		int numRecords;
-		int producedMessages;
-		
-		logger.log(Level.INFO, "Starting " + Load.class.getSimpleName() + "; CONFIG:");
-		logger.log(Level.INFO, config);
-		
-		Utils.setJaasLocation();
+        Utils.initDirs();
+        Utils.extractResourcesToFilesystem(true);
 
-		kafkaHost = config.get(Config.KAFKA_BROKER_PROP);
-		apiKey = config.get(Config.KAFKA_API_KEY_PROP);
-		topic = config.get(Config.KAFKA_TOPIC_PROP);
-		numRecords = Integer.parseInt(config.get(Config.KAFKA_NUM_RECORDS_PROP));
+        if (args.length == 3) {
+            config = new Config(args[0]);
+            HashMap<String,String> amap = new HashMap<String,String>();
+            amap.put(Config.KAFKA_API_KEY_PROP, args[1]);
+            amap.put(Config.KAFKA_NUM_RECORDS_PROP, args[2]);
+            config.overrideProperties(amap);
+        } else {
+            logger.log(Level.ERROR, "Usage:\n\n" +
+                    "java -jar <name_of_jar>.jar <config-json-file-name> " +
+                    "<kafka_api_key> <kafka_num_records>");
+            return;
+        }
+        // invoke the "real" main method, shared with Whisk's main
+        doMain();
+    }
 
-		Utils.updateJaasConfiguration(apiKey.substring(0, 16), apiKey.substring(16));
+    public static void doMain() throws InterruptedException, ExecutionException {
 
-		kafkaProducer = new KafkaProducer<byte[], byte[]>(
-				Utils.getClientConfiguration(kafkaHost, true));
-		
-		done = false;
-		producedMessages = 0;
-		lastOffset = -1;
-		Future<RecordMetadata> fm = null;
-		
-		while (!done) {
-			String fieldName = "records";
-			// Push a message into the list to be sent.
-			MessageList list = new MessageList();
-			long now = System.currentTimeMillis();
-			list.push("This is a test message[" + producedMessages + "] ["
-					+ new Date(now).toString() 
-					+ "|" + now + "]");
+        KafkaProducer<byte[], byte[]> kafkaProducer;
+        String kafkaHost;
+        String apiKey;
+        String topic;
+        int numRecords;
+        int producedMessages;
 
-			try {
-				// Create a producer record which will be sent
-				// to the Message Hub service, providing the topic
-				// name, field name and message. The field name and
-				// message are converted to UTF-8.
-				ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic,
-						fieldName.getBytes("UTF-8"), list.toString().getBytes("UTF-8"));
+        logger.log(Level.INFO, "Starting " + Load.class.getSimpleName() + "; CONFIG:");
+        logger.log(Level.INFO, config);
 
-				// keep the metadata of the last produced message 
-				fm = kafkaProducer.send(record);
-				producedMessages++;
+        Utils.setJaasLocation();
 
-				if(producedMessages >= numRecords) {
-					done = true;
-				}
-			} catch (final Exception e) {
-				e.printStackTrace();
-				done = true;
-			}
-		}
-		// wait until last message has been sent, retrieve its offset
-		RecordMetadata m = fm.get();
-		logger.log(Level.INFO, "[" + producedMessages + " messages sent, last offset: " + m.offset() + "]");
-		lastOffset = m.offset()+1;	// 'next after last' offset
+        kafkaHost = config.get(Config.KAFKA_BROKER_PROP);
+        apiKey = config.get(Config.KAFKA_API_KEY_PROP);
+        topic = config.get(Config.KAFKA_TOPIC_PROP);
+        numRecords = Integer.parseInt(config.get(Config.KAFKA_NUM_RECORDS_PROP));
 
-		logger.log(Level.INFO, Load.class.toString() + " is shutting down.");
-		kafkaProducer.close();
+        Utils.updateJaasConfiguration(apiKey.substring(0, 16), apiKey.substring(16));
 
-		System.out.println("Last offset: " + getLastOffset());
-	}
+        kafkaProducer = new KafkaProducer<byte[], byte[]>(
+                Utils.getClientConfiguration(kafkaHost, true));
 
-	public static long getLastOffset() {
-		return lastOffset;
-	}
+        done = false;
+        producedMessages = 0;
+        lastOffset = -1;
+        Future<RecordMetadata> fm = null;
+
+        while (!done) {
+            String fieldName = "records";
+            // Push a message into the list to be sent.
+            MessageList list = new MessageList();
+            long now = System.currentTimeMillis();
+            list.push("This is a test message[" + producedMessages + "] ["
+                    + new Date(now).toString()
+                    + "|" + now + "]");
+
+            try {
+                // Create a producer record which will be sent
+                // to the Message Hub service, providing the topic
+                // name, field name and message. The field name and
+                // message are converted to UTF-8.
+                ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic,
+                        fieldName.getBytes("UTF-8"), list.toString().getBytes("UTF-8"));
+
+                // keep the metadata of the last produced message
+                fm = kafkaProducer.send(record);
+                producedMessages++;
+
+                if(producedMessages >= numRecords) {
+                    done = true;
+                }
+            } catch (final Exception e) {
+                e.printStackTrace();
+                done = true;
+            }
+        }
+        // wait until last message has been sent, retrieve its offset
+        RecordMetadata m = fm.get();
+        logger.log(Level.INFO, "[" + producedMessages + " messages sent, last offset: " + m.offset() + "]");
+        lastOffset = m.offset()+1;  // 'next after last' offset
+
+        logger.log(Level.INFO, Load.class.toString() + " is shutting down.");
+        kafkaProducer.close();
+
+        System.out.println("Last offset: " + getLastOffset());
+    }
+
+    public static long getLastOffset() {
+        return lastOffset;
+    }
 
 }
diff --git a/src/com/ibm/matos/MessageList.java b/src/com/ibm/matos/MessageList.java
index 17aaab6..5ef184f 100644
--- a/src/com/ibm/matos/MessageList.java
+++ b/src/com/ibm/matos/MessageList.java
@@ -1,22 +1,20 @@
-/**
- * Copyright 2015 IBM
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-/**
- * Licensed Materials - Property of IBM
- * (c) Copyright IBM Corp. 2015
- */
+
 package com.ibm.matos;
 
 import java.util.ArrayList;
@@ -28,86 +26,86 @@
 
 public class MessageList {
 
-	private class Message {
-		@JsonProperty("value")
-		private String message;
+    private class Message {
+        @JsonProperty("value")
+        private String message;
 
-		@JsonProperty("timestamp")
-		private String commitTime;
+        @JsonProperty("timestamp")
+        private String commitTime;
 
-		public Message(String message) {
-			this.message = message;
-			this.commitTime = new Date().toString();
-		}
-	}
+        public Message(String message) {
+            this.message = message;
+            this.commitTime = new Date().toString();
+        }
+    }
 
-	private ArrayList<Message> messages;
+    private ArrayList<Message> messages;
 
-	/**
-	 * Constructs an instance of MessageList with the provided array of strings.
-	 * If the array is null, the message list is only initialized.
-	 * 
-	 * @param messages
-	 *            {String[]} Array of strings to add to the message list.
-	 */
-	public MessageList(String messages[]) {
-		this.messages = new ArrayList<Message>();
+    /**
+     * Constructs an instance of MessageList with the provided array of strings.
+     * If the array is null, the message list is only initialized.
+     *
+     * @param messages
+     *            {String[]} Array of strings to add to the message list.
+     */
+    public MessageList(String messages[]) {
+        this.messages = new ArrayList<Message>();
 
-		if (messages != null && messages.length > 0) {
-			for (int i = 0; i < messages.length; i++) {
-				push(messages[i]);
-			}
-		}
-	}
+        if (messages != null && messages.length > 0) {
+            for (int i = 0; i < messages.length; i++) {
+                push(messages[i]);
+            }
+        }
+    }
 
-	/**
-	 * Constructs an instance of MessageList with the provided ArrayList of
-	 * strings. If the ArrayList is null, the message list is only initialized.
-	 * 
-	 * @param messages
-	 *            {String[]} Array of strings to add to the message list.
-	 */
-	public MessageList(ArrayList<String> messages) {
-		this.messages = new ArrayList<Message>();
+    /**
+     * Constructs an instance of MessageList with the provided ArrayList of
+     * strings. If the ArrayList is null, the message list is only initialized.
+     *
+     * @param messages
+     *            {String[]} Array of strings to add to the message list.
+     */
+    public MessageList(ArrayList<String> messages) {
+        this.messages = new ArrayList<Message>();
 
-		if (messages != null && messages.size() > 0) {
-			for (String message : messages) {
-				push(message);
-			}
-		}
-	}
+        if (messages != null && messages.size() > 0) {
+            for (String message : messages) {
+                push(message);
+            }
+        }
+    }
 
-	/**
-	 * Constructs an empty instance of MessageList.
-	 */
-	public MessageList() {
-		this.messages = new ArrayList<Message>();
-	}
+    /**
+     * Constructs an empty instance of MessageList.
+     */
+    public MessageList() {
+        this.messages = new ArrayList<Message>();
+    }
 
-	/**
-	 * Adds a new message to the message list.
-	 * 
-	 * @param message
-	 *            {String} The message to add to the list.
-	 */
-	public void push(String message) {
-		this.messages.add(new Message(message));
-	}
+    /**
+     * Adds a new message to the message list.
+     *
+     * @param message
+     *            {String} The message to add to the list.
+     */
+    public void push(String message) {
+        this.messages.add(new Message(message));
+    }
 
-	/**
-	 * Build message list dependent on the format Message Hub requires. The
-	 * message list is in the form: [{ "value": string, "time": timestamp },
-	 * ...]
-	 * 
-	 * @return {String} String representation of a JSON object.
-	 */
-	@Override
-	public String toString() {
-		try {
-			return new ObjectMapper().writeValueAsString(messages);
-		} catch (final JsonProcessingException e) {
-			e.printStackTrace();
-			return "";
-		}
-	}
+    /**
+     * Build message list dependent on the format Message Hub requires. The
+     * message list is in the form: [{ "value": string, "time": timestamp },
+     * ...]
+     *
+     * @return {String} String representation of a JSON object.
+     */
+    @Override
+    public String toString() {
+        try {
+            return new ObjectMapper().writeValueAsString(messages);
+        } catch (final JsonProcessingException e) {
+            e.printStackTrace();
+            return "";
+        }
+    }
 }
diff --git a/src/com/ibm/matos/Monitor.java b/src/com/ibm/matos/Monitor.java
index 63dfa12..540408a 100644
--- a/src/com/ibm/matos/Monitor.java
+++ b/src/com/ibm/matos/Monitor.java
@@ -1,22 +1,20 @@
-/**
- * Copyright 2015 IBM
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-/**
- * Licensed Materials - Property of IBM
- * (c) Copyright IBM Corp. 2015
- */
+
 package com.ibm.matos;
 
 import java.io.IOException;
@@ -36,134 +34,134 @@
 
 public class Monitor {
 
-	private static Logger logger = Logger.getLogger(Monitor.class);
-	private static long lastOffset;
-	private static long committedOffset;
-	private static Config config;
-	private final static String KAFKA_CONSUMER_ID_KEY = "group.id";
-	
-	// main method for Whisk action
-	public static JsonObject main(JsonObject args) {
+    private static Logger logger = Logger.getLogger(Monitor.class);
+    private static long lastOffset;
+    private static long committedOffset;
+    private static Config config;
+    private final static String KAFKA_CONSUMER_ID_KEY = "group.id";
 
-		try {
-			Utils.initDirs();
-			Utils.extractResourcesToFilesystem(false);
+    // main method for Whisk action
+    public static JsonObject main(JsonObject args) {
 
-			config = new Config();
-			config.overrideProperties(args);
-			
-			// invoke the "real" main method, shared with Java main
-			doMain(true);
+        try {
+            Utils.initDirs();
+            Utils.extractResourcesToFilesystem(false);
 
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
+            config = new Config();
+            config.overrideProperties(args);
 
-		JsonObject response = new JsonObject();
-		response.addProperty("last", getLastOffset());
-		response.addProperty("committed", getCommittedOffset());
-		return response;
-	}
+            // invoke the "real" main method, shared with Java main
+            doMain(true);
 
-	public static void main(String args[]) throws InterruptedException, ExecutionException, IOException {
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
 
-		Utils.initDirs();
-		Utils.extractResourcesToFilesystem(false);
+        JsonObject response = new JsonObject();
+        response.addProperty("last", getLastOffset());
+        response.addProperty("committed", getCommittedOffset());
+        return response;
+    }
 
-		if (args.length == 2 || args.length == 3) {
-			config = new Config(args[0]);
-			HashMap<String,String> amap = new HashMap<String,String>();
-			amap.put(Config.KAFKA_API_KEY_PROP, args[1]);
-			if (args.length == 3) {
-				amap.put(Config.KAFKA_PARTITION_PROP, args[2]);
-			}
-			config.overrideProperties(amap);
-		} else {
-			logger.log(Level.ERROR, "Usage:\n\n" +
-					"java -jar <name_of_jar>.jar <config-json-file-name> " +
-					"<kafka_api_key> [<kafka_partition>]");
-			return;
-		}
-		// invoke the "real" main method, shared with Whisk's main
-		doMain(false);
-	}
-		
-	private static void doMain(boolean once) throws InterruptedException {
-		
-		String kafkaHost;
-		String apiKey;
-		String topic;
-		int partition;
-		String consumerId;
-		boolean done = false;
+    public static void main(String args[]) throws InterruptedException, ExecutionException, IOException {
 
-		Utils.setJaasLocation();
+        Utils.initDirs();
+        Utils.extractResourcesToFilesystem(false);
 
-		logger.log(Level.INFO, "Starting " + Monitor.class.getSimpleName() + "; CONFIG:");
-		logger.log(Level.INFO, config);
+        if (args.length == 2 || args.length == 3) {
+            config = new Config(args[0]);
+            HashMap<String,String> amap = new HashMap<String,String>();
+            amap.put(Config.KAFKA_API_KEY_PROP, args[1]);
+            if (args.length == 3) {
+                amap.put(Config.KAFKA_PARTITION_PROP, args[2]);
+            }
+            config.overrideProperties(amap);
+        } else {
+            logger.log(Level.ERROR, "Usage:\n\n" +
+                    "java -jar <name_of_jar>.jar <config-json-file-name> " +
+                    "<kafka_api_key> [<kafka_partition>]");
+            return;
+        }
+        // invoke the "real" main method, shared with Whisk's main
+        doMain(false);
+    }
 
-		kafkaHost = config.get(Config.KAFKA_BROKER_PROP);
-		apiKey = config.get(Config.KAFKA_API_KEY_PROP);
-		topic = config.get(Config.KAFKA_TOPIC_PROP);
-		partition = Integer.parseInt(config.get(Config.KAFKA_PARTITION_PROP));
-		consumerId = config.get(Config.KAFKA_CONSUMER_ID_PROP);
-		
-		Utils.updateJaasConfiguration(apiKey.substring(0, 16), apiKey.substring(16));
+    private static void doMain(boolean once) throws InterruptedException {
 
-		TopicPartition tp = new TopicPartition(topic, partition);
+        String kafkaHost;
+        String apiKey;
+        String topic;
+        int partition;
+        String consumerId;
+        boolean done = false;
 
-		// configure Kafka consumer that will be used to retrieve the last offset
-		Properties propsLast = Utils.getClientConfiguration(kafkaHost, false);
-		KafkaConsumer<byte[], byte[]> kafkaConsumerLast = new KafkaConsumer<byte[], 
-				byte[]>(propsLast, new ByteArrayDeserializer(),	new ByteArrayDeserializer());
+        Utils.setJaasLocation();
 
-		// configure Kafka consumer that will be used to retrieve the committed offset
-		Properties propsCommitted = Utils.getClientConfiguration(kafkaHost, false);
-		propsCommitted.put(KAFKA_CONSUMER_ID_KEY, consumerId);
-		KafkaConsumer<byte[], byte[]> kafkaConsumerCommitted = new KafkaConsumer<byte[], 
-				byte[]>(propsCommitted, new ByteArrayDeserializer(),	new ByteArrayDeserializer());
+        logger.log(Level.INFO, "Starting " + Monitor.class.getSimpleName() + "; CONFIG:");
+        logger.log(Level.INFO, config);
 
-		lastOffset = committedOffset = -1;
+        kafkaHost = config.get(Config.KAFKA_BROKER_PROP);
+        apiKey = config.get(Config.KAFKA_API_KEY_PROP);
+        topic = config.get(Config.KAFKA_TOPIC_PROP);
+        partition = Integer.parseInt(config.get(Config.KAFKA_PARTITION_PROP));
+        consumerId = config.get(Config.KAFKA_CONSUMER_ID_PROP);
 
-		while (!done) {
-			// retrieve last and committed offset in the topic-partition
-			updateLastOffset(kafkaConsumerLast, tp);
-			updateCommittedOffset(kafkaConsumerCommitted, tp);
-			System.out.print("[" + getCommittedOffset() + "," + getLastOffset() + "]");
-			
-			if (once) {
-				done = true;
-			} else {
-				Thread.sleep(5000);
-			}
-		}
+        Utils.updateJaasConfiguration(apiKey.substring(0, 16), apiKey.substring(16));
 
-		kafkaConsumerLast.close();
-		kafkaConsumerCommitted.close();
+        TopicPartition tp = new TopicPartition(topic, partition);
 
-		logger.log(Level.INFO, "Shutting down " + Monitor.class.getSimpleName());
-		
-	}
-	
-	private static void updateLastOffset(KafkaConsumer<?, ?> kafkaConsumer, TopicPartition tp) {
-		kafkaConsumer.assign(Collections.singletonList(tp));
-		kafkaConsumer.seekToEnd(Collections.singletonList(tp));
-		lastOffset = kafkaConsumer.position(tp);
-		logger.log(Level.INFO, "Retrieved last offset: " + lastOffset);
-	}
-	
-	private static void updateCommittedOffset(KafkaConsumer<?, ?> kafkaConsumer, TopicPartition tp) {
-		// does not require assignment
-		OffsetAndMetadata committed = kafkaConsumer.committed(tp);
-		logger.log(Level.INFO, "Position of " + tp + ": " + committed);
-		committedOffset = (committed != null ? committed.offset() : -1);
-	}
-	
-	private static long getLastOffset() {
-		return lastOffset;
-	}
-	
-	private static long getCommittedOffset() {
-		return committedOffset;
-	}
+        // configure Kafka consumer that will be used to retrieve the last offset
+        Properties propsLast = Utils.getClientConfiguration(kafkaHost, false);
+        KafkaConsumer<byte[], byte[]> kafkaConsumerLast = new KafkaConsumer<byte[],
+                byte[]>(propsLast, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+
+        // configure Kafka consumer that will be used to retrieve the committed offset
+        Properties propsCommitted = Utils.getClientConfiguration(kafkaHost, false);
+        propsCommitted.put(KAFKA_CONSUMER_ID_KEY, consumerId);
+        KafkaConsumer<byte[], byte[]> kafkaConsumerCommitted = new KafkaConsumer<byte[],
+                byte[]>(propsCommitted, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+
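+        // -1 marks both offsets as not yet retrieved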
+        lastOffset = committedOffset = -1;
+
+        while (!done) {
+            // retrieve last and committed offset in the topic-partition
+            updateLastOffset(kafkaConsumerLast, tp);
+            updateCommittedOffset(kafkaConsumerCommitted, tp);
+            System.out.print("[" + getCommittedOffset() + "," + getLastOffset() + "]");
+
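+            // when invoked from OpenWhisk (once == true) sample the offsets a single
+            // time; the standalone CLI mode keeps polling every 5 seconds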
+            if (once) {
+                done = true;
+            } else {
+                Thread.sleep(5000);
+            }
+        }
+
+        kafkaConsumerLast.close();
+        kafkaConsumerCommitted.close();
+
+        logger.log(Level.INFO, "Shutting down " + Monitor.class.getSimpleName());
+
+    }
+
+    private static void updateLastOffset(KafkaConsumer<?, ?> kafkaConsumer, TopicPartition tp) {
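+        // assign the single partition, seek to its end, and read the resulting
+        // position to obtain the end-of-log offset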
+        kafkaConsumer.assign(Collections.singletonList(tp));
+        kafkaConsumer.seekToEnd(Collections.singletonList(tp));
+        lastOffset = kafkaConsumer.position(tp);
+        logger.log(Level.INFO, "Retrieved last offset: " + lastOffset);
+    }
+
+    private static void updateCommittedOffset(KafkaConsumer<?, ?> kafkaConsumer, TopicPartition tp) {
+        // does not require assignment
+        OffsetAndMetadata committed = kafkaConsumer.committed(tp);
+        logger.log(Level.INFO, "Position of " + tp + ": " + committed);
+        committedOffset = (committed != null ? committed.offset() : -1);
+    }
+
+    private static long getLastOffset() {
+        return lastOffset;
+    }
+
+    private static long getCommittedOffset() {
+        return committedOffset;
+    }
 }
diff --git a/src/com/ibm/matos/RecordsProcessor.java b/src/com/ibm/matos/RecordsProcessor.java
index 010579f..974c8cd 100644
--- a/src/com/ibm/matos/RecordsProcessor.java
+++ b/src/com/ibm/matos/RecordsProcessor.java
@@ -1,22 +1,20 @@
-/**
- * Copyright 2015 IBM
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-/**
- * Licensed Materials - Property of IBM
- * (c) Copyright IBM Corp. 2015
- */
+
 package com.ibm.matos;
 
 import java.util.List;
@@ -25,13 +23,13 @@
 import org.apache.kafka.common.TopicPartition;
 
 /**
- *
+ * Processes batches of Kafka records from a topic-partition and reports the first and last offsets handled.
  */
 public interface RecordsProcessor {
 
-	void processRecords(List<ConsumerRecord<byte[], byte[]>> records, TopicPartition tp);
+    void processRecords(List<ConsumerRecord<byte[], byte[]>> records, TopicPartition tp);
 
-	long getFirst();
+    long getFirst();
 
-	long getLast();
+    long getLast();
 }
diff --git a/src/com/ibm/matos/Utils.java b/src/com/ibm/matos/Utils.java
index 2855dce..5657a66 100644
--- a/src/com/ibm/matos/Utils.java
+++ b/src/com/ibm/matos/Utils.java
@@ -1,22 +1,20 @@
-/**
- * Copyright 2015 IBM
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-/**
- * Licensed Materials - Property of IBM
- * (c) Copyright IBM Corp. 2015
- */
+
 package com.ibm.matos;
 
 import java.io.File;
@@ -39,123 +37,121 @@
 // Auxiliary class
 public class Utils {
 
-	static Logger logger = Logger.getLogger(Utils.class);
-	private static final String JAAS_CONFIG_PROPERTY = "java.security.auth.login.config";
+    static Logger logger = Logger.getLogger(Utils.class);
+    private static final String JAAS_CONFIG_PROPERTY = "java.security.auth.login.config";
 
-	private static String userDir = null;
-	static String resourceDir = null;
-	private static String resourcePathInJar = null;
+    private static String userDir = null;
+    static String resourceDir = null;
+    private static String resourcePathInJar = null;
 
-	static void extractResourcesToFilesystem(boolean isProducer) throws IOException {
-		String propfile = (isProducer ? "producer.properties" : "consumer.properties");
+    static void extractResourcesToFilesystem(boolean isProducer) throws IOException {
+        String propfile = (isProducer ? "producer.properties" : "consumer.properties");
 
-		Files.createDirectories(Paths.get(resourceDir));
+        Files.createDirectories(Paths.get(resourceDir));
 
-		for (String path : new String[] { "jaas.conf.template", propfile, "matos.json" }) {
-			String fullPath = resourcePathInJar + File.separator + path;
-			InputStream is = Utils.class.getClassLoader()
-					.getResourceAsStream(fullPath);
-			if (is == null) {
-				is = new FileInputStream(fullPath);
-			}
-			Path tPath = Paths.get(resourceDir + File.separator + path);
-			logger.log(Level.INFO,
-					"[Copying " + resourcePathInJar + File.separator + path + " from JAR to " + tPath + "]");
-			Files.copy(is, tPath, StandardCopyOption.REPLACE_EXISTING);
-		}
-	}
+        for (String path : new String[] { "jaas.conf.template", propfile, "matos.json" }) {
+            String fullPath = resourcePathInJar + File.separator + path;
+            InputStream is = Utils.class.getClassLoader()
+                    .getResourceAsStream(fullPath);
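+            // fall back to the plain filesystem when not running from a JAR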
+            if (is == null) {
+                is = new FileInputStream(fullPath);
+            }
+            Path tPath = Paths.get(resourceDir + File.separator + path);
+            logger.log(Level.INFO,
+                    "[Copying " + resourcePathInJar + File.separator + path + " from JAR to " + tPath + "]");
+            Files.copy(is, tPath, StandardCopyOption.REPLACE_EXISTING);
+        }
+    }
 
-	/**
-	 * Retrieve client configuration information, using a properties file, for
-	 * connecting to secure Kafka.
-	 *
-	 * @param broker
-	 *            {String} A string representing a list of brokers the producer
-	 *            can contact.
-	 * @param isProducer
-	 *            {Boolean} Flag used to determine whether or not the
-	 *            configuration is for a producer.
-	 * @return {Properties} A properties object which stores the client
-	 *         configuration info.
-	 */
-	public static final Properties getClientConfiguration(String broker, boolean isProducer) {
-		Properties props = new Properties();
-		InputStream propsStream;
-		String fileName;
+    /**
+     * Retrieve client configuration information, using a properties file, for
+     * connecting to secure Kafka.
+     *
+     * @param broker
+     *            comma-separated list of Kafka bootstrap brokers the client
+     *            can contact.
+     * @param isProducer
+     *            whether the configuration is for a producer ({@code true})
+     *            or for a consumer ({@code false}).
+     * @return a {@link Properties} object holding the client configuration.
+     */
+    public static final Properties getClientConfiguration(String broker, boolean isProducer) {
+        Properties props = new Properties();
+        InputStream propsStream;
+        String fileName;
 
-		if (isProducer) {
-			fileName = "producer.properties";
-		} else {
-			fileName = "consumer.properties";
-		}
+        if (isProducer) {
+            fileName = "producer.properties";
+        } else {
+            fileName = "consumer.properties";
+        }
 
-		try {
-			propsStream = new FileInputStream(resourceDir + File.separator + fileName);
-			props.load(propsStream);
-			propsStream.close();
-		} catch (IOException e) {
-			logger.log(Level.ERROR, "Could not load properties from file");
-			return props;
-		}
+        try {
+            propsStream = new FileInputStream(resourceDir + File.separator + fileName);
+            props.load(propsStream);
+            propsStream.close();
+        } catch (IOException e) {
+            logger.log(Level.ERROR, "Could not load properties from file");
+            return props;
+        }
 
-		props.put("bootstrap.servers", broker);
+        props.put("bootstrap.servers", broker);
 
-		// update truststore location property to the Java certificates folder
-		props.put("ssl.truststore.location", System.getProperty("java.home") + File.separator + "lib" + File.separator
-				+ "security" + File.separator + "cacerts");
+        // update truststore location property to the Java certificates folder
+        props.put("ssl.truststore.location", System.getProperty("java.home") + File.separator + "lib" + File.separator
+                + "security" + File.separator + "cacerts");
 
-		return props;
-	}
+        return props;
+    }
 
-	static void updateJaasConfiguration(String username, String password) {
-		String templatePath = resourceDir + File.separator + "jaas.conf.template";
-		String path = resourceDir + File.separator + "jaas.conf";
-		OutputStream jaasStream = null;
+    static void updateJaasConfiguration(String username, String password) {
+        String templatePath = resourceDir + File.separator + "jaas.conf.template";
+        String path = resourceDir + File.separator + "jaas.conf";
+        OutputStream jaasStream = null;
 
-		logger.log(Level.INFO, "Updating JAAS configuration");
+        logger.log(Level.INFO, "Updating JAAS configuration");
 
-		try {
-			Path tPath = Paths.get(templatePath);
-			String templateContents = new String(Files.readAllBytes(tPath));
-			jaasStream = new FileOutputStream(path, false);
+        try {
+            Path tPath = Paths.get(templatePath);
+            String templateContents = new String(Files.readAllBytes(tPath));
+            jaasStream = new FileOutputStream(path, false);
 
-			// Replace username and password in template and write
-			// to jaas.conf in resources directory.
-			String fileContents = templateContents.replace("$USERNAME", username).replace("$PASSWORD", password);
+            // Replace username and password in template and write
+            // to jaas.conf in resources directory.
+            String fileContents = templateContents.replace("$USERNAME", username).replace("$PASSWORD", password);
 
-			jaasStream.write(fileContents.getBytes(Charset.forName("UTF-8")));
-		} catch (final FileNotFoundException e) {
-			logger.log(Level.ERROR, "Could not load JAAS config file at: " + path);
-		} catch (final IOException e) {
-			logger.log(Level.ERROR, "Writing to JAAS config file:");
-			e.printStackTrace();
-		} finally {
-			if (jaasStream != null) {
-				try {
-					jaasStream.close();
-				} catch (final Exception e) {
-					logger.log(Level.ERROR, "Closing JAAS config file:");
-					e.printStackTrace();
-				}
-			}
-		}
-	}
+            jaasStream.write(fileContents.getBytes(Charset.forName("UTF-8")));
+        } catch (final FileNotFoundException e) {
+            logger.log(Level.ERROR, "Could not load JAAS config file at: " + path);
+        } catch (final IOException e) {
+            logger.log(Level.ERROR, "Writing to JAAS config file:");
+            e.printStackTrace();
+        } finally {
+            if (jaasStream != null) {
+                try {
+                    jaasStream.close();
+                } catch (final Exception e) {
+                    logger.log(Level.ERROR, "Closing JAAS config file:");
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
 
-	public static void initDirs() {
-		userDir = System.getProperty("user.dir");
-		resourceDir = userDir + File.separator + "resources";
-		resourcePathInJar = "resources";
+    public static void initDirs() {
+        userDir = System.getProperty("user.dir");
+        resourceDir = userDir + File.separator + "resources";
+        resourcePathInJar = "resources";
 
-		logger.log(Level.INFO, "Resource directory: " + resourceDir);
-	}
+        logger.log(Level.INFO, "Resource directory: " + resourceDir);
+    }
 
-	public static void setJaasLocation() {
-		// Set JAAS configuration property.
-		if (System.getProperty(JAAS_CONFIG_PROPERTY) == null) {
-			System.setProperty(JAAS_CONFIG_PROPERTY, resourceDir + File.separator + "jaas.conf");
-		}
-	}
-	
+    public static void setJaasLocation() {
+        // Set JAAS configuration property.
+        if (System.getProperty(JAAS_CONFIG_PROPERTY) == null) {
+            System.setProperty(JAAS_CONFIG_PROPERTY, resourceDir + File.separator + "jaas.conf");
+        }
+    }
 
-	
 }