diff --git a/README.md b/README.md index cde3a3a..02794e2 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,14 @@ Licensed under the Apache License 2.0. SocketIOListener handler = new SocketIOListener() { @Override - public void onMessage(SocketIOClient client, String message) { + public void onEvent(SocketIOClient client, Packet packet) { + ... + } + + @Override + public void onMessage(SocketIOClient client, Packet packet) { + // get a message + packet.getData().toString(); ... } @@ -33,7 +40,10 @@ Licensed under the Apache License 2.0. } @Override - public void onJsonObject(SocketIOClient client, Object obj) { + public void onJsonObject(SocketIOClient client, Packet packet) { + // get a json object + packet.getData(); + ... SampleObject obj = new SampleObject(); // send object to socket.io client diff --git a/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java b/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java index a1ac11f..79bbac5 100644 --- a/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java +++ b/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java @@ -35,7 +35,6 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpHeaders; -import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; @@ -86,7 +85,7 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di f.addListener(ChannelFutureListener.CLOSE); return; } - if (HttpMethod.GET.equals(req.getMethod()) && queryDecoder.getPath().equals(connectPath)) { + if (queryDecoder.getPath().equals(connectPath)) { authorize(channel, req, queryDecoder.getParameters()); return; } @@ -104,7 +103,7 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di String transports = "websocket,xhr-polling"; //String transports = "websocket"; String heartbeatTimeoutVal = String.valueOf(configuration.getHeartbeatTimeout()); - if (configuration.getHeartbeatTimeout() == 0) { + if (!configuration.isHeartbeatsEnabled()) { heartbeatTimeoutVal = ""; } diff --git a/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java b/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java index 0f162f3..cdc97b5 100644 --- a/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java +++ b/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java @@ -26,7 +26,7 @@ import com.corundumstudio.socketio.scheduler.CancelableScheduler; import com.corundumstudio.socketio.scheduler.SchedulerKey; import com.corundumstudio.socketio.scheduler.SchedulerKey.Type; -public class HeartbeatHandler { +public class HeartbeatHandler implements Disconnectable { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -62,4 +62,9 @@ public class HeartbeatHandler { }, configuration.getHeartbeatTimeout(), TimeUnit.SECONDS); } + @Override + public void onDisconnect(SocketIOClient client) { + scheduler.cancel(new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId())); + } + } diff --git a/src/main/java/com/corundumstudio/socketio/PacketListener.java b/src/main/java/com/corundumstudio/socketio/PacketListener.java index 004bed2..4a93214 100644 --- a/src/main/java/com/corundumstudio/socketio/PacketListener.java +++ b/src/main/java/com/corundumstudio/socketio/PacketListener.java @@ -32,16 +32,20 @@ public class PacketListener { public void onPacket(Packet packet, SocketIOClient client) { switch (packet.getType()) { + case EVENT: + socketIOHandler.onEvent(client, packet); + break; + case HEARTBEAT: heartbeatHandler.onHeartbeat(client); break; case MESSAGE: - socketIOHandler.onMessage(client, packet.getData().toString()); + socketIOHandler.onMessage(client, packet); break; case JSON: - socketIOHandler.onJsonObject(client, packet.getData()); + socketIOHandler.onJsonObject(client, packet); break; case DISCONNECT: diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOListener.java b/src/main/java/com/corundumstudio/socketio/SocketIOListener.java index 77bb3cf..7cab7b7 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOListener.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOListener.java @@ -15,14 +15,18 @@ */ package com.corundumstudio.socketio; +import com.corundumstudio.socketio.parser.Packet; + public interface SocketIOListener { void onConnect(SocketIOClient client); - void onJsonObject(SocketIOClient client, Object obj); + void onJsonObject(SocketIOClient client, Packet packet); - void onMessage(SocketIOClient client, String message); + void onMessage(SocketIOClient client, Packet packet); void onDisconnect(SocketIOClient client); + void onEvent(SocketIOClient client, Packet packet); + } diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java index c243f47..3e24b9c 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java @@ -56,6 +56,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne private CancelableScheduler scheduler; private PacketHandler packetHandler; + private HeartbeatHandler heartbeatHandler; public void start(Configuration configuration) { this.socketIOHandler = configuration.getListener(); @@ -65,7 +66,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne Encoder encoder = new Encoder(objectMapper); Decoder decoder = new Decoder(objectMapper); - HeartbeatHandler heartbeatHandler = new HeartbeatHandler(configuration, scheduler); + heartbeatHandler = new HeartbeatHandler(configuration, scheduler); PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler); String connectPath = configuration.getContext() + "/" + protocol + "/"; @@ -97,6 +98,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne public void onDisconnect(SocketIOClient client) { log.debug("Client with sessionId: {} disconnected by client request", client.getSessionId()); + heartbeatHandler.onDisconnect(client); xhrPollingTransport.onDisconnect(client); webSocketTransport.onDisconnect(client); authorizeHandler.onDisconnect(client); diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java index 95737b4..d335c2a 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import org.codehaus.jackson.map.annotate.JsonSerialize; import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.slf4j.Logger; @@ -32,6 +33,7 @@ public class SocketIOServer { private SocketIOPipelineFactory pipelineFactory = new SocketIOPipelineFactory(); + private Channel mainChannel; private Configuration config; public SocketIOServer(Configuration configuration) { @@ -40,8 +42,8 @@ public class SocketIOServer { } public void setPipelineFactory(SocketIOPipelineFactory pipelineFactory) { - this.pipelineFactory = pipelineFactory; - } + this.pipelineFactory = pipelineFactory; + } public void start() { ChannelFactory factory = new NioServerSocketChannelFactory(config.getBossExecutor(), config.getWorkerExecutor()); @@ -51,13 +53,14 @@ public class SocketIOServer { bootstrap.setPipelineFactory(pipelineFactory); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); - bootstrap.bind(new InetSocketAddress(config.getHostname(), config.getPort())); + mainChannel = bootstrap.bind(new InetSocketAddress(config.getHostname(), config.getPort())); log.info("SocketIO server started at port: {}", config.getPort()); } public void stop() { pipelineFactory.stop(); + mainChannel.close(); bootstrap.releaseExternalResources(); } diff --git a/src/main/java/com/corundumstudio/socketio/parser/Packet.java b/src/main/java/com/corundumstudio/socketio/parser/Packet.java index 92a60cf..ad0ddb1 100644 --- a/src/main/java/com/corundumstudio/socketio/parser/Packet.java +++ b/src/main/java/com/corundumstudio/socketio/parser/Packet.java @@ -54,6 +54,13 @@ public class Packet { this.data = data; } + /** + * Get packet data + *
+     * @return json object for {@link PacketType.JSON} type
+     * message for {@link PacketType.MESSAGE} type
+     * 
+ */ public Object getData() { return data; } diff --git a/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java b/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java index b3d5bd3..7aee687 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java @@ -51,4 +51,29 @@ abstract class BaseClient implements SocketIOClient { return channel.getRemoteAddress(); } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + BaseClient other = (BaseClient) obj; + if (sessionId == null) { + if (other.sessionId != null) + return false; + } else if (!sessionId.equals(other.sessionId)) + return false; + return true; + } + } diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java index 50bfb6d..8e506cd 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java @@ -37,11 +37,11 @@ public class WebSocketClient extends BaseClient { } public Channel getChannel() { - return channel; + return channel; } public ChannelFuture send(Packet packet) { - return channel.write(new WebSocketPacketMessage(sessionId, packet)); + return channel.write(new WebSocketPacketMessage(sessionId, packet)); } public void disconnect() { diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java index 9ebc9fe..5229aba 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java @@ -24,6 +24,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; @@ -81,6 +82,17 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements } } + @Override + public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) + throws Exception { + WebSocketClient client = channelId2Client.get(ctx.getChannel().getId()); + if (client != null) { + disconnectable.onDisconnect(client); + } else { + super.channelDisconnected(ctx, e); + } + } + private void handshake(ChannelHandlerContext ctx, HttpRequest req) { QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri()); Channel channel = ctx.getChannel();