Add kerby-event library and the event-network support back in the branch
diff --git a/kerby-kerb/kerb-client/pom.xml b/kerby-kerb/kerb-client/pom.xml
index 2dca9a4..5d01fe3 100644
--- a/kerby-kerb/kerb-client/pom.xml
+++ b/kerby-kerb/kerb-client/pom.xml
@@ -43,6 +43,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kerby</groupId>
+      <artifactId>kerby-event</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kerby</groupId>
       <artifactId>kerb-util</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/KrbClient.java b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/KrbClient.java
index adf89c3..7cbb3d0 100644
--- a/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/KrbClient.java
+++ b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/KrbClient.java
@@ -22,6 +22,7 @@
 import org.apache.kerby.KOptions;
 import org.apache.kerby.kerberos.kerb.KrbException;
 import org.apache.kerby.kerberos.kerb.client.impl.DefaultInternalKrbClient;
+import org.apache.kerby.kerberos.kerb.client.impl.event.EventBasedKrbClient;
 import org.apache.kerby.kerberos.kerb.spec.base.AuthToken;
 import org.apache.kerby.kerberos.kerb.spec.ticket.ServiceTicket;
 import org.apache.kerby.kerberos.kerb.spec.ticket.TgtTicket;
@@ -119,11 +120,22 @@
     }
 
     /**
+     * Use event model. By default blocking model is used.
+     */
+    public void useEventModel() {
+        commonOptions.add(KrbOption.USE_EVENT_MODEL);
+    }
+
+    /**
      * Init the client.
      * @throws KrbException
      */
     public void init() throws KrbException {
-        innerClient = new DefaultInternalKrbClient();
+        if (commonOptions.contains(KrbOption.USE_EVENT_MODEL)) {
+            innerClient = new EventBasedKrbClient();
+        } else {
+            innerClient = new DefaultInternalKrbClient();
+        }
         innerClient.init(commonOptions);
     }
 
diff --git a/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/KrbOption.java b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/KrbOption.java
index caea76d..f1afbfd 100644
--- a/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/KrbOption.java
+++ b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/KrbOption.java
@@ -24,6 +24,7 @@
 
 public enum KrbOption implements KOption {
     NONE("NONE"),
+    USE_EVENT_MODEL("use event model", KOptionType.NOV),
     CLIENT_PRINCIPAL("client-principal", "Client principal", KOptionType.STR),
     KRB_CONFIG("krb config", KOptionType.OBJ),
     CONF_DIR("conf dir", KOptionType.DIR),
diff --git a/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/EventBasedKrbClient.java b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/EventBasedKrbClient.java
new file mode 100644
index 0000000..525dfb6
--- /dev/null
+++ b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/EventBasedKrbClient.java
@@ -0,0 +1,118 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.kerberos.kerb.client.impl.event;
+
+import org.apache.kerby.KOptions;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.event.EventWaiter;
+import org.apache.kerby.kerberos.kerb.KrbException;
+import org.apache.kerby.kerberos.kerb.client.impl.AbstractInternalKrbClient;
+import org.apache.kerby.kerberos.kerb.client.request.AsRequest;
+import org.apache.kerby.kerberos.kerb.client.request.TgsRequest;
+import org.apache.kerby.kerberos.kerb.common.KrbStreamingDecoder;
+import org.apache.kerby.kerberos.kerb.spec.ticket.ServiceTicket;
+import org.apache.kerby.kerberos.kerb.spec.ticket.TgtTicket;
+import org.apache.kerby.transport.Network;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An event based krb client implementation.
+ */
+public class EventBasedKrbClient extends AbstractInternalKrbClient {
+
+    private EventKrbHandler krbHandler;
+    private EventHub eventHub;
+    private EventWaiter eventWaiter;
+    private Transport transport; // taken from the NEW_TRANSPORT event received at the end of init()
+
+    @Override
+    public void init(KOptions commonOptions) throws KrbException {
+        super.init(commonOptions);
+
+        this.krbHandler = new EventKrbHandler();
+        krbHandler.init(getContext());
+
+        this.eventHub = new EventHub();
+        eventHub.register(krbHandler);
+
+        Network network = new Network();
+        network.setStreamingDecoder(new KrbStreamingDecoder());
+        eventHub.register(network);
+
+        eventWaiter = eventHub.waitEvent( // one waiter for the three event types this client waits on
+                TransportEventType.NEW_TRANSPORT,
+                KrbClientEventType.TGT_RESULT,
+                KrbClientEventType.TKT_RESULT
+        );
+
+        eventHub.start();
+
+        network.tcpConnect(getSetting().getKdcHost(),
+                getSetting().getKdcTcpPort());
+        if (getSetting().allowUdp()) { // UDP is connected in addition to TCP, not instead of it
+            network.udpConnect(getSetting().getKdcHost(),
+                    getSetting().getKdcUdpPort());
+        }
+        final Event event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT); // no timeout is passed here, unlike the result waits below
+        transport = ((TransportEvent) event).getTransport(); // NOTE(review): only one NEW_TRANSPORT event is consumed even when both TCP and UDP connect
+    }
+
+    @Override
+    protected TgtTicket doRequestTgtTicket(AsRequest tgtTktReq) throws KrbException {
+        tgtTktReq.setSessionData(transport); // EventKrbHandler reads the transport back from the request when sending
+        transport.setAttachment(tgtTktReq); // EventKrbHandler reads the pending request back from the transport on a reply
+
+        eventHub.dispatch(KrbClientEvent.createTgtIntentEvent(tgtTktReq));
+        Event resultEvent;
+        try {
+            resultEvent = eventWaiter.waitEvent(KrbClientEventType.TGT_RESULT,
+                    getSetting().getTimeout(), TimeUnit.SECONDS); // the configured timeout is interpreted as seconds
+        } catch (TimeoutException e) {
+            throw new KrbException("Network timeout", e);
+        }
+        AsRequest asResponse = (AsRequest) resultEvent.getEventData(); // the result event carries the request object itself
+
+        return asResponse.getTicket();
+    }
+
+    @Override
+    protected ServiceTicket doRequestServiceTicket(TgsRequest ticketReq) throws KrbException {
+        ticketReq.setSessionData(transport); // same wiring as doRequestTgtTicket, for a TGS request
+        transport.setAttachment(ticketReq);
+
+        eventHub.dispatch(KrbClientEvent.createTktIntentEvent(ticketReq));
+        Event resultEvent;
+        try {
+            resultEvent = eventWaiter.waitEvent(KrbClientEventType.TKT_RESULT,
+                    getSetting().getTimeout(), TimeUnit.SECONDS); // the configured timeout is interpreted as seconds
+        } catch (TimeoutException e) {
+            throw new KrbException("Network timeout", e);
+        }
+        TgsRequest tgsResponse = (TgsRequest) resultEvent.getEventData(); // the result event carries the request object itself
+
+        return tgsResponse.getServiceTicket();
+    }
+}
diff --git a/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/EventKrbHandler.java b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/EventKrbHandler.java
new file mode 100644
index 0000000..e2a6dbd
--- /dev/null
+++ b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/EventKrbHandler.java
@@ -0,0 +1,87 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.kerberos.kerb.client.impl.event;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.kerberos.kerb.client.KrbContext;
+import org.apache.kerby.kerberos.kerb.client.KrbHandler;
+import org.apache.kerby.kerberos.kerb.client.request.AsRequest;
+import org.apache.kerby.kerberos.kerb.client.request.KdcRequest;
+import org.apache.kerby.kerberos.kerb.client.request.TgsRequest;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class EventKrbHandler extends AbstractEventHandler { // adapts KrbHandler to the event hub: intents go out, inbound messages come back
+
+    private KrbHandler innerHandler;
+
+
+    public void init(KrbContext context) {
+        this.innerHandler = new KrbHandler() {
+            @Override
+            protected void sendMessage(KdcRequest kdcRequest,
+                                       ByteBuffer requestMessage) throws IOException {
+                Transport transport = (Transport) kdcRequest.getSessionData(); // EventBasedKrbClient stores the transport as session data
+                transport.sendMessage(requestMessage);
+            }
+        };
+        innerHandler.init(context);
+    }
+
+    @Override
+    public EventType[] getInterestedEvents() {
+        return new EventType[] { // the same three event types that doHandle() branches on
+                TransportEventType.INBOUND_MESSAGE,
+                KrbClientEventType.TGT_INTENT,
+                KrbClientEventType.TKT_INTENT
+        };
+    }
+
+    @Override
+    protected void doHandle(Event event) throws Exception {
+        EventType eventType = event.getEventType();
+
+        if (eventType == KrbClientEventType.TGT_INTENT ||
+                eventType == KrbClientEventType.TKT_INTENT) {
+            KdcRequest kdcRequest = (KdcRequest) event.getEventData(); // intent events carry the request as their data
+            innerHandler.handleRequest(kdcRequest); // presumably ends up in sendMessage() above — confirm in KrbHandler
+        } else if (event.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+            handleMessage((MessageEvent) event);
+        }
+    }
+
+    protected void handleMessage(MessageEvent event) throws Exception {
+        ByteBuffer receivedMessage = event.getMessage();
+
+        KdcRequest kdcRequest = (KdcRequest) event.getTransport().getAttachment(); // pending request that EventBasedKrbClient attached to the transport
+        innerHandler.onResponseMessage(kdcRequest, receivedMessage);
+        if (AsRequest.class.isAssignableFrom(kdcRequest.getClass())) { // AS request: publish a TGT result
+            dispatch(KrbClientEvent.createTgtResultEvent((AsRequest) kdcRequest));
+        } else if (TgsRequest.class.isAssignableFrom(kdcRequest.getClass())) { // TGS request: publish a ticket result
+            dispatch(KrbClientEvent.createTktResultEvent((TgsRequest) kdcRequest));
+        }
+    }
+}
diff --git a/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/KrbClientEvent.java b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/KrbClientEvent.java
new file mode 100644
index 0000000..574e2d0
--- /dev/null
+++ b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/KrbClientEvent.java
@@ -0,0 +1,43 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.kerberos.kerb.client.impl.event;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.kerberos.kerb.client.request.AsRequest;
+import org.apache.kerby.kerberos.kerb.client.request.TgsRequest;
+
+public class KrbClientEvent { // static factories that wrap a request in an Event of the matching KrbClientEventType
+
+    public static Event createTgtIntentEvent(AsRequest asRequest) {
+        return new Event(KrbClientEventType.TGT_INTENT, asRequest);
+    }
+
+    public static Event createTktIntentEvent(TgsRequest tgsRequest) {
+        return new Event(KrbClientEventType.TKT_INTENT, tgsRequest);
+    }
+
+    public static Event createTgtResultEvent(AsRequest asRequest) {
+        return new Event(KrbClientEventType.TGT_RESULT, asRequest);
+    }
+
+    public static Event createTktResultEvent(TgsRequest tgsRequest) {
+        return new Event(KrbClientEventType.TKT_RESULT, tgsRequest);
+    }
+}
diff --git a/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/KrbClientEventType.java b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/KrbClientEventType.java
new file mode 100644
index 0000000..8f0e981
--- /dev/null
+++ b/kerby-kerb/kerb-client/src/main/java/org/apache/kerby/kerberos/kerb/client/impl/event/KrbClientEventType.java
@@ -0,0 +1,29 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.kerberos.kerb.client.impl.event;
+
+import org.apache.kerby.event.EventType;
+
+public enum KrbClientEventType implements EventType {
+    TGT_INTENT, // dispatched with an AsRequest to start a TGT request
+    TGT_RESULT, // dispatched with the AsRequest after its response message has been handled
+    TKT_INTENT, // dispatched with a TgsRequest to start a service ticket request
+    TKT_RESULT // dispatched with the TgsRequest after its response message has been handled
+}
diff --git a/kerby-kerb/kerb-common/pom.xml b/kerby-kerb/kerb-common/pom.xml
index c745928..d21d320 100644
--- a/kerby-kerb/kerb-common/pom.xml
+++ b/kerby-kerb/kerb-common/pom.xml
@@ -33,6 +33,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kerby</groupId>
+      <artifactId>kerby-event</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kerby</groupId>
       <artifactId>kerb-core</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/kerby-kerb/kerb-common/src/main/java/org/apache/kerby/kerberos/kerb/common/KrbStreamingDecoder.java b/kerby-kerb/kerb-common/src/main/java/org/apache/kerby/kerberos/kerb/common/KrbStreamingDecoder.java
new file mode 100644
index 0000000..a647bf0
--- /dev/null
+++ b/kerby-kerb/kerb-common/src/main/java/org/apache/kerby/kerberos/kerb/common/KrbStreamingDecoder.java
@@ -0,0 +1,42 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.kerberos.kerb.common;
+
+import org.apache.kerby.transport.tcp.DecodingCallback;
+import org.apache.kerby.transport.tcp.StreamingDecoder;
+
+import java.nio.ByteBuffer;
+
+public class KrbStreamingDecoder implements StreamingDecoder { // frames a stream as a 4-byte length prefix followed by the message
+
+    @Override
+    public void decode(ByteBuffer streamingBuffer, DecodingCallback callback) {
+        if (streamingBuffer.remaining() >= 4) { // the 4-byte length prefix must be available first
+            int len = streamingBuffer.getInt(); // relative read: this consumes the 4 prefix bytes
+            if (streamingBuffer.remaining() >= len) { // NOTE(review): a negative len also satisfies this check
+                callback.onMessageComplete(len + 4, 4); // frame is len + 4 bytes; the second argument presumably skips the prefix — confirm in DecodingCallback
+            } else {
+                callback.onMoreDataNeeded(len + 4); // reports the full frame size that is still being waited for
+            }
+        } else {
+            callback.onMoreDataNeeded(); // size unknown until the prefix has arrived
+        }
+    }
+}
diff --git a/kerby-kerb/kerb-kdc-test/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcTestBase.java b/kerby-kerb/kerb-kdc-test/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcTestBase.java
index 0543568..f750d31 100644
--- a/kerby-kerb/kerb-kdc-test/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcTestBase.java
+++ b/kerby-kerb/kerb-kdc-test/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcTestBase.java
@@ -152,7 +152,9 @@
      * @throws Exception
      */
     protected void prepareKrbClient() throws Exception {
-
+        if (useEventModelClient()) {
+            krbClnt.useEventModel();
+        }
     }
 
     /**
@@ -170,6 +172,18 @@
         if (udpPort > 0) {
             kdcServer.setKdcUdpPort(udpPort);
         }
+
+        if (useEventModelKdc()) {
+            kdcServer.useEventModel();
+        }
+    }
+
+    protected boolean useEventModelKdc() {
+        return false;
+    }
+
+    protected boolean useEventModelClient() {
+        return false;
     }
 
     protected void setUpKdcServer() throws Exception {
diff --git a/kerby-kerb/kerb-kdc-test/src/test/java/org/apache/kerby/kerberos/kerb/server/KdcTestUsingEventClient.java b/kerby-kerb/kerb-kdc-test/src/test/java/org/apache/kerby/kerberos/kerb/server/KdcTestUsingEventClient.java
new file mode 100644
index 0000000..5e906de
--- /dev/null
+++ b/kerby-kerb/kerb-kdc-test/src/test/java/org/apache/kerby/kerberos/kerb/server/KdcTestUsingEventClient.java
@@ -0,0 +1,35 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.kerberos.kerb.server;
+
+import org.junit.Test;
+
+public class KdcTestUsingEventClient extends KdcTest { // runs the inherited KDC test through the event-based client
+
+    @Override
+    protected boolean useEventModelClient() {
+        return true; // opt in: KdcTestBase defaults to false, so returning false here never tested the event client
+    }
+
+    @Test
+    public void testKdc() throws Exception {
+        performKdcTest();
+    }
+}
\ No newline at end of file
diff --git a/kerby-kerb/kerb-kdc-test/src/test/java/org/apache/kerby/kerberos/kerb/server/KdcTestUsingEventKdc.java b/kerby-kerb/kerb-kdc-test/src/test/java/org/apache/kerby/kerberos/kerb/server/KdcTestUsingEventKdc.java
new file mode 100644
index 0000000..99a1aad
--- /dev/null
+++ b/kerby-kerb/kerb-kdc-test/src/test/java/org/apache/kerby/kerberos/kerb/server/KdcTestUsingEventKdc.java
@@ -0,0 +1,35 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.kerberos.kerb.server;
+
+import org.junit.Test;
+
+public class KdcTestUsingEventKdc extends KdcTest { // runs the inherited KDC test against the event-based KDC server
+
+    @Override
+    protected boolean useEventModelKdc() {
+        return true; // opt in: KdcTestBase defaults to false, so returning false here never tested the event KDC
+    }
+
+    @Test
+    public void testKdc() throws Exception {
+        performKdcTest();
+    }
+}
\ No newline at end of file
diff --git a/kerby-kerb/kerb-server/pom.xml b/kerby-kerb/kerb-server/pom.xml
index b0ab417..138d123 100644
--- a/kerby-kerb/kerb-server/pom.xml
+++ b/kerby-kerb/kerb-server/pom.xml
@@ -47,6 +47,11 @@
       <artifactId>kerb-identity</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kerby</groupId>
+      <artifactId>kerby-event</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcServer.java b/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcServer.java
index a2f6c46..7cbd6eb 100644
--- a/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcServer.java
+++ b/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcServer.java
@@ -22,6 +22,7 @@
 import org.apache.kerby.KOptions;
 import org.apache.kerby.kerberos.kerb.identity.IdentityService;
 import org.apache.kerby.kerberos.kerb.server.impl.DefaultInternalKdcServerImpl;
+import org.apache.kerby.kerberos.kerb.server.impl.event.EventBasedKdcServer;
 
 import java.io.File;
 
@@ -113,6 +114,13 @@
     }
 
     /**
+     * Use event model. By default blocking model is used.
+     */
+    public void useEventModel() {
+        commonOptions.add(KdcServerOption.USE_EVENT_MODEL);
+    }
+
+    /**
      * Set runtime folder.
      * @param workDir
      */
@@ -165,6 +173,8 @@
         if (commonOptions.contains(KdcServerOption.INNER_KDC_IMPL)) {
             innerKdc = (InternalKdcServer) commonOptions.getOptionValue(
                     KdcServerOption.INNER_KDC_IMPL);
+        } else if (commonOptions.contains(KdcServerOption.USE_EVENT_MODEL)) {
+            innerKdc = new EventBasedKdcServer();
         } else {
             innerKdc = new DefaultInternalKdcServerImpl();
         }
diff --git a/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcServerOption.java b/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcServerOption.java
index a6bad7d..61017d2 100644
--- a/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcServerOption.java
+++ b/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/KdcServerOption.java
@@ -27,6 +27,7 @@
  */
 public enum KdcServerOption implements KOption {
     NONE("NONE"),
+    USE_EVENT_MODEL("use event model", KOptionType.NOV),
     INNER_KDC_IMPL("inner KDC impl", KOptionType.OBJ),
     KDC_CONFIG("kdc config", KOptionType.OBJ),
     BACKEND_CONFIG("backend config", KOptionType.OBJ),
diff --git a/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/impl/event/EventBasedKdcServer.java b/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/impl/event/EventBasedKdcServer.java
new file mode 100644
index 0000000..ec076cf
--- /dev/null
+++ b/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/impl/event/EventBasedKdcServer.java
@@ -0,0 +1,76 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *
+ */
+package org.apache.kerby.kerberos.kerb.server.impl.event;
+
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.kerberos.kerb.common.KrbStreamingDecoder;
+import org.apache.kerby.kerberos.kerb.server.KdcContext;
+import org.apache.kerby.kerberos.kerb.server.impl.AbstractInternalKdcServer;
+import org.apache.kerby.kerberos.kerb.server.preauth.PreauthHandler;
+import org.apache.kerby.transport.Network;
+
+/**
+ * Event based KDC server.
+ */
+public class EventBasedKdcServer extends AbstractInternalKdcServer {
+
+    private EventKdcHandler kdcHandler; // created by prepareHandler() during doStart()
+    private EventHub eventHub; // created in doStart(); stays null until then
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        prepareHandler();
+
+        this.eventHub = new EventHub();
+
+        eventHub.register(kdcHandler);
+
+        Network network = new Network();
+        network.setStreamingDecoder(new KrbStreamingDecoder());
+        eventHub.register(network);
+
+        eventHub.start(); // the hub is started before the listeners are opened
+        network.tcpListen(getSetting().getKdcHost(),
+                getSetting().getKdcTcpPort());
+        if (getSetting().allowUdp()) { // UDP listening is added on top of TCP when allowed
+            network.udpListen(getSetting().getKdcHost(),
+                    getSetting().getKdcUdpPort());
+        }
+    }
+
+    private void prepareHandler() { // builds the KdcContext (setting, identity backend, preauth handler) for the handler
+        KdcContext kdcContext = new KdcContext(getSetting());
+        kdcContext.setIdentityService(getBackend());
+        PreauthHandler preauthHandler = new PreauthHandler();
+        preauthHandler.init();
+        kdcContext.setPreauthHandler(preauthHandler);
+
+        this.kdcHandler = new EventKdcHandler(kdcContext);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        eventHub.stop(); // NOTE(review): eventHub is only assigned in doStart(); stopping before a start would dereference null
+    }
+}
diff --git a/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/impl/event/EventKdcHandler.java b/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/impl/event/EventKdcHandler.java
new file mode 100644
index 0000000..5c6ab7b
--- /dev/null
+++ b/kerby-kerb/kerb-server/src/main/java/org/apache/kerby/kerberos/kerb/server/impl/event/EventKdcHandler.java
@@ -0,0 +1,61 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.kerberos.kerb.server.impl.event;
+
+import org.apache.kerby.kerberos.kerb.server.KdcHandler;
+import org.apache.kerby.kerberos.kerb.server.KdcContext;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.tcp.TcpTransport;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+/**
+ * KDC handler to process client requests. Currently only one realm is supported.
+ */
+public class EventKdcHandler extends MessageHandler {
+
+    private final KdcHandler myKdcHandler; // performs the actual request processing; this class only adapts transport events to it
+
+    public EventKdcHandler(KdcContext kdcContext) {
+        this.myKdcHandler = new KdcHandler(kdcContext);
+    }
+
+    @Override
+    protected void handleMessage(MessageEvent event) throws Exception {
+        ByteBuffer message = event.getMessage();
+        Transport transport = event.getTransport();
+
+        InetSocketAddress clientAddress = transport.getRemoteAddress();
+        boolean isTcp = transport instanceof TcpTransport; // any other transport type is treated as non-TCP
+
+        try {
+            ByteBuffer krbResponse = myKdcHandler.handleMessage(message, isTcp,
+                    clientAddress.getAddress());
+            transport.sendMessage(krbResponse); // the reply goes back on the transport the request arrived on
+        } catch (Exception e) { // failures are only printed; nothing is sent back to the client from this branch
+            //TODO: log the error
+            System.out.println("Error occured while processing request:"
+                    + e.getMessage());
+        }
+    }
+}
diff --git a/lib/kerby-event/README b/lib/kerby-event/README
new file mode 100644
index 0000000..cb3b88a
--- /dev/null
+++ b/lib/kerby-event/README
@@ -0,0 +1 @@
+An event-driven application framework that supports any mix of (TCP, UDP) x (connector, acceptor).
\ No newline at end of file
diff --git a/lib/kerby-event/pom.xml b/lib/kerby-event/pom.xml
new file mode 100644
index 0000000..af1e11a
--- /dev/null
+++ b/lib/kerby-event/pom.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>lib</artifactId>
+    <groupId>org.apache.kerby</groupId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>kerby-event</artifactId>
+  <name>Kerby Event</name>
+  <description>Kerby Event and Transport facilities for both client and server</description>
+
+</project>
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/AbstractEventHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/AbstractEventHandler.java
new file mode 100644
index 0000000..59a0a82
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/AbstractEventHandler.java
@@ -0,0 +1,55 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+/** Base EventHandler that stores the dispatcher and routes handle() to doHandle(). */
+public abstract class AbstractEventHandler implements EventHandler {
+    private Dispatcher dispatcher;
+
+    public AbstractEventHandler() { }
+
+    @Override
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    @Override
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    protected void dispatch(Event event) {
+        this.dispatcher.dispatch(event);
+    }
+
+    /** Runs doHandle and rethrows any exception wrapped in a RuntimeException. */
+    @Override
+    public void handle(Event event) {
+        try {
+            doHandle(event);
+        } catch (Exception failure) {
+            throw new RuntimeException(event.toString(), failure);
+        }
+    }
+
+    /** Hook where subclasses do the actual work for one event. */
+    protected abstract void doHandle(Event event) throws Exception;
+}
+
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/AbstractInternalEventHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/AbstractInternalEventHandler.java
new file mode 100644
index 0000000..bfed126
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/AbstractInternalEventHandler.java
@@ -0,0 +1,66 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractInternalEventHandler extends AbstractEventHandler
+        implements InternalEventHandler {
+    /** Hands out the ids; shared by all handler instances, first id is 1. */
+    private static final AtomicInteger ID_GEN = new AtomicInteger(1);
+
+    private final int id;
+    /** Wrapped handler that process() and getInterestedEvents() delegate to. */
+    protected EventHandler handler;
+
+    /** Takes the next id, then calls init(); the wrapped handler is still unset. */
+    public AbstractInternalEventHandler() {
+        super();
+        this.id = ID_GEN.getAndIncrement();
+        // NOTE: init() is overridable and runs before any subclass constructor body.
+        init();
+    }
+
+    public AbstractInternalEventHandler(EventHandler handler) {
+        this();
+        this.handler = handler;
+    }
+
+    protected void setEventHandler(EventHandler handler) {
+        this.handler = handler;
+    }
+
+    @Override
+    public int id() {
+        return id;
+    }
+
+    public abstract void init();
+
+    protected void process(Event event) {
+        handler.handle(event);
+    }
+
+    @Override
+    public EventType[] getInterestedEvents() {
+        return handler.getInterestedEvents();
+    }
+}
+
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/BufferedEventHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/BufferedEventHandler.java
new file mode 100644
index 0000000..d3c1401
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/BufferedEventHandler.java
@@ -0,0 +1,49 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/** An EventHandler wrapper buffering up to 2 events for later processing; handle() blocks while full. */
+public abstract class BufferedEventHandler extends AbstractInternalEventHandler {
+    protected BlockingQueue<Event> eventQueue;
+
+    public BufferedEventHandler(EventHandler handler) {
+        super(handler);
+    }
+
+    public BufferedEventHandler() {
+        super();
+    }
+
+    @Override
+    public void init() {
+        // init() can run more than once; keep the queue so buffered events survive.
+        if (eventQueue == null) {
+            eventQueue = new ArrayBlockingQueue<Event>(2);
+        }
+    }
+
+    @Override
+    protected void doHandle(Event event) throws Exception {
+        eventQueue.put(event);
+    }
+}
\ No newline at end of file
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/Dispatcher.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/Dispatcher.java
new file mode 100644
index 0000000..f730745
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/Dispatcher.java
@@ -0,0 +1,29 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+public interface Dispatcher {
+    /** Sends the event to the registered handlers interested in its type. */
+    void dispatch(Event event);
+    /** Registers a handler for the event types it reports via getInterestedEvents(). */
+    void register(EventHandler handler);
+    /** Registers a handler that brings its own lifecycle (init/start/stop). */
+    void register(InternalEventHandler internalHandler);
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/Event.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/Event.java
new file mode 100644
index 0000000..332ee0d
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/Event.java
@@ -0,0 +1,43 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+/** A typed event, optionally carrying a payload; both are fixed at construction. */
+public class Event {
+    private final EventType eventType;
+    private final Object eventData;
+
+    public Event(EventType eventType) {
+        this(eventType, null);
+    }
+
+    public Event(EventType eventType, Object eventData) {
+        this.eventType = eventType;
+        this.eventData = eventData;
+    }
+
+    public EventType getEventType() {
+        return eventType;
+    }
+
+    public Object getEventData() {
+        return eventData;
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/EventHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/EventHandler.java
new file mode 100644
index 0000000..27c4a44
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/EventHandler.java
@@ -0,0 +1,31 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+public interface EventHandler {
+    /** Handles one event. */
+    void handle(Event event);
+    /** Returns the event types this handler wants to receive. */
+    EventType[] getInterestedEvents();
+    /** Returns the dispatcher this handler is attached to. */
+    Dispatcher getDispatcher();
+    /** Attaches this handler to a dispatcher; EventHub does this on registration. */
+    void setDispatcher(Dispatcher dispatcher);
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/EventHub.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/EventHub.java
new file mode 100644
index 0000000..9f10c88
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/EventHub.java
@@ -0,0 +1,193 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class EventHub implements Dispatcher {
+    private enum BuiltInEventType implements EventType {
+        STOP,
+        ALL
+    }
+
+    private volatile boolean started = false;
+
+    /** Every registered handler, keyed by its id. */
+    private final Map<Integer, InternalEventHandler> handlers =
+            new ConcurrentHashMap<Integer, InternalEventHandler>();
+
+    /** Event type to the ids of the handlers interested in it. */
+    private final ConcurrentHashMap<EventType, Set<Integer>> eventHandlersMap =
+        new ConcurrentHashMap<EventType, Set<Integer>>();
+
+    private InternalEventHandler builtInHandler;
+
+    /** No-op handler subscribed to the built-in STOP and ALL event types. */
+    class BuiltInEventHandler extends AbstractEventHandler {
+        public BuiltInEventHandler() {
+            super();
+        }
+
+        @Override
+        protected void doHandle(Event event) {
+            // nothing to do
+        }
+
+        @Override
+        public EventType[] getInterestedEvents() {
+            return BuiltInEventType.values();
+        }
+    }
+
+    public EventHub() {
+        init();
+    }
+
+    private void init() {
+        EventHandler eh = new BuiltInEventHandler();
+        builtInHandler = new ExecutedEventHandler(eh);
+        register(builtInHandler);
+    }
+
+    /** Delivers the event to the handlers of its type, then to those registered for ALL. */
+    @Override
+    public void dispatch(Event event) {
+        process(event);
+    }
+
+    /** Wraps the handler in an ExecutedEventHandler and registers the wrapper. */
+    @Override
+    public void register(EventHandler handler) {
+        handler.setDispatcher(this);
+        InternalEventHandler ieh = new ExecutedEventHandler(handler);
+        register(ieh);
+    }
+
+    /** Records the handler, starts it if the hub already runs, and subscribes its event types. */
+    @Override
+    public void register(InternalEventHandler handler) {
+        handler.setDispatcher(this);
+        handler.init();
+        handlers.put(handler.id(), handler);
+
+        if (started) {
+            handler.start();
+        }
+        for (EventType eventType : handler.getInterestedEvents()) {
+            Set<Integer> ids = eventHandlersMap.get(eventType);
+            if (ids == null) {
+                // Copy-on-write set: process() may iterate while another thread registers.
+                Set<Integer> fresh = new java.util.concurrent.CopyOnWriteArraySet<Integer>();
+                ids = eventHandlersMap.putIfAbsent(eventType, fresh);
+                if (ids == null) {
+                    ids = fresh;
+                }
+            }
+            ids.add(handler.id());
+        }
+    }
+
+    public EventWaiter waitEvent(final EventType event) {
+        return waitEvent(new EventType[] { event } );
+    }
+
+    /** Registers a new WaitEventHandler for the given types and returns a waiter backed by it. */
+    public EventWaiter waitEvent(final EventType... events) {
+        EventHandler handler = new AbstractEventHandler() {
+            @Override
+            protected void doHandle(Event event) throws Exception {
+                // no op;
+            }
+
+            @Override
+            public EventType[] getInterestedEvents() {
+                return events;
+            }
+        };
+
+        handler.setDispatcher(this);
+        final WaitEventHandler waitEventHandler = new WaitEventHandler(handler);
+        register(waitEventHandler);
+        return new EventWaiter() {
+            @Override
+            public Event waitEvent(EventType event) {
+                return waitEventHandler.waitEvent(event);
+            }
+
+            @Override
+            public Event waitEvent() {
+                return waitEventHandler.waitEvent();
+            }
+
+            @Override
+            public Event waitEvent(EventType event, long timeout,
+                                   TimeUnit timeUnit) throws TimeoutException {
+                return waitEventHandler.waitEvent(event, timeout, timeUnit);
+            }
+
+            @Override
+            public Event waitEvent(long timeout,
+                                   TimeUnit timeUnit) throws TimeoutException {
+                return waitEventHandler.waitEvent(timeout, timeUnit);
+            }
+        };
+    }
+
+    private void process(Event event) {
+        deliver(event.getEventType(), event);
+        deliver(BuiltInEventType.ALL, event);
+    }
+
+    /** Hands the event to every handler subscribed under the given type, if any. */
+    private void deliver(EventType subscribedType, Event event) {
+        Set<Integer> handlerIds = eventHandlersMap.get(subscribedType);
+        if (handlerIds == null) {
+            return;
+        }
+        for (Integer hid : handlerIds) {
+            handlers.get(hid).handle(event);
+        }
+    }
+
+    /** Starts all registered handlers; does nothing if already started. */
+    public void start() {
+        if (!started) {
+            for (InternalEventHandler handler : handlers.values()) {
+                handler.start();
+            }
+            started = true;
+        }
+    }
+
+    /** Stops all registered handlers; does nothing if not started. */
+    public void stop() {
+        if (started) {
+            for (InternalEventHandler handler : handlers.values()) {
+                handler.stop();
+            }
+            started = false;
+        }
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/EventType.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/EventType.java
new file mode 100644
index 0000000..6a4a453
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/EventType.java
@@ -0,0 +1,24 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+public interface EventType {
+    // Marker interface with no members; event types are typically enums implementing it.
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/EventWaiter.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/EventWaiter.java
new file mode 100644
index 0000000..dacc5eb
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/EventWaiter.java
@@ -0,0 +1,37 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public interface EventWaiter {
+    /** Waits until an event of the given type is available and returns it. */
+    Event waitEvent(EventType event);
+    /** Waits until any event is available and returns it. */
+    Event waitEvent();
+    /** Like waitEvent(EventType), but throws TimeoutException when the wait exceeds the timeout. */
+    Event waitEvent(EventType event, long timeout,
+                                    TimeUnit timeUnit) throws TimeoutException;
+    /** Like waitEvent(), but throws TimeoutException when the wait exceeds the timeout. */
+    Event waitEvent(long timeout,
+                                    TimeUnit timeUnit) throws TimeoutException;
+
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/ExecutedEventHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/ExecutedEventHandler.java
new file mode 100644
index 0000000..d094711
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/ExecutedEventHandler.java
@@ -0,0 +1,76 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * An EventHandler wrapper processing events using an ExecutorService
+ */
+public class ExecutedEventHandler extends AbstractInternalEventHandler {
+
+    private ExecutorService executorService;
+
+    public ExecutedEventHandler(EventHandler handler) {
+        super(handler);
+    }
+
+    /** Queues the event for the wrapped handler; events arriving after stop() are dropped. */
+    @Override
+    protected void doHandle(final Event event) throws Exception {
+        // isShutdown(), not isTerminated(): execute() is rejected as soon as stop() was called.
+        if (executorService.isShutdown()) {
+            return;
+        }
+
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                process(event);
+            }
+        });
+    }
+
+    /** Creates the pool of two worker threads that run the wrapped handler. */
+    @Override
+    public void start() {
+        executorService = Executors.newFixedThreadPool(2);
+    }
+
+    /** Shuts the pool down immediately (shutdownNow); a repeated call is a no-op. */
+    @Override
+    public void stop() {
+        if (executorService.isShutdown()) {
+            return;
+        }
+        executorService.shutdownNow();
+    }
+
+    @Override
+    public boolean isStopped() {
+        return executorService.isShutdown();
+    }
+
+    @Override
+    public void init() {
+        // nothing to set up before start()
+    }
+}
\ No newline at end of file
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/InternalEventHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/InternalEventHandler.java
new file mode 100644
index 0000000..e7fafad
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/InternalEventHandler.java
@@ -0,0 +1,34 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+public interface InternalEventHandler extends EventHandler {
+    /** Returns the id under which EventHub stores and looks up this handler. */
+    int id();
+    /** Setup hook; EventHub calls it when the handler is registered. */
+    void init();
+    /** Begins event processing; called by EventHub.start() or on registration with a started hub. */
+    void start();
+    /** Ends event processing; called by EventHub.stop(). */
+    void stop();
+    /** Returns whether the handler has been stopped. */
+    boolean isStopped();
+}
+
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/LongRunningEventHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/LongRunningEventHandler.java
new file mode 100644
index 0000000..10c1f0b
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/LongRunningEventHandler.java
@@ -0,0 +1,77 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public abstract class LongRunningEventHandler extends BufferedEventHandler {
+    private ExecutorService executorService;
+
+    public LongRunningEventHandler(EventHandler handler) {
+        super(handler);
+    }
+
+    public LongRunningEventHandler() {
+        super();
+    }
+
+    /** One round of the subclass's own work; called repeatedly on the worker thread. */
+    protected abstract void loopOnce();
+
+    /** Starts one worker thread alternating processEvents() and loopOnce() until stop(). */
+    @Override
+    public void start() {
+        final ExecutorService worker = Executors.newFixedThreadPool(1);
+        executorService = worker;
+        worker.execute(new Runnable() {
+            @Override
+            public void run() {
+                // Re-checked every round so the worker ends once stop() has shut the executor down.
+                while (!worker.isShutdown()) {
+                    processEvents();
+                    loopOnce();
+                }
+            }
+        });
+    }
+
+    @Override
+    public void stop() {
+        if (executorService.isShutdown()) {
+            return;
+        }
+        executorService.shutdownNow();
+    }
+
+    @Override
+    public boolean isStopped() {
+        return executorService.isShutdown();
+    }
+
+    /** Passes every currently buffered event to the wrapped handler without blocking. */
+    protected void processEvents() {
+        Event buffered = eventQueue.poll();
+        while (buffered != null) {
+            process(buffered);
+            buffered = eventQueue.poll();
+        }
+    }
+}
\ No newline at end of file
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/event/WaitEventHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/event/WaitEventHandler.java
new file mode 100644
index 0000000..da9c561
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/event/WaitEventHandler.java
@@ -0,0 +1,126 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+import java.util.concurrent.*;
+
+public class WaitEventHandler extends BufferedEventHandler {
+    private ExecutorService executorService;
+
+    public WaitEventHandler(EventHandler handler) {
+        super(handler);
+    }
+
+    public Event waitEvent() {
+        return waitEvent(null);
+    }
+
+    /** Blocks until an event of the given type (any type if null) is available. */
+    public Event waitEvent(final EventType eventType) {
+        Future<Event> future = doWaitEvent(eventType);
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            throw interrupted(future, e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Timed variant; on timeout the pending wait is cancelled so it cannot swallow a later event. */
+    public Event waitEvent(final EventType eventType,
+                           long timeout, TimeUnit timeUnit) throws TimeoutException {
+        Future<Event> future = doWaitEvent(eventType);
+        try {
+            return future.get(timeout, timeUnit);
+        } catch (TimeoutException e) {
+            future.cancel(true);
+            throw e;
+        } catch (InterruptedException e) {
+            throw interrupted(future, e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Event waitEvent(long timeout, TimeUnit timeUnit) throws TimeoutException {
+        return waitEvent(null, timeout, timeUnit);
+    }
+
+    /** Cancels the pending wait, restores the interrupt flag and builds the exception to throw. */
+    private static RuntimeException interrupted(Future<Event> future, InterruptedException e) {
+        future.cancel(true);
+        Thread.currentThread().interrupt();
+        return new RuntimeException(e);
+    }
+
+    private Future<Event> doWaitEvent(final EventType eventType) {
+        return executorService.submit(new Callable<Event>() {
+            @Override
+            public Event call() throws Exception {
+                return eventType != null ? checkEvent(eventType) : checkEvent();
+            }
+        });
+    }
+
+    private Event checkEvent() throws Exception {
+        return eventQueue.take();
+    }
+
+    /** Polls until an event of the wanted type is at hand; other events stay queued. */
+    private Event checkEvent(EventType eventType) throws Exception {
+        while (true) {
+            // The peek-only branch never blocks, so a cancelled wait must be noticed here.
+            if (Thread.interrupted()) {
+                throw new InterruptedException();
+            }
+            if (eventQueue.size() == 1) {
+                Event head = eventQueue.peek();
+                if (head != null && head.getEventType() == eventType) {
+                    return eventQueue.take();
+                }
+            } else {
+                Event event = eventQueue.take();
+                if (event.getEventType() == eventType) {
+                    return event;
+                }
+                eventQueue.put(event); // put back since not wanted
+            }
+        }
+    }
+
+    @Override
+    public void start() {
+        executorService = Executors.newFixedThreadPool(1);
+    }
+
+    @Override
+    public void stop() {
+        if (executorService.isShutdown()) {
+            return;
+        }
+        executorService.shutdown();
+    }
+
+    @Override
+    public boolean isStopped() {
+        return executorService.isShutdown();
+    }
+}
\ No newline at end of file
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/Acceptor.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/Acceptor.java
new file mode 100644
index 0000000..efa827a
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/Acceptor.java
@@ -0,0 +1,36 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport;
+
+import java.net.InetSocketAddress;
+
+/** Listening-side TransportSelector: turns a host/port pair into a socket address for doListen. */
+public abstract class Acceptor extends TransportSelector {
+    /** @param transportHandler handler passed straight through to TransportSelector */
+    public Acceptor(TransportHandler transportHandler) {
+        super(transportHandler);
+    }
+    /** Builds an InetSocketAddress from address/listenPort and delegates to doListen. */
+    public void listen(String address, int listenPort) {
+        doListen(new InetSocketAddress(address, listenPort));
+    }
+    /** Subclass hook that receives the constructed socket address. */
+    protected abstract void doListen(InetSocketAddress socketAddress);
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/Connector.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/Connector.java
new file mode 100644
index 0000000..720f481
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/Connector.java
@@ -0,0 +1,36 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport;
+
+import java.net.InetSocketAddress;
+
+/** Connecting-side TransportSelector: turns a host/port pair into a socket address for doConnect. */
+public abstract class Connector extends TransportSelector {
+    /** @param transportHandler handler passed straight through to TransportSelector */
+    public Connector(TransportHandler transportHandler) {
+        super(transportHandler);
+    }
+    /** Builds an InetSocketAddress from serverAddress/serverPort and delegates to doConnect. */
+    public void connect(String serverAddress, int serverPort) {
+        doConnect(new InetSocketAddress(serverAddress, serverPort));
+    }
+    /** Subclass hook that receives the constructed server address. */
+    protected abstract void doConnect(InetSocketAddress sa);
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/MessageHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/MessageHandler.java
new file mode 100644
index 0000000..d6ad01e
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/MessageHandler.java
@@ -0,0 +1,42 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+
+/** Event handler subscribed to INBOUND_MESSAGE only; forwards each event to handleMessage. */
+public abstract class MessageHandler extends AbstractEventHandler {
+    @Override
+    protected void doHandle(Event event) throws Exception {
+        final MessageEvent inbound = (MessageEvent) event; // ClassCastException for any other event class
+        handleMessage(inbound);
+    }
+    /** Subclass hook invoked with each inbound message event. */
+    protected abstract void handleMessage(MessageEvent event) throws Exception;
+    /** Declares TransportEventType.INBOUND_MESSAGE as the single event type of interest. */
+    @Override
+    public EventType[] getInterestedEvents() {
+        return new EventType[] { TransportEventType.INBOUND_MESSAGE };
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/Network.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/Network.java
new file mode 100644
index 0000000..9ade687
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/Network.java
@@ -0,0 +1,297 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.event.LongRunningEventHandler;
+import org.apache.kerby.transport.event.AddressEvent;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.tcp.*;
+import org.apache.kerby.transport.udp.UdpAddressEvent;
+import org.apache.kerby.transport.udp.UdpEventType;
+import org.apache.kerby.transport.udp.UdpTransport;
+import org.apache.kerby.transport.udp.UdpTransportHandler;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.*;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * A combined and mixed network facility handling UDP and TCP in both connect and accept sides
+ */
+public class Network extends LongRunningEventHandler {
+    private Selector selector;
+    private StreamingDecoder streamingDecoder;
+    private UdpTransportHandler udpTransportHandler;
+    private TcpTransportHandler tcpTransportHandler;
+
+    class MyEventHandler extends AbstractEventHandler { // maps each address event to its channel set-up method
+        @Override
+        protected void doHandle(Event event) throws Exception {
+            if (event.getEventType() == UdpEventType.ADDRESS_CONNECT) {
+                doUdpConnect((AddressEvent) event);
+            } else if (event.getEventType() == UdpEventType.ADDRESS_BIND) {
+                doUdpBind((AddressEvent) event);
+            } else if (event.getEventType() == TcpEventType.ADDRESS_CONNECT) {
+                doTcpConnect((AddressEvent) event);
+            } else if (event.getEventType() == TcpEventType.ADDRESS_BIND) {
+                doTcpBind((AddressEvent) event);
+            }
+        }
+
+        @Override
+        public EventType[] getInterestedEvents() {
+            return new EventType[]{
+                    UdpEventType.ADDRESS_CONNECT, UdpEventType.ADDRESS_BIND,
+                    TcpEventType.ADDRESS_CONNECT, TcpEventType.ADDRESS_BIND
+            };
+        }
+    }
+
+    public Network() { // installs the handler for the address events dispatched by the connect/listen methods
+        setEventHandler(new MyEventHandler());
+    }
+
+    /** Opens the selector shared by all channels; an IOException is rethrown as RuntimeException. */
+    @Override
+    public void init() {
+        super.init();
+        try {
+            selector = Selector.open();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Calls the inherited stop, then closes the selector so that it is not leaked. */
+    @Override
+    public void stop() {
+        super.stop();
+        try {
+            if (selector != null) { // still null when init() was never called
+                selector.close();
+            }
+        } catch (IOException e) {
+            e.printStackTrace(); // best effort on shutdown, mirroring TransportSelector.stop()
+        }
+    }
+
+    /**
+     * TCP transport only, for decoding tcp streaming into messages
+     * @param streamingDecoder decoder given to the TCP transport handler when that handler is first created
+     */
+    public void setStreamingDecoder(StreamingDecoder streamingDecoder) {
+        this.streamingDecoder = streamingDecoder;
+    }
+
+    /**
+     * TCP only. Connect on the given server address. Can be called multiple times for multiple servers
+     * @param serverAddress host name or IP address of the server
+     * @param serverPort port of the server
+     */
+    public void tcpConnect(String serverAddress, int serverPort) {
+        InetSocketAddress sa = new InetSocketAddress(serverAddress, serverPort);
+        checkTcpTransportHandler();
+        doTcpConnect(sa);
+    }
+
+    /**
+     * UDP only. Connect on the given server address. Can be called multiple times for multiple servers
+     * @param serverAddress host name or IP address of the server
+     * @param serverPort port of the server
+     */
+    public void udpConnect(String serverAddress, int serverPort) {
+        InetSocketAddress sa = new InetSocketAddress(serverAddress, serverPort);
+        checkUdpTransportHandler();
+        doUdpConnect(sa);
+    }
+
+    /**
+     * TCP only. Listen and accept connections on the address. Can be called multiple times for multiple addresses.
+     * @param serverAddress local host name or IP address to bind
+     * @param serverPort local port to bind
+     */
+    public void tcpListen(String serverAddress, int serverPort) {
+        InetSocketAddress sa = new InetSocketAddress(serverAddress, serverPort);
+        checkTcpTransportHandler();
+        doTcpListen(sa);
+    }
+
+    /**
+     * UDP only. Listen and accept connections on the address. Can be called multiple times for multiple addresses.
+     * @param serverAddress local host name or IP address to bind
+     * @param serverPort local port to bind
+     */
+    public void udpListen(String serverAddress, int serverPort) {
+        InetSocketAddress sa = new InetSocketAddress(serverAddress, serverPort);
+        checkUdpTransportHandler();
+        doUdpListen(sa);
+    }
+
+    @Override
+    protected void loopOnce() {
+        try {
+            selectOnce();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected void selectOnce() throws IOException { // waits up to 2 ms, then hands every ready key to dealKey
+        if (selector.isOpen() && selector.select(2) > 0 && selector.isOpen()) {
+            Set<SelectionKey> selectionKeys = selector.selectedKeys();
+            Iterator<SelectionKey> iterator = selectionKeys.iterator();
+            while (iterator.hasNext()) {
+                SelectionKey selectionKey = iterator.next();
+                dealKey(selectionKey);
+                iterator.remove(); // empties the selected set key by key, so no clear() is needed afterwards
+            }
+        }
+    }
+
+    private void checkTcpTransportHandler() { // lazily creates and registers the TCP handler; needs a decoder
+        if (tcpTransportHandler == null) {
+            if (streamingDecoder == null) {
+                throw new IllegalArgumentException("No streaming decoder set yet");
+            }
+            tcpTransportHandler = new TcpTransportHandler(streamingDecoder);
+            getDispatcher().register(tcpTransportHandler);
+        }
+    }
+
+    private void checkUdpTransportHandler() { // lazily creates and registers the UDP handler
+        if (udpTransportHandler == null) {
+            udpTransportHandler = new UdpTransportHandler();
+            getDispatcher().register(udpTransportHandler);
+        }
+    }
+
+    private void dealKey(SelectionKey selectionKey) throws IOException { // connect/accept here, rest to a handler
+        if (selectionKey.isConnectable()) {
+            doTcpConnect(selectionKey);
+        } else if (selectionKey.isAcceptable()) {
+            doTcpAccept(selectionKey);
+        } else {
+            helpHandleSelectionKey(selectionKey);
+        }
+    }
+
+    private void helpHandleSelectionKey(SelectionKey selectionKey) throws IOException { // pick handler by channel type
+        SelectableChannel channel = selectionKey.channel();
+        if (channel instanceof DatagramChannel) {
+            udpTransportHandler.helpHandleSelectionKey(selectionKey);
+        } else {
+            tcpTransportHandler.helpHandleSelectionKey(selectionKey);
+        }
+    }
+
+    private void doUdpConnect(InetSocketAddress sa) {
+        dispatch(UdpAddressEvent.createAddressConnectEvent(sa));
+    }
+
+    private void doUdpConnect(AddressEvent event) throws IOException { // opens the channel and announces its transport
+        InetSocketAddress address = event.getAddress();
+        DatagramChannel channel = DatagramChannel.open();
+        channel.configureBlocking(false);
+        channel.connect(address);
+        channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+        UdpTransport transport = new UdpTransport(channel, address);
+        onNewTransport(transport);
+    }
+
+    protected void doUdpListen(InetSocketAddress socketAddress) {
+        dispatch(UdpAddressEvent.createAddressBindEvent(socketAddress));
+    }
+
+    private void doUdpBind(AddressEvent event) throws IOException { // bound, non-blocking, registered for reads
+        DatagramChannel serverChannel = DatagramChannel.open();
+        serverChannel.configureBlocking(false);
+        serverChannel.bind(event.getAddress());
+        serverChannel.register(selector, SelectionKey.OP_READ);
+    }
+
+    protected void doTcpConnect(InetSocketAddress sa) {
+        dispatch(TcpAddressEvent.createAddressConnectEvent(sa));
+    }
+
+    private void doTcpConnect(AddressEvent event) throws IOException { // starts a non-blocking TCP connect
+        SocketChannel channel = SocketChannel.open();
+        channel.configureBlocking(false);
+        if (channel.connect(event.getAddress())) {
+            // Connected at once (e.g. a local peer): OP_CONNECT will not fire, so set up the transport now.
+            registerTcpTransport(channel);
+        } else {
+            channel.register(selector, SelectionKey.OP_CONNECT); // completed in doTcpConnect(SelectionKey)
+        }
+    }
+
+    private void doTcpConnect(SelectionKey key) throws IOException { // completes a connect reported by the selector
+        SocketChannel channel = (SocketChannel) key.channel();
+        if (channel.isConnectionPending()) {
+            channel.finishConnect();
+        }
+        registerTcpTransport(channel);
+    }
+
+    protected void doTcpListen(InetSocketAddress socketAddress) {
+        dispatch(TcpAddressEvent.createAddressBindEvent(socketAddress));
+    }
+
+    /** Accepts every connection currently queued on the server channel and sets up a transport for each. */
+    protected void doTcpAccept(SelectionKey key) throws IOException {
+        ServerSocketChannel server = (ServerSocketChannel) key.channel();
+        SocketChannel channel;
+        try {
+            while ((channel = server.accept()) != null) {
+                channel.configureBlocking(false);
+                channel.socket().setTcpNoDelay(true);
+                channel.socket().setKeepAlive(true);
+                registerTcpTransport(channel);
+            }
+        } catch (ClosedByInterruptException e) { //NOPMD
+            // No op as normal
+        }
+    }
+
+    protected void doTcpBind(AddressEvent event) throws IOException { // bound, non-blocking, registered for accepts
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(event.getAddress());
+        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
+    }
+
+    /** Wraps a connected channel in a TcpTransport, registers it for read/write and announces it. */
+    private void registerTcpTransport(SocketChannel channel) throws IOException {
+        Transport transport = new TcpTransport(channel, tcpTransportHandler.getStreamingDecoder());
+        channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, transport);
+        onNewTransport(transport);
+    }
+
+    private void onNewTransport(Transport transport) { // hands over the dispatcher, then announces the transport
+        transport.setDispatcher(getDispatcher());
+        dispatch(TransportEvent.createNewTransportEvent(transport));
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/Transport.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/Transport.java
new file mode 100644
index 0000000..bdf2ff7
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/Transport.java
@@ -0,0 +1,84 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport;
+
+import org.apache.kerby.event.Dispatcher;
+import org.apache.kerby.transport.buffer.TransBuffer;
+import org.apache.kerby.transport.event.TransportEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+/** Base transport: keeps the peer address, a send buffer and readable/writable counters. */
+public abstract class Transport {
+    private InetSocketAddress remoteAddress;
+    protected Dispatcher dispatcher;
+    private Object attachment;
+    protected TransBuffer sendBuffer;
+    protected int readableCount = 0;
+    protected int writableCount = 0;
+
+    public Transport(InetSocketAddress remoteAddress) { // starts with a fresh, empty send buffer
+        this.sendBuffer = new TransBuffer();
+        this.remoteAddress = remoteAddress;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public InetSocketAddress getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    /** Queues a non-null message and dispatches a writable-transport event; null is ignored. */
+    public void sendMessage(ByteBuffer message) {
+        if (message == null) {
+            return;
+        }
+        sendBuffer.write(message);
+        dispatcher.dispatch(TransportEvent.createWritableTransportEvent(this));
+    }
+
+    /** Counts the notification, then passes at most one queued message to sendOutMessage. */
+    public void onWriteable() throws IOException {
+        writableCount++;
+        if (sendBuffer.isEmpty()) {
+            return;
+        }
+        final ByteBuffer pending = sendBuffer.read();
+        if (pending != null) {
+            sendOutMessage(pending);
+        }
+    }
+
+    public void onReadable() throws IOException { // only counts the notification in this base class
+        readableCount++;
+    }
+
+    protected abstract void sendOutMessage(ByteBuffer message) throws IOException; // called by onWriteable
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+    public Object getAttachment() {
+        return attachment;
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/TransportHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/TransportHandler.java
new file mode 100644
index 0000000..e745e38
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/TransportHandler.java
@@ -0,0 +1,34 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport;
+
+import org.apache.kerby.event.AbstractEventHandler;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+
+/**
+ * Handling readable and writable events
+ */
+public abstract class TransportHandler extends AbstractEventHandler {
+    /** Processes one selected key; TransportSelector.dealKey passes every selected key to this method. */
+    public abstract void helpHandleSelectionKey(SelectionKey selectionKey) throws IOException;
+
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/TransportSelector.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/TransportSelector.java
new file mode 100644
index 0000000..a4016a0
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/TransportSelector.java
@@ -0,0 +1,100 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport;
+
+import org.apache.kerby.event.Dispatcher;
+import org.apache.kerby.event.LongRunningEventHandler;
+import org.apache.kerby.transport.event.TransportEvent;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+/** Long-running handler that owns a Selector and feeds its selected keys to a TransportHandler. */
+public abstract class TransportSelector extends LongRunningEventHandler {
+    protected Selector selector;
+    protected TransportHandler transportHandler;
+
+    public TransportSelector(TransportHandler transportHandler) {
+        super();
+        this.transportHandler = transportHandler;
+    }
+
+    @Override
+    public void setDispatcher(Dispatcher dispatcher) { // also registers the transport handler with the dispatcher
+        super.setDispatcher(dispatcher);
+        dispatcher.register(transportHandler);
+    }
+
+    @Override
+    public void init() { // opens the selector; an IOException is rethrown as RuntimeException
+        super.init();
+        try {
+            selector = Selector.open();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void loopOnce() {
+        try {
+            selectOnce();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Waits up to 10 ms for ready keys and passes each to dealKey; does nothing once the selector is closed. */
+    protected void selectOnce() throws IOException {
+        if (selector.isOpen() && selector.select(10) > 0 && selector.isOpen()) {
+            Set<SelectionKey> selectionKeys = selector.selectedKeys();
+            Iterator<SelectionKey> iterator = selectionKeys.iterator();
+            while (iterator.hasNext()) {
+                SelectionKey selectionKey = iterator.next();
+                dealKey(selectionKey);
+                iterator.remove(); // empties the selected set key by key, so no clear() is needed afterwards
+            }
+        }
+    }
+
+    protected void dealKey(SelectionKey selectionKey) throws IOException {
+        transportHandler.helpHandleSelectionKey(selectionKey);
+    }
+
+    protected void onNewTransport(Transport transport) { // hands over the dispatcher, then announces the transport
+        transport.setDispatcher(getDispatcher());
+        dispatch(TransportEvent.createNewTransportEvent(transport));
+    }
+
+    @Override
+    public void stop() { // inherited stop first, then the selector is closed on a best-effort basis
+        super.stop();
+        try {
+            if (selector != null) { // null when init() never ran; closing used to throw NullPointerException
+                selector.close();
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/BufferPool.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/BufferPool.java
new file mode 100644
index 0000000..7737c13
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/BufferPool.java
@@ -0,0 +1,33 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.buffer;
+
+import java.nio.ByteBuffer;
+
+public class BufferPool {
+    /** Returns a newly allocated heap buffer of capacity len; nothing is pooled or reused here. */
+    public static ByteBuffer allocate(int len) {
+        return ByteBuffer.allocate(len);
+    }
+    /** Counterpart of allocate for handing a buffer back; the body is empty, so calling it has no effect. */
+    public static void release(ByteBuffer buffer) {
+
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/BufferUtil.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/BufferUtil.java
new file mode 100644
index 0000000..f67ab09
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/BufferUtil.java
@@ -0,0 +1,42 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.buffer;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+public class BufferUtil {
+    /**
+     * Read len bytes from src buffer, advancing its position by len.
+     * The returned buffer is left unflipped (position == len), as before; flip it before reading.
+     * @throws BufferOverflowException if src has fewer than len bytes remaining
+     */
+    public static ByteBuffer read(ByteBuffer src, int len) {
+        if (len > src.remaining()) {
+            throw new BufferOverflowException();
+        }
+        ByteBuffer result = ByteBuffer.allocate(len);
+        // Copy exactly len bytes; the old loop copied src.remaining() bytes and overflowed result.
+        byte[] chunk = new byte[len];
+        src.get(chunk);
+        result.put(chunk);
+        return result;
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/RecvBuffer.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/RecvBuffer.java
new file mode 100644
index 0000000..2c190a3
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/RecvBuffer.java
@@ -0,0 +1,155 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.buffer;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/** Queue of received buffers that can be read back as contiguous byte runs; every method locks on this instance. */
+public class RecvBuffer {
+
+    private LinkedList<ByteBuffer> bufferQueue;
+
+    public RecvBuffer() {
+        bufferQueue = new LinkedList<ByteBuffer>();
+    }
+
+    /** Appends buffer at the tail of the queue. */
+    public synchronized void write(ByteBuffer buffer) {
+        bufferQueue.addLast(buffer);
+    }
+
+    /**
+     * Put buffer as the first into the buffer queue
+     */
+    public synchronized void writeFirst(ByteBuffer buffer) {
+        bufferQueue.addFirst(buffer);
+    }
+
+    /**
+     * Read and return the first buffer if available
+     * @return the removed head buffer, or null when the queue is empty
+     */
+    public synchronized ByteBuffer readFirst() {
+        if (! bufferQueue.isEmpty()) {
+            return bufferQueue.removeFirst();
+        }
+        return null;
+    }
+
+    /**
+     * Read all bytes that are currently available
+     */
+    public synchronized ByteBuffer readMostBytes() {
+        int len = remaining();
+        return readBytes(len);
+    }
+
+    /**
+     * Read len bytes if available, consuming them from the head of the queue
+     * @return a buffer with exactly len bytes remaining to be read
+     * @throws BufferOverflowException if fewer than len bytes are queued
+     * @throws IllegalArgumentException if len is negative
+     */
+    public synchronized ByteBuffer readBytes(int len) {
+        if (len < 0) { // reject up front, before any buffer has been taken off the queue
+            throw new IllegalArgumentException("Negative length: " + len);
+        }
+        if (remaining() < len) { // not enough data available
+            throw new BufferOverflowException();
+        }
+
+        ByteBuffer result = null;
+
+        ByteBuffer takenBuffer;
+        if (bufferQueue.size() == 1) {
+            takenBuffer = bufferQueue.removeFirst();
+
+            if (takenBuffer.remaining() == len) {
+                return takenBuffer; // exact fit: hand the queued buffer itself back, no copy
+            }
+
+            result = BufferPool.allocate(len);
+            for (int i = 0; i < len; i++) {
+                result.put(takenBuffer.get());
+            }
+            // Has left bytes so put it back for future reading
+            if (takenBuffer.remaining() > 0) {
+                bufferQueue.addFirst(takenBuffer);
+            }
+        } else {
+            result = BufferPool.allocate(len);
+
+            Iterator<ByteBuffer> iter = bufferQueue.iterator();
+            int alreadyGot = 0, toGet;
+            while (iter.hasNext()) {
+                takenBuffer = iter.next();
+                iter.remove();
+
+                toGet = takenBuffer.remaining() < len - alreadyGot ?
+                    takenBuffer.remaining() : len -alreadyGot;
+                byte[] toGetBytes = new byte[toGet];
+                takenBuffer.get(toGetBytes);
+                result.put(toGetBytes);
+
+                if (takenBuffer.remaining() > 0) {
+                    // Only happens for the buffer that completes len, so the loop breaks right below
+                    bufferQueue.addFirst(takenBuffer);
+                }
+
+                alreadyGot += toGet;
+                if (alreadyGot == len) {
+                    break;
+                }
+            }
+        }
+        result.flip();
+
+        return result;
+    }
+
+    /** Synchronized like the mutators so the answer reflects a consistent queue state. */
+    public synchronized boolean isEmpty() {
+        return bufferQueue.isEmpty();
+    }
+
+    /**
+     * Return count of remaining and left bytes that's available
+     */
+    public synchronized int remaining() {
+        // Synchronized: iterating the LinkedList while another thread writes to it is unsafe
+        int result = 0;
+        for (ByteBuffer buffer : bufferQueue) {
+            result += buffer.remaining();
+        }
+        return result;
+    }
+
+    /** Releases every queued buffer to the pool and empties the queue. */
+    public synchronized void clear() {
+        // One pass releases each buffer exactly once (a lone buffer used to be released twice)
+        for (ByteBuffer buffer : bufferQueue) {
+            BufferPool.release(buffer);
+        }
+        bufferQueue.clear();
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/TransBuffer.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/TransBuffer.java
new file mode 100644
index 0000000..079a6cd
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/buffer/TransBuffer.java
@@ -0,0 +1,49 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.buffer;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/** Small bounded FIFO of byte buffers; at most two may be queued at a time. */
+public class TransBuffer {
+    private BlockingQueue<ByteBuffer> bufferQueue = new ArrayBlockingQueue<ByteBuffer>(2);
+
+    /** Queue a buffer; {@code add} throws IllegalStateException when full. */
+    public void write(ByteBuffer buffer) {
+        bufferQueue.add(buffer);
+    }
+
+    /** Wrap the array without copying it and queue the resulting buffer. */
+    public void write(byte[] buffer) {
+        write(ByteBuffer.wrap(buffer));
+    }
+
+    /** Remove and return the oldest buffer, or null when none is queued. */
+    public ByteBuffer read() {
+        return bufferQueue.poll();
+    }
+
+    /** Tell whether no buffer is currently queued. */
+    public boolean isEmpty() {
+        return bufferQueue.size() == 0;
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/AddressEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/AddressEvent.java
new file mode 100644
index 0000000..80340c8
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/AddressEvent.java
@@ -0,0 +1,39 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.event;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+
+import java.net.InetSocketAddress;
+
+/** Event that carries the socket address a bind or connect request refers to. */
+public class AddressEvent extends Event {
+    private InetSocketAddress address;
+
+    public AddressEvent(InetSocketAddress address, EventType eventType) {
+        super(eventType);
+        this.address = address;
+    }
+
+    public InetSocketAddress getAddress() {
+        return this.address;
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/MessageEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/MessageEvent.java
new file mode 100644
index 0000000..b9d48eb
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/MessageEvent.java
@@ -0,0 +1,41 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.event;
+
+import org.apache.kerby.transport.Transport;
+
+import java.nio.ByteBuffer;
+
+/** Transport event that carries an inbound message buffer. */
+public class MessageEvent extends TransportEvent {
+    private MessageEvent(Transport transport, ByteBuffer message) {
+        super(transport, TransportEventType.INBOUND_MESSAGE, message);
+    }
+
+    /** Build the INBOUND_MESSAGE event for a message received on the given transport. */
+    public static MessageEvent createInboundMessageEvent(Transport transport, ByteBuffer message) {
+        return new MessageEvent(transport, message);
+    }
+
+    /** Return the message buffer stored as this event's data. */
+    public ByteBuffer getMessage() {
+        return (ByteBuffer) getEventData();
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEvent.java
new file mode 100644
index 0000000..3c2ae34
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEvent.java
@@ -0,0 +1,56 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.event;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Transport;
+
+/** Event bound to the {@link Transport} it concerns. */
+public class TransportEvent extends Event {
+    private Transport transport;
+
+    public TransportEvent(Transport transport, EventType eventType) {
+        super(eventType);
+        this.transport = transport;
+    }
+
+    /** Variant that also attaches an event data object. */
+    public TransportEvent(Transport transport, EventType eventType, Object eventData) {
+        super(eventType, eventData);
+        this.transport = transport;
+    }
+
+    public static TransportEvent createNewTransportEvent(Transport transport) {
+        return new TransportEvent(transport, TransportEventType.NEW_TRANSPORT);
+    }
+
+    public static TransportEvent createReadableTransportEvent(Transport transport) {
+        return new TransportEvent(transport, TransportEventType.TRANSPORT_READABLE);
+    }
+
+    public static TransportEvent createWritableTransportEvent(Transport transport) {
+        return new TransportEvent(transport, TransportEventType.TRANSPORT_WRITABLE);
+    }
+
+    public Transport getTransport() {
+        return this.transport;
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEventType.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEventType.java
new file mode 100644
index 0000000..6036c02
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/event/TransportEventType.java
@@ -0,0 +1,29 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.event;
+
+import org.apache.kerby.event.EventType;
+
+public enum TransportEventType implements EventType {
+    NEW_TRANSPORT,      // type of the event built by TransportEvent.createNewTransportEvent
+    TRANSPORT_WRITABLE, // type of the event built by createWritableTransportEvent
+    TRANSPORT_READABLE, // type of the event built by createReadableTransportEvent
+    INBOUND_MESSAGE     // type used by MessageEvent, which carries a message buffer
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/DecodingCallback.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/DecodingCallback.java
new file mode 100644
index 0000000..561f78f
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/DecodingCallback.java
@@ -0,0 +1,38 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+public interface DecodingCallback {
+
+    /**
+     * A message of messageLength bytes is ready; its first adjustOffset bytes can be skipped
+     */
+    void onMessageComplete(int messageLength, int adjustOffset);
+
+    /**
+     * More data must become available before a message can be completed
+     */
+    void onMoreDataNeeded();
+
+    /**
+     * More data must become available, and the length still needed is given
+     */
+    void onMoreDataNeeded(int needDataLength);
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/StreamingDecoder.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/StreamingDecoder.java
new file mode 100644
index 0000000..953b34a
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/StreamingDecoder.java
@@ -0,0 +1,26 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import java.nio.ByteBuffer;
+
+public interface StreamingDecoder { // examines buffered stream bytes for a message
+    void decode(ByteBuffer streamingBuffer, DecodingCallback callback); // result goes to callback
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAcceptor.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAcceptor.java
new file mode 100644
index 0000000..752f5d7
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAcceptor.java
@@ -0,0 +1,112 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Acceptor;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+/** Acceptor that binds TCP server sockets and wraps accepted connections as transports. */
+public class TcpAcceptor extends Acceptor {
+    public TcpAcceptor(StreamingDecoder streamingDecoder) {
+        this(new TcpTransportHandler(streamingDecoder));
+    }
+
+    public TcpAcceptor(TcpTransportHandler transportHandler) {
+        super(transportHandler);
+        setEventHandler(new AbstractEventHandler() {
+            @Override
+            protected void doHandle(Event event) throws Exception {
+                if (event.getEventType() == TcpEventType.ADDRESS_BIND) {
+                    try {
+                        doBind((AddressEvent) event);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+
+            @Override
+            public EventType[] getInterestedEvents() {
+                return new EventType[] {TcpEventType.ADDRESS_BIND};
+            }
+        });
+    }
+
+    @Override
+    protected void doListen(InetSocketAddress socketAddress) {
+        dispatch(TcpAddressEvent.createAddressBindEvent(socketAddress)); // bind happens in doBind
+    }
+
+    @Override
+    protected void dealKey(SelectionKey selectionKey) throws IOException {
+        if (selectionKey.isAcceptable()) {
+            doAccept(selectionKey);
+        } else {
+            super.dealKey(selectionKey);
+        }
+    }
+
+    /** Accept every pending connection; one that fails set-up is closed, not leaked. */
+    void doAccept(SelectionKey key) throws IOException {
+        ServerSocketChannel server = (ServerSocketChannel) key.channel();
+        try {
+            for (SocketChannel ch = server.accept(); ch != null; ch = server.accept()) {
+                try {
+                    ch.configureBlocking(false);
+                    ch.socket().setTcpNoDelay(true);
+                    ch.socket().setKeepAlive(true);
+                    Transport transport = new TcpTransport(ch,
+                        ((TcpTransportHandler) transportHandler).getStreamingDecoder());
+                    ch.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, transport);
+                    onNewTransport(transport);
+                } catch (IOException e) {
+                    ch.close(); // release the socket before reporting the failure
+                    throw e;
+                }
+            }
+        } catch (ClosedByInterruptException e) { //NOPMD No op as normal
+        }
+    }
+
+    /** Open and register a listening channel on the event's address; close it on failure. */
+    protected void doBind(AddressEvent event) throws IOException {
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+        try {
+            serverSocketChannel.configureBlocking(false);
+            serverSocketChannel.socket().bind(event.getAddress());
+            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
+        } catch (IOException | RuntimeException e) {
+            serverSocketChannel.close();
+            throw e;
+        }
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAddressEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAddressEvent.java
new file mode 100644
index 0000000..500d224
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpAddressEvent.java
@@ -0,0 +1,36 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.net.InetSocketAddress;
+
+/** Static factories for the address events handled by the TCP acceptor and connector. */
+public class TcpAddressEvent {
+
+    public static AddressEvent createAddressConnectEvent(InetSocketAddress address) {
+        return new AddressEvent(address, TcpEventType.ADDRESS_CONNECT);
+    }
+
+    public static AddressEvent createAddressBindEvent(InetSocketAddress address) {
+        return new AddressEvent(address, TcpEventType.ADDRESS_BIND);
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpConnector.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpConnector.java
new file mode 100644
index 0000000..e460961
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpConnector.java
@@ -0,0 +1,94 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Connector;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/** Connector that opens non-blocking TCP connections and wraps them as transports. */
+public class TcpConnector extends Connector {
+    public TcpConnector(StreamingDecoder streamingDecoder) {
+        this(new TcpTransportHandler(streamingDecoder));
+    }
+
+    public TcpConnector(TcpTransportHandler transportHandler) {
+        super(transportHandler);
+        setEventHandler(new AbstractEventHandler() {
+            @Override
+            protected void doHandle(Event event) throws Exception {
+                if (event.getEventType() == TcpEventType.ADDRESS_CONNECT) {
+                    doConnect((AddressEvent) event);
+                }
+            }
+
+            @Override
+            public EventType[] getInterestedEvents() {
+                return new EventType[] {TcpEventType.ADDRESS_CONNECT};
+            }
+        });
+    }
+
+    @Override
+    protected void doConnect(InetSocketAddress sa) {
+        dispatch(TcpAddressEvent.createAddressConnectEvent(sa)); // see doConnect(AddressEvent)
+    }
+
+    private void doConnect(AddressEvent event) throws IOException { // begins the connect
+        SocketChannel channel = SocketChannel.open();
+        try {
+            channel.configureBlocking(false);
+            channel.connect(event.getAddress());
+            channel.register(selector,
+                    SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+        } catch (IOException | RuntimeException e) {
+            channel.close(); // e.g. unresolved address: do not leak the open channel
+            throw e;
+        }
+    }
+
+    @Override
+    protected void dealKey(SelectionKey selectionKey) throws IOException {
+        if (selectionKey.isConnectable()) {
+            doConnect(selectionKey);
+        } else {
+            super.dealKey(selectionKey);
+        }
+    }
+
+    private void doConnect(SelectionKey key) throws IOException { // completes the connect
+        SocketChannel channel = (SocketChannel) key.channel();
+        if (channel.isConnectionPending()) {
+            channel.finishConnect();
+        }
+        Transport transport = new TcpTransport(channel,
+                ((TcpTransportHandler) transportHandler).getStreamingDecoder());
+        channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, transport);
+        onNewTransport(transport);
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpEventType.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpEventType.java
new file mode 100644
index 0000000..e754fa5
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpEventType.java
@@ -0,0 +1,27 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.event.EventType;
+
+public enum TcpEventType implements EventType {
+    ADDRESS_BIND,   // handled by TcpAcceptor, which binds a server socket channel
+    ADDRESS_CONNECT // handled by TcpConnector, which opens a socket channel
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransport.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransport.java
new file mode 100644
index 0000000..57705ca
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransport.java
@@ -0,0 +1,116 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.buffer.BufferPool;
+import org.apache.kerby.transport.buffer.RecvBuffer;
+import org.apache.kerby.transport.event.MessageEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+/** TCP transport: buffers received bytes, frames them with the decoder, dispatches messages. */
+public class TcpTransport extends Transport {
+    private SocketChannel channel;
+    private StreamingDecoder streamingDecoder;
+    private RecvBuffer recvBuffer; // received bytes not yet turned into a message
+
+    public TcpTransport(SocketChannel channel,
+                        StreamingDecoder streamingDecoder) throws IOException {
+        super((InetSocketAddress) channel.getRemoteAddress());
+        this.channel = channel;
+        this.streamingDecoder = streamingDecoder;
+        this.recvBuffer = new RecvBuffer();
+    }
+
+    /**
+     * Write the whole message. One write on a non-blocking channel may take only
+     * part of the buffer, so writing repeats until nothing remains unsent.
+     */
+    @Override
+    protected void sendOutMessage(ByteBuffer message) throws IOException {
+        while (message.hasRemaining()) {
+            channel.write(message);
+        }
+    }
+
+    /** Read what the channel has available, buffer it and try to frame a message. */
+    public void onReadable() throws IOException {
+        ByteBuffer writeBuffer = BufferPool.allocate(65536);
+        // NOTE(review): -1 (end of stream) is treated the same as 0 bytes read - confirm.
+        if (channel.read(writeBuffer) <= 0) {
+            BufferPool.release(writeBuffer);
+            return;
+        }
+        writeBuffer.flip();
+        recvBuffer.write(writeBuffer);
+        new WithReadDataHander().handle();
+    }
+
+    /** Decoder callback working on the bytes that handle() takes out of recvBuffer. */
+    class WithReadDataHander implements DecodingCallback {
+        private ByteBuffer streamingBuffer;
+
+        @Override
+        public void onMessageComplete(int messageLength, int adjustOffset) {
+            ByteBuffer message = null;
+            int remaining = streamingBuffer.remaining();
+            if (remaining == messageLength) {
+                message = streamingBuffer;
+            } else if (remaining > messageLength) {
+                // Split: the message is a limited view, the rest returns to recvBuffer.
+                message = streamingBuffer.duplicate();
+                int newLimit = streamingBuffer.position() + messageLength;
+                message.limit(newLimit);
+                streamingBuffer.position(newLimit);
+                recvBuffer.writeFirst(streamingBuffer);
+            }
+
+            if (message != null) {
+                if (adjustOffset > 0) {
+                    message.position(message.position() + adjustOffset);
+                }
+                dispatcher.dispatch(MessageEvent.createInboundMessageEvent(
+                        TcpTransport.this, message));
+            }
+        }
+
+        @Override
+        public void onMoreDataNeeded() {
+            recvBuffer.writeFirst(streamingBuffer); // keep the bytes for the next read
+        }
+
+        @Override
+        public void onMoreDataNeeded(int needDataLength) {
+            recvBuffer.writeFirst(streamingBuffer); // the length hint is not used
+        }
+
+        public void handle() {
+            if (recvBuffer.isEmpty()) {
+                return;
+            }
+            streamingBuffer = recvBuffer.readMostBytes();
+            streamingDecoder.decode(streamingBuffer.duplicate(), this); // independent position
+        }
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransportHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransportHandler.java
new file mode 100644
index 0000000..ad010ed
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/tcp/TcpTransportHandler.java
@@ -0,0 +1,77 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.tcp;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.apache.kerby.transport.TransportHandler;
+import org.apache.kerby.transport.event.TransportEvent;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+
+/** Transport handler that relays selector readiness to TCP transports as events. */
+public class TcpTransportHandler extends TransportHandler {
+    private StreamingDecoder streamingDecoder;
+
+    public TcpTransportHandler(StreamingDecoder streamingDecoder) {
+        this.streamingDecoder = streamingDecoder;
+    }
+
+    public StreamingDecoder getStreamingDecoder() {
+        return this.streamingDecoder;
+    }
+
+    @Override
+    public EventType[] getInterestedEvents() {
+        return new TransportEventType[] {
+            TransportEventType.TRANSPORT_READABLE, TransportEventType.TRANSPORT_WRITABLE
+        };
+    }
+
+    /** Forward a readable or writable event to the transport it was raised for. */
+    @Override
+    protected void doHandle(Event event) throws Exception {
+        EventType eventType = event.getEventType();
+        Transport transport = ((TransportEvent) event).getTransport();
+        if (eventType == TransportEventType.TRANSPORT_WRITABLE) {
+            transport.onWriteable();
+        } else if (eventType == TransportEventType.TRANSPORT_READABLE) {
+            transport.onReadable();
+        }
+    }
+
+    /** Dispatch a readable or, failing that, writable event for the key's attached transport. */
+    @Override
+    public void helpHandleSelectionKey(SelectionKey selectionKey) throws IOException {
+        if (selectionKey.isReadable()) {
+            selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+            TcpTransport readable = (TcpTransport) selectionKey.attachment();
+            dispatch(TransportEvent.createReadableTransportEvent(readable));
+        } else if (selectionKey.isWritable()) {
+            selectionKey.interestOps(SelectionKey.OP_READ); // write interest is dropped
+            TcpTransport writable = (TcpTransport) selectionKey.attachment();
+            dispatch(TransportEvent.createWritableTransportEvent(writable));
+        }
+    }
+}
+
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAcceptor.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAcceptor.java
new file mode 100644
index 0000000..974f871
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAcceptor.java
@@ -0,0 +1,84 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Acceptor;
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+public class UdpAcceptor extends Acceptor {
+    /** Channel opened by doBind(); remains null until a bind event has been handled. */
+    private DatagramChannel serverChannel;
+
+    public UdpAcceptor() {
+        this(new UdpTransportHandler());
+    }
+
+    public UdpAcceptor(UdpTransportHandler udpTransportHandler) {
+        super(udpTransportHandler);
+        setEventHandler(new AbstractEventHandler() {
+            @Override
+            protected void doHandle(Event event) throws Exception {
+                if (event.getEventType() == UdpEventType.ADDRESS_BIND) {
+                    doBind((AddressEvent) event);
+                }
+            }
+            @Override
+            public EventType[] getInterestedEvents() {
+                return new EventType[] {UdpEventType.ADDRESS_BIND};
+            }
+        });
+    }
+
+    /** Requests a bind to the given address by dispatching an ADDRESS_BIND event. */
+    @Override
+    protected void doListen(InetSocketAddress socketAddress) {
+        dispatch(UdpAddressEvent.createAddressBindEvent(socketAddress));
+    }
+
+    /** Opens a non-blocking datagram channel, binds it and registers it for reads. */
+    private void doBind(AddressEvent event) throws IOException {
+        serverChannel = DatagramChannel.open();
+        serverChannel.configureBlocking(false);
+        serverChannel.bind(event.getAddress());
+        serverChannel.register(selector, SelectionKey.OP_READ);
+    }
+
+    /** Stops the acceptor and closes the server channel if one was ever opened. */
+    @Override
+    public void stop() {
+        super.stop();
+        if (serverChannel == null) {
+            return; // never bound: stop() ran before any ADDRESS_BIND was handled
+        }
+        try {
+            serverChannel.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAddressEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAddressEvent.java
new file mode 100644
index 0000000..b29100e
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpAddressEvent.java
@@ -0,0 +1,36 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.net.InetSocketAddress;
+
+public class UdpAddressEvent {
+    /** Builds the address event that carries {@code UdpEventType.ADDRESS_BIND}. */
+    public static AddressEvent createAddressBindEvent(InetSocketAddress address) {
+        return new AddressEvent(address, UdpEventType.ADDRESS_BIND);
+    }
+
+    /** Builds the address event that carries {@code UdpEventType.ADDRESS_CONNECT}. */
+    public static AddressEvent createAddressConnectEvent(InetSocketAddress address) {
+        return new AddressEvent(address, UdpEventType.ADDRESS_CONNECT);
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpChannelEvent.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpChannelEvent.java
new file mode 100644
index 0000000..e5dd7b2
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpChannelEvent.java
@@ -0,0 +1,47 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+
+import java.nio.channels.DatagramChannel;
+
+public class UdpChannelEvent extends Event {
+    private DatagramChannel channel;
+
+    /** Creates an event of type CHANNEL_READABLE that refers to the given channel. */
+    public static UdpChannelEvent makeReadableChannelEvent(DatagramChannel channel) {
+        return new UdpChannelEvent(channel, UdpEventType.CHANNEL_READABLE);
+    }
+    /** Creates an event of type CHANNEL_WRITABLE that refers to the given channel. */
+    public static UdpChannelEvent makeWritableChannelEvent(DatagramChannel channel) {
+        return new UdpChannelEvent(channel, UdpEventType.CHANNEL_WRITABLE);
+    }
+
+    private UdpChannelEvent(DatagramChannel channel, EventType eventType) {
+        super(eventType);
+        this.channel = channel;
+    }
+
+    public DatagramChannel getChannel() {
+        return this.channel;
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpConnector.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpConnector.java
new file mode 100644
index 0000000..9234a8c
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpConnector.java
@@ -0,0 +1,76 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.event.AbstractEventHandler;
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Connector;
+import org.apache.kerby.transport.event.AddressEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+public class UdpConnector extends Connector {
+    public UdpConnector() {
+        this(new UdpTransportHandler());
+    }
+
+    public UdpConnector(UdpTransportHandler transportHandler) {
+        super(transportHandler);
+        setEventHandler(new AbstractEventHandler() {
+            @Override
+            protected void doHandle(Event event) throws Exception {
+                if (event.getEventType() == UdpEventType.ADDRESS_CONNECT) {
+                    doConnect((AddressEvent) event);
+                }
+            }
+            @Override
+            public EventType[] getInterestedEvents() {
+                return new EventType[] {UdpEventType.ADDRESS_CONNECT};
+            }
+        });
+    }
+    /** Requests a connection to {@code sa} by dispatching an ADDRESS_CONNECT event. */
+    @Override
+    protected void doConnect(InetSocketAddress sa) {
+        dispatch(UdpAddressEvent.createAddressConnectEvent(sa));
+    }
+
+    /** Opens and connects a non-blocking channel; closes it again if any setup step fails. */
+    private void doConnect(AddressEvent event) throws IOException {
+        InetSocketAddress address = event.getAddress();
+        DatagramChannel channel = DatagramChannel.open();
+        boolean ready = false;
+        try {
+            channel.configureBlocking(false);
+            channel.connect(address);
+            channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+            ready = true;
+        } finally {
+            if (!ready) {
+                channel.close(); // setup failed: release the socket instead of leaking it
+            }
+        }
+        onNewTransport(new UdpTransport(channel, address));
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpEventType.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpEventType.java
new file mode 100644
index 0000000..d291f75
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpEventType.java
@@ -0,0 +1,29 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.event.EventType;
+
+public enum UdpEventType implements EventType {
+    ADDRESS_BIND,     // handled by UdpAcceptor, which binds its server channel
+    ADDRESS_CONNECT,  // handled by UdpConnector, which opens a connected channel
+    CHANNEL_WRITABLE, // dispatched by UdpTransportHandler for a writable selection key
+    CHANNEL_READABLE  // dispatched by UdpTransportHandler for a readable selection key
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransport.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransport.java
new file mode 100644
index 0000000..fe11a64
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransport.java
@@ -0,0 +1,65 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.buffer.TransBuffer;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEvent;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+public class UdpTransport extends Transport {
+    private DatagramChannel channel;
+    protected TransBuffer recvBuffer;
+
+    public UdpTransport(DatagramChannel channel, InetSocketAddress remoteAddress) {
+        super(remoteAddress);
+        this.recvBuffer = new TransBuffer();
+        this.channel = channel;
+    }
+
+    /** Stores non-null received data and dispatches a readable event for this transport. */
+    protected void onRecvData(ByteBuffer data) {
+        if (data == null) {
+            return;
+        }
+        recvBuffer.write(data);
+        dispatcher.dispatch(TransportEvent.createReadableTransportEvent(this));
+    }
+
+    /** Reads from the receive buffer, if not empty, and dispatches an inbound message event. */
+    @Override
+    public void onReadable() throws IOException {
+        super.onReadable();
+        if (!recvBuffer.isEmpty()) {
+            ByteBuffer inbound = recvBuffer.read();
+            dispatcher.dispatch(MessageEvent.createInboundMessageEvent(this, inbound));
+        }
+    }
+
+    @Override
+    protected void sendOutMessage(ByteBuffer message) throws IOException {
+        channel.send(message, getRemoteAddress());
+    }
+}
diff --git a/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransportHandler.java b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransportHandler.java
new file mode 100644
index 0000000..fc02bf5
--- /dev/null
+++ b/lib/kerby-event/src/main/java/org/apache/kerby/transport/udp/UdpTransportHandler.java
@@ -0,0 +1,109 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.transport.udp;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.TransportHandler;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.util.HashMap;
+import java.util.Map;
+
+public class UdpTransportHandler extends TransportHandler {
+
+    /** Known transports keyed by remote address; filled when NEW_TRANSPORT is handled. */
+    protected Map<InetSocketAddress, UdpTransport> transports =
+            new HashMap<InetSocketAddress, UdpTransport>();
+
+    @Override
+    public EventType[] getInterestedEvents() {
+        return new EventType[] {
+            UdpEventType.CHANNEL_READABLE, TransportEventType.TRANSPORT_WRITABLE,
+            TransportEventType.TRANSPORT_READABLE, TransportEventType.NEW_TRANSPORT
+        };
+    }
+
+    /** Routes channel and transport events to the matching read/write/register action. */
+    @Override
+    protected void doHandle(Event event) throws Exception {
+        EventType kind = event.getEventType();
+        if (kind == UdpEventType.CHANNEL_READABLE) {
+            doRead(((UdpChannelEvent) event).getChannel());
+        } else if (kind == TransportEventType.TRANSPORT_READABLE) {
+            ((TransportEvent) event).getTransport().onReadable();
+        } else if (kind == TransportEventType.TRANSPORT_WRITABLE) {
+            ((TransportEvent) event).getTransport().onWriteable();
+        } else if (kind == TransportEventType.NEW_TRANSPORT) {
+            rememberTransport(((TransportEvent) event).getTransport());
+        }
+    }
+
+    /** Records a UDP transport under its remote address unless one is already known. */
+    private void rememberTransport(Transport created) {
+        if (!(created instanceof UdpTransport)) {
+            return;
+        }
+        InetSocketAddress peer = created.getRemoteAddress();
+        if (!transports.containsKey(peer)) {
+            transports.put(peer, (UdpTransport) created);
+        }
+    }
+
+    /** Receives one datagram and passes it to the transport of the sending address. */
+    private void doRead(DatagramChannel channel) throws IOException {
+        ByteBuffer datagram = ByteBuffer.allocate(65536); // to optimize
+        InetSocketAddress sender = (InetSocketAddress) channel.receive(datagram);
+        if (sender == null) {
+            return; // receive() reported no sender, so there is nothing to deliver
+        }
+        datagram.flip();
+        UdpTransport transport = transports.get(sender);
+        if (transport == null) {
+            // should be from acceptor
+            transport = new UdpTransport(channel, sender);
+            transport.setDispatcher(getDispatcher());
+            dispatch(TransportEvent.createNewTransportEvent(transport));
+        }
+        transport.onRecvData(datagram);
+    }
+
+    /** Dispatches a channel event for the ready key, then limits interest to reads. */
+    @Override
+    public void helpHandleSelectionKey(SelectionKey selectionKey) throws IOException {
+        DatagramChannel channel = (DatagramChannel) selectionKey.channel();
+
+        if (selectionKey.isReadable()) {
+            dispatch(UdpChannelEvent.makeReadableChannelEvent(channel));
+        } else if (selectionKey.isWritable()) {
+            dispatch(UdpChannelEvent.makeWritableChannelEvent(channel));
+        }
+        // Udp channel is always writable, so not usable
+        selectionKey.interestOps(SelectionKey.OP_READ);
+    }
+}
+
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/NetworkUtil.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/NetworkUtil.java
new file mode 100644
index 0000000..7e3e22e
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/NetworkUtil.java
@@ -0,0 +1,47 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.kerby.event;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+/**
+ * Some network and event testing utilities.
+ */
+public class NetworkUtil {
+
+    /**
+     * Finds a free port for tests by binding an ephemeral server socket and closing it
+     * again. Only TCP is probed, and another process may grab the port before the
+     * caller binds it, so the result is a best-effort hint.
+     *
+     * @return a port number that was free at the time of the probe
+     * @throws RuntimeException if the probe socket cannot be opened or closed; the
+     *         underlying I/O error is attached as the cause
+     */
+    public static int getServerPort() {
+        // try-with-resources releases the probe socket on every exit path.
+        try (ServerSocket serverSocket = new ServerSocket(0)) {
+            return serverSocket.getLocalPort();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to get a server socket point", e);
+        }
+    }
+}
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/TestBuffer.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/TestBuffer.java
new file mode 100644
index 0000000..d7b0778
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/TestBuffer.java
@@ -0,0 +1,51 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event;
+
+import org.apache.kerby.transport.buffer.RecvBuffer;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestBuffer {
+
+    /** Checks a single write/read round trip, then the total size after repeated writes. */
+    @Test
+    public void testRecvBuffer() {
+        String text = "HELLO WORLD";
+        ByteBuffer firstWrite = ByteBuffer.wrap(text.getBytes());
+        RecvBuffer buffer = new RecvBuffer();
+
+        // A single write is expected to read back byte for byte.
+        buffer.write(firstWrite);
+        assertThat(buffer.readMostBytes().array()).isEqualTo(text.getBytes());
+
+        // After clear(), ten writes are expected to read back with ten times the length.
+        buffer.clear();
+        int repeats = 10;
+        for (int round = 0; round < repeats; round++) {
+            buffer.write(ByteBuffer.wrap(text.getBytes()));
+        }
+        int expectedBytes = repeats * firstWrite.limit();
+        assertThat(buffer.readMostBytes().limit()).isEqualTo(expectedBytes);
+    }
+}
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java
new file mode 100644
index 0000000..2ef7241
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkBase.java
@@ -0,0 +1,64 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.network;
+
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.event.NetworkUtil;
+import org.apache.kerby.transport.tcp.DecodingCallback;
+import org.apache.kerby.transport.tcp.StreamingDecoder;
+
+import java.nio.ByteBuffer;
+
+public class TestNetworkBase {
+    protected String serverHost = "127.0.0.1";
+    protected int tcpPort = 0;
+    protected int udpPort = 0;
+    protected String TEST_MESSAGE = "Hello world!";
+    protected String clientRecvedMessage;
+
+    protected enum TestEventType implements EventType { FINISHED }
+
+    protected void preparePorts() {
+        this.tcpPort = NetworkUtil.getServerPort();
+        this.udpPort = NetworkUtil.getServerPort();
+    }
+
+    /** Copies the remaining bytes of the buffer into a String (platform charset). */
+    protected String recvBuffer2String(ByteBuffer buffer) {
+        byte[] content = new byte[buffer.remaining()];
+        buffer.get(content);
+        return new String(content);
+    }
+
+    /** Returns a decoder that reports a complete message once TEST_MESSAGE's length is buffered. */
+    protected StreamingDecoder createStreamingDecoder() {
+        return new StreamingDecoder() {
+            @Override
+            public void decode(ByteBuffer streamingBuffer, DecodingCallback callback) {
+                int needed = TEST_MESSAGE.getBytes().length;
+                if (streamingBuffer.remaining() < needed) {
+                    callback.onMoreDataNeeded(needed);
+                    return;
+                }
+                callback.onMessageComplete(needed, -1);
+            }
+        };
+    }
+}
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java
new file mode 100644
index 0000000..7d00d58
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkClient.java
@@ -0,0 +1,213 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.network;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.event.EventWaiter;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Network;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestNetworkClient extends TestNetworkBase {
+
+    private EventHub eventHub;
+    private EventWaiter eventWaiter;
+    /** Picks the test ports, then starts the echo servers before the client side. */
+    @Before
+    public void setUp() throws IOException {
+        preparePorts();
+
+        setUpServer();
+        setUpClient();
+    }
+    /** Starts the TCP and the UDP echo server, each on its own thread. */
+    private void setUpServer() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunTcpServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+        // NOTE(review): both server loops run forever and cleanup() never stops these threads.
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunUdpServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+    /** Selector loop on tcpPort that accepts connections and writes back whatever it reads. */
+    private void doRunTcpServer() throws IOException {
+        Selector selector = Selector.open();
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(tcpPort));
+        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+        SocketChannel socketChannel;
+        while (true) {
+            if (selector.selectNow() > 0) {
+                Set<SelectionKey> selectionKeys = selector.selectedKeys();
+                Iterator<SelectionKey> iterator = selectionKeys.iterator();
+                while (iterator.hasNext()) {
+                    SelectionKey selectionKey = iterator.next();
+                    iterator.remove();
+
+                    if (selectionKey.isAcceptable()) {
+                        while ((socketChannel = serverSocketChannel.accept()) != null) {
+                            socketChannel.configureBlocking(false);
+                            socketChannel.socket().setTcpNoDelay(true);
+                            socketChannel.socket().setKeepAlive(true);
+                            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, socketChannel);
+                            //selectionKey.attach(socketChannel);
+                        }
+                    } else if (selectionKey.isReadable()) {
+                        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+                        socketChannel = (SocketChannel) selectionKey.attachment();
+                        if (socketChannel.read(recvBuffer) > 0) {
+                            recvBuffer.flip();
+                            socketChannel.write(recvBuffer);
+                        }
+                    }
+                }
+                // NOTE(review): the pause only runs after a non-empty select; on 0 the loop spins.
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+    /** Selector loop on udpPort that sends every received datagram back to its sender. */
+    private void doRunUdpServer() throws IOException {
+        Selector selector = Selector.open();
+        DatagramChannel serverSocketChannel = DatagramChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        DatagramSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(udpPort));
+        serverSocketChannel.register(selector, SelectionKey.OP_READ);
+
+        while (true) {
+            if (selector.selectNow() > 0) {
+                Set<SelectionKey> selectionKeys = selector.selectedKeys();
+                Iterator<SelectionKey> iterator = selectionKeys.iterator();
+                while (iterator.hasNext()) {
+                    SelectionKey selectionKey = iterator.next();
+                    iterator.remove();
+                    if (selectionKey.isReadable()) {
+                        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+                        InetSocketAddress fromAddress = (InetSocketAddress) serverSocketChannel.receive(recvBuffer);
+                        if (fromAddress != null) {
+                            recvBuffer.flip();
+                            serverSocketChannel.send(recvBuffer, fromAddress);
+                        }
+                    }
+                }
+                // NOTE(review): same as the TCP loop, there is no pause when selectNow() returns 0.
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+    /** Registers the message handler and the network on a new hub, then connects over TCP and UDP. */
+    private void setUpClient() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent event) {
+                if (event.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    ByteBuffer buffer = event.getMessage();
+                    if (buffer != null) {
+                        clientRecvedMessage = recvBuffer2String(buffer);
+                        System.out.println("Recved clientRecvedMessage: " + clientRecvedMessage);
+                        Boolean result = TEST_MESSAGE.equals(clientRecvedMessage);
+                        dispatch(new Event(TestEventType.FINISHED, result));
+                    }
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Network network = new Network();
+        network.setStreamingDecoder(createStreamingDecoder());
+        eventHub.register(network);
+        // The waiter is created before the hub starts and before either connect call.
+        eventWaiter = eventHub.waitEvent(
+                TestEventType.FINISHED,
+                TransportEventType.NEW_TRANSPORT);
+
+        eventHub.start();
+        network.tcpConnect(serverHost, tcpPort);
+        network.udpConnect(serverHost, udpPort);
+    }
+    /** Sends TEST_MESSAGE over each of the two reported transports and expects it echoed back. */
+    @Test
+    public void testNetworkClient() {
+        Event event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        Transport transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        assertThat((Boolean) event.getEventData()).isTrue();
+        // Same exchange again on the second transport that gets reported.
+        event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        assertThat((Boolean) event.getEventData()).isTrue();
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java
new file mode 100644
index 0000000..b68745d
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/network/TestNetworkServer.java
@@ -0,0 +1,142 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.network;
+
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Network;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SocketChannel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Exercises {@link Network} in the listening role: an event hub echoes every
+ * inbound message and plain NIO channels act as the TCP and UDP clients.
+ */
+public class TestNetworkServer extends TestNetworkBase {
+
+    private EventHub eventHub;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePorts();
+
+        setUpServer();
+    }
+
+    /**
+     * Starts an event hub with an echoing message handler and a network
+     * listening on both the TCP and the UDP port.
+     */
+    private void setUpServer() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent msgEvent) {
+                // Echo: send each inbound message back over the transport it arrived on.
+                if (msgEvent.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    msgEvent.getTransport().sendMessage(msgEvent.getMessage());
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Network network = new Network();
+        network.setStreamingDecoder(createStreamingDecoder());
+        eventHub.register(network);
+
+        eventHub.start();
+
+        network.tcpListen(serverHost, tcpPort);
+        network.udpListen(serverHost, udpPort);
+    }
+
+    @Test
+    public void testNetworkServer() throws IOException, InterruptedException {
+        testTcpTransport();
+        testUdpTransport();
+    }
+
+    /**
+     * Sends the test message over a blocking TCP channel and expects the same
+     * text back. The channel is closed afterwards, even when an assertion fails.
+     */
+    private void testTcpTransport() throws IOException, InterruptedException {
+        // NOTE(review): presumably gives the listener time to come up - confirm.
+        Thread.sleep(10);
+
+        SocketChannel socketChannel = SocketChannel.open();
+        try {
+            socketChannel.configureBlocking(true);
+            SocketAddress sa = new InetSocketAddress(serverHost, tcpPort);
+            socketChannel.connect(sa);
+            socketChannel.write(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+            ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+            socketChannel.read(byteBuffer);
+            byteBuffer.flip();
+            clientRecvedMessage = recvBuffer2String(byteBuffer);
+            assertThat(clientRecvedMessage).isEqualTo(TEST_MESSAGE);
+        } finally {
+            socketChannel.close();
+        }
+    }
+
+    /**
+     * Sends the test message as a datagram and expects the same text back.
+     * The channel is closed afterwards, even when an assertion fails.
+     */
+    private void testUdpTransport() throws IOException, InterruptedException {
+        // NOTE(review): presumably gives the listener time to come up - confirm.
+        Thread.sleep(10);
+
+        DatagramChannel socketChannel = DatagramChannel.open();
+        try {
+            socketChannel.configureBlocking(true);
+            SocketAddress sa = new InetSocketAddress(serverHost, udpPort);
+            socketChannel.send(ByteBuffer.wrap(TEST_MESSAGE.getBytes()), sa);
+            ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+            socketChannel.receive(byteBuffer);
+            byteBuffer.flip();
+            clientRecvedMessage = recvBuffer2String(byteBuffer);
+            assertThat(clientRecvedMessage).isEqualTo(TEST_MESSAGE);
+        } finally {
+            socketChannel.close();
+        }
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java
new file mode 100644
index 0000000..ed060ef
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpBase.java
@@ -0,0 +1,78 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.tcp;
+
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.event.NetworkUtil;
+import org.apache.kerby.transport.tcp.DecodingCallback;
+import org.apache.kerby.transport.tcp.StreamingDecoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Shared fixture for the TCP transport tests: connection settings, the test
+ * message, and helpers for decoding received data.
+ */
+public class TestTcpBase {
+    protected String serverHost = "127.0.0.1";
+    protected int serverPort = 0;
+    protected String TEST_MESSAGE = "Hello world!";
+    protected String clientRecvedMessage;
+
+    /** Event types raised by the tests themselves. */
+    protected enum TestEventType implements EventType {
+        FINISHED
+    }
+
+    /** Stores the port obtained from {@link NetworkUtil#getServerPort()} in {@code serverPort}. */
+    protected void preparePort() {
+        this.serverPort = NetworkUtil.getServerPort();
+    }
+
+    /**
+     * Drains the remaining bytes of {@code buffer} and returns them as a string
+     * decoded with the platform default charset.
+     */
+    protected String recvBuffer2String(ByteBuffer buffer) {
+        int length = buffer.remaining();
+        byte[] content = new byte[length];
+        buffer.get(content, 0, length);
+        return new String(content);
+    }
+
+    /**
+     * Builds a decoder that treats every {@code TEST_MESSAGE}-sized run of
+     * bytes as one complete message.
+     */
+    protected StreamingDecoder createStreamingDecoder() {
+        return new StreamingDecoder() {
+            @Override
+            public void decode(ByteBuffer streamingBuffer, DecodingCallback callback) {
+                int messageLength = TEST_MESSAGE.getBytes().length;
+                if (streamingBuffer.remaining() < messageLength) {
+                    // Not enough bytes buffered yet for a whole message.
+                    callback.onMoreDataNeeded(messageLength);
+                    return;
+                }
+                callback.onMessageComplete(messageLength, -1);
+            }
+        };
+    }
+}
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java
new file mode 100644
index 0000000..823ae7b
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpClient.java
@@ -0,0 +1,211 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.tcp;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.event.EventWaiter;
+import org.apache.kerby.transport.Connector;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.apache.kerby.transport.tcp.TcpConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Exercises {@link TcpConnector} against a small NIO echo server that runs on
+ * a background thread for the duration of each test.
+ */
+public class TestTcpClient extends TestTcpBase {
+
+    /** Upper bound, in milliseconds, on waiting for the server thread to exit. */
+    private static final long SERVER_STOP_TIMEOUT = 5000;
+
+    private EventHub eventHub;
+    private EventWaiter eventWaiter;
+    private Selector serverSelector;
+    private Thread serverThread;
+    private volatile boolean serverRunning;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePort();
+
+        setUpServer();
+        setUpClient();
+    }
+
+    /**
+     * Binds the echo server and starts its selector loop on a daemon thread.
+     * The socket is bound here, before the client is set up, so the client's
+     * connect cannot race the listener.
+     */
+    private void setUpServer() throws IOException {
+        serverSelector = Selector.open();
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(serverPort));
+        serverSocketChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
+
+        serverRunning = true;
+        serverThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        // A daemon thread cannot keep the JVM alive if cleanup never runs.
+        serverThread.setDaemon(true);
+        serverThread.start();
+    }
+
+    /**
+     * Serves clients until {@code serverRunning} is cleared and the selector
+     * is woken up. Blocks in {@code select()} rather than polling, so the
+     * thread is idle while nothing is ready.
+     */
+    private void doRunServer() throws IOException {
+        while (serverRunning) {
+            serverSelector.select();
+            Set<SelectionKey> selectionKeys = serverSelector.selectedKeys();
+            Iterator<SelectionKey> iterator = selectionKeys.iterator();
+            while (iterator.hasNext()) {
+                SelectionKey selectionKey = iterator.next();
+                iterator.remove();
+
+                if (selectionKey.isAcceptable()) {
+                    acceptClients((ServerSocketChannel) selectionKey.channel());
+                } else if (selectionKey.isReadable()) {
+                    echo((SocketChannel) selectionKey.channel());
+                }
+            }
+        }
+    }
+
+    /** Accepts every pending connection and registers it for reading. */
+    private void acceptClients(ServerSocketChannel serverSocketChannel) throws IOException {
+        SocketChannel socketChannel;
+        while ((socketChannel = serverSocketChannel.accept()) != null) {
+            socketChannel.configureBlocking(false);
+            socketChannel.socket().setTcpNoDelay(true);
+            socketChannel.socket().setKeepAlive(true);
+            // Read interest only: a socket is writable almost all the time, so
+            // OP_WRITE would make every select() return immediately.
+            socketChannel.register(serverSelector, SelectionKey.OP_READ);
+        }
+    }
+
+    /** Writes back whatever the client sent; closes the channel on end of stream. */
+    private void echo(SocketChannel socketChannel) throws IOException {
+        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+        int count = socketChannel.read(recvBuffer);
+        if (count > 0) {
+            recvBuffer.flip();
+            while (recvBuffer.hasRemaining()) {
+                socketChannel.write(recvBuffer);
+            }
+        } else if (count < 0) {
+            socketChannel.close();
+        }
+    }
+
+    /** Creates the event hub, the message handler and the connector, then connects. */
+    private void setUpClient() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent event) {
+                if (event.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    ByteBuffer buffer = event.getMessage();
+                    clientRecvedMessage = recvBuffer2String(buffer);
+                    System.out.println("Recved clientRecvedMessage: " + clientRecvedMessage);
+                    Boolean result = TEST_MESSAGE.equals(clientRecvedMessage);
+                    dispatch(new Event(TestEventType.FINISHED, result));
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Connector connector = new TcpConnector(createStreamingDecoder());
+        eventHub.register(connector);
+
+        eventWaiter = eventHub.waitEvent(
+                TestEventType.FINISHED,
+                TransportEventType.NEW_TRANSPORT);
+
+        eventHub.start();
+        connector.connect(serverHost, serverPort);
+    }
+
+    @Test
+    public void testTcpTransport() {
+        Event event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        Transport transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        assertThat((Boolean) event.getEventData()).isTrue();
+    }
+
+    /** Stops the client hub, then shuts the echo server down and frees its sockets. */
+    @After
+    public void cleanup() throws IOException, InterruptedException {
+        if (eventHub != null) {
+            eventHub.stop();
+        }
+        if (serverSelector == null) {
+            return;
+        }
+        serverRunning = false;
+        serverSelector.wakeup();
+        if (serverThread != null) {
+            serverThread.join(SERVER_STOP_TIMEOUT);
+        }
+        // By now the server thread has normally exited, so its key set can be walked here.
+        for (SelectionKey key : serverSelector.keys()) {
+            key.channel().close();
+        }
+        serverSelector.close();
+    }
+}
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java
new file mode 100644
index 0000000..d95580a
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/tcp/TestTcpServer.java
@@ -0,0 +1,109 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.transport.Acceptor;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.apache.kerby.transport.tcp.TcpAcceptor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises {@link TcpAcceptor}: an event hub echoes every inbound message and
+ * a plain blocking {@link SocketChannel} plays the client.
+ */
+public class TestTcpServer extends TestTcpBase {
+
+    private EventHub eventHub;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePort();
+
+        setUpServer();
+    }
+
+    /** Starts an event hub with an echoing message handler and a listening TCP acceptor. */
+    private void setUpServer() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent msgEvent) {
+                // Echo: send each inbound message back over the transport it arrived on.
+                if (msgEvent.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    msgEvent.getTransport().sendMessage(msgEvent.getMessage());
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Acceptor acceptor = new TcpAcceptor(createStreamingDecoder());
+        eventHub.register(acceptor);
+
+        eventHub.start();
+        acceptor.listen(serverHost, serverPort);
+    }
+
+    /**
+     * Sends the test message over a blocking TCP channel and expects the same
+     * text back. The channel is closed afterwards, even when an assertion fails.
+     */
+    @Test
+    public void testTcpTransport() throws IOException, InterruptedException {
+        // NOTE(review): presumably gives the acceptor time to start listening - confirm.
+        Thread.sleep(15);
+
+        SocketChannel socketChannel = SocketChannel.open();
+        try {
+            socketChannel.configureBlocking(true);
+            SocketAddress sa = new InetSocketAddress(serverHost, serverPort);
+            socketChannel.connect(sa);
+            socketChannel.write(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+            ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+            socketChannel.read(byteBuffer);
+            byteBuffer.flip();
+            clientRecvedMessage = recvBuffer2String(byteBuffer);
+
+            assertThat(clientRecvedMessage).isEqualTo(TEST_MESSAGE);
+        } finally {
+            socketChannel.close();
+        }
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java
new file mode 100644
index 0000000..1887043
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpBase.java
@@ -0,0 +1,57 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.udp;
+
+import org.apache.kerby.event.EventType;
+import org.apache.kerby.event.NetworkUtil;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Shared fixture for the UDP transport tests: connection settings, the test
+ * message, and a helper for decoding received data.
+ */
+public class TestUdpBase {
+    protected String serverHost = "127.0.0.1";
+    protected int serverPort = 0;
+    protected String TEST_MESSAGE = "Hello world!";
+    protected String clientRecvedMessage;
+
+    /** Event types raised by the tests themselves. */
+    protected enum TestEventType implements EventType {
+        FINISHED
+    }
+
+    /** Stores the port obtained from {@link NetworkUtil#getServerPort()} in {@code serverPort}. */
+    protected void preparePort() {
+        this.serverPort = NetworkUtil.getServerPort();
+    }
+
+    /**
+     * Drains the remaining bytes of {@code buffer} and returns them as a string
+     * decoded with the platform default charset.
+     */
+    protected String recvBuffer2String(ByteBuffer buffer) {
+        int length = buffer.remaining();
+        byte[] content = new byte[length];
+        buffer.get(content, 0, length);
+        return new String(content);
+    }
+}
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpClient.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpClient.java
new file mode 100644
index 0000000..6f3453e
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpClient.java
@@ -0,0 +1,190 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.udp;
+
+import org.apache.kerby.event.Event;
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.event.EventWaiter;
+import org.apache.kerby.transport.Connector;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.Transport;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.udp.UdpConnector;
+import org.apache.kerby.transport.event.TransportEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Exercises {@link UdpConnector} against a small NIO echo server that runs on
+ * a background thread for the duration of each test.
+ */
+public class TestUdpClient extends TestUdpBase {
+
+    /** Upper bound, in milliseconds, on waiting for the server thread to exit. */
+    private static final long SERVER_STOP_TIMEOUT = 5000;
+
+    private EventHub eventHub;
+    private EventWaiter eventWaiter;
+    private Selector serverSelector;
+    private Thread serverThread;
+    private volatile boolean serverRunning;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePort();
+
+        setUpServer();
+        setUpClient();
+    }
+
+    /**
+     * Binds the echo server and starts its selector loop on a daemon thread.
+     * The socket is bound here, before the client is set up, so the client's
+     * first datagram cannot be sent before the port is bound.
+     */
+    private void setUpServer() throws IOException {
+        serverSelector = Selector.open();
+        DatagramChannel serverSocketChannel = DatagramChannel.open();
+        serverSocketChannel.configureBlocking(false);
+        DatagramSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(serverPort));
+        serverSocketChannel.register(serverSelector, SelectionKey.OP_READ);
+
+        serverRunning = true;
+        serverThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    doRunServer();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        // A daemon thread cannot keep the JVM alive if cleanup never runs.
+        serverThread.setDaemon(true);
+        serverThread.start();
+    }
+
+    /**
+     * Serves datagrams until {@code serverRunning} is cleared and the selector
+     * is woken up. Blocks in {@code select()} rather than polling, so the
+     * thread is idle while nothing is ready.
+     */
+    private void doRunServer() throws IOException {
+        while (serverRunning) {
+            serverSelector.select();
+            Set<SelectionKey> selectionKeys = serverSelector.selectedKeys();
+            Iterator<SelectionKey> iterator = selectionKeys.iterator();
+            while (iterator.hasNext()) {
+                SelectionKey selectionKey = iterator.next();
+                iterator.remove();
+                if (selectionKey.isReadable()) {
+                    echo((DatagramChannel) selectionKey.channel());
+                }
+            }
+        }
+    }
+
+    /** Receives one datagram, if any is pending, and sends it back to its sender. */
+    private void echo(DatagramChannel serverSocketChannel) throws IOException {
+        ByteBuffer recvBuffer = ByteBuffer.allocate(65536);
+        InetSocketAddress fromAddress = (InetSocketAddress) serverSocketChannel.receive(recvBuffer);
+        if (fromAddress != null) {
+            recvBuffer.flip();
+            serverSocketChannel.send(recvBuffer, fromAddress);
+        }
+    }
+
+    /** Creates the event hub, the message handler and the connector, then connects. */
+    private void setUpClient() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent msgEvent) {
+                if (msgEvent.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    ByteBuffer buffer = msgEvent.getMessage();
+                    clientRecvedMessage = recvBuffer2String(buffer);
+                    System.out.println("Recved clientRecvedMessage: " + clientRecvedMessage);
+                    Boolean result = TEST_MESSAGE.equals(clientRecvedMessage);
+                    dispatch(new Event(TestEventType.FINISHED, result));
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Connector connector = new UdpConnector();
+        eventHub.register(connector);
+
+        eventWaiter = eventHub.waitEvent(
+                TestEventType.FINISHED,
+                TransportEventType.NEW_TRANSPORT);
+
+        eventHub.start();
+        connector.connect(serverHost, serverPort);
+    }
+
+    @Test
+    public void testUdpTransport() {
+        Event event = eventWaiter.waitEvent(TransportEventType.NEW_TRANSPORT);
+        Transport transport = ((TransportEvent) event).getTransport();
+        transport.sendMessage(ByteBuffer.wrap(TEST_MESSAGE.getBytes()));
+
+        event = eventWaiter.waitEvent(TestEventType.FINISHED);
+        assertThat((Boolean) event.getEventData()).isTrue();
+    }
+
+    /** Stops the client hub, then shuts the echo server down and frees its socket. */
+    @After
+    public void cleanup() throws IOException, InterruptedException {
+        if (eventHub != null) {
+            eventHub.stop();
+        }
+        if (serverSelector == null) {
+            return;
+        }
+        serverRunning = false;
+        serverSelector.wakeup();
+        if (serverThread != null) {
+            serverThread.join(SERVER_STOP_TIMEOUT);
+        }
+        // By now the server thread has normally exited, so its key set can be walked here.
+        for (SelectionKey key : serverSelector.keys()) {
+            key.channel().close();
+        }
+        serverSelector.close();
+    }
+}
diff --git a/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpServer.java b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpServer.java
new file mode 100644
index 0000000..f44aa53
--- /dev/null
+++ b/lib/kerby-event/src/test/java/org/apache/kerby/event/udp/TestUdpServer.java
@@ -0,0 +1,108 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.kerby.event.udp;
+
+import org.apache.kerby.event.EventHandler;
+import org.apache.kerby.event.EventHub;
+import org.apache.kerby.transport.Acceptor;
+import org.apache.kerby.transport.MessageHandler;
+import org.apache.kerby.transport.event.MessageEvent;
+import org.apache.kerby.transport.event.TransportEventType;
+import org.apache.kerby.transport.udp.UdpAcceptor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Exercises {@link UdpAcceptor}: an event hub echoes every inbound message and
+ * a plain blocking {@link DatagramChannel} plays the client.
+ */
+public class TestUdpServer extends TestUdpBase {
+
+    private EventHub eventHub;
+
+    @Before
+    public void setUp() throws IOException {
+        preparePort();
+
+        setUpServer();
+    }
+
+    /** Starts an event hub with an echoing message handler and a listening UDP acceptor. */
+    private void setUpServer() throws IOException {
+        eventHub = new EventHub();
+
+        EventHandler messageHandler = new MessageHandler() {
+            @Override
+            protected void handleMessage(MessageEvent msgEvent) {
+                // Echo: send each inbound message back over the transport it arrived on.
+                if (msgEvent.getEventType() == TransportEventType.INBOUND_MESSAGE) {
+                    msgEvent.getTransport().sendMessage(msgEvent.getMessage());
+                }
+            }
+        };
+        eventHub.register(messageHandler);
+
+        Acceptor acceptor = new UdpAcceptor();
+        eventHub.register(acceptor);
+
+        eventHub.start();
+        acceptor.listen(serverHost, serverPort);
+    }
+
+    /**
+     * Sends the test message as a datagram and expects the same text back.
+     * The channel is closed afterwards, even when an assertion fails.
+     */
+    @Test
+    public void testUdpTransport() throws IOException, InterruptedException {
+        // NOTE(review): presumably gives the acceptor time to start listening - confirm.
+        Thread.sleep(10);
+
+        DatagramChannel socketChannel = DatagramChannel.open();
+        try {
+            socketChannel.configureBlocking(true);
+            SocketAddress sa = new InetSocketAddress(serverHost, serverPort);
+            socketChannel.send(ByteBuffer.wrap(TEST_MESSAGE.getBytes()), sa);
+            ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+            socketChannel.receive(byteBuffer);
+            byteBuffer.flip();
+            clientRecvedMessage = recvBuffer2String(byteBuffer);
+
+            assertThat(clientRecvedMessage).isEqualTo(TEST_MESSAGE);
+        } finally {
+            socketChannel.close();
+        }
+    }
+
+    @After
+    public void cleanup() {
+        eventHub.stop();
+    }
+}
diff --git a/lib/pom.xml b/lib/pom.xml
index 1b9af80..d2859f3 100644
--- a/lib/pom.xml
+++ b/lib/pom.xml
@@ -28,6 +28,7 @@
 
   <modules>
     <module>kerby-config</module>
+    <module>kerby-event</module>
     <module>kerby-util</module>
   </modules>