Browse Source

onEvent handler added

master
Nikita 13 years ago
parent
commit
692be47124
  1. 14
      README.md
  2. 5
      src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java
  3. 7
      src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java
  4. 8
      src/main/java/com/corundumstudio/socketio/PacketListener.java
  5. 8
      src/main/java/com/corundumstudio/socketio/SocketIOListener.java
  6. 4
      src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java
  7. 9
      src/main/java/com/corundumstudio/socketio/SocketIOServer.java
  8. 7
      src/main/java/com/corundumstudio/socketio/parser/Packet.java
  9. 25
      src/main/java/com/corundumstudio/socketio/transport/BaseClient.java
  10. 4
      src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java
  11. 12
      src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java

14
README.md

@ -18,7 +18,14 @@ Licensed under the Apache License 2.0.
SocketIOListener handler = new SocketIOListener() {
@Override
public void onMessage(SocketIOClient client, String message) {
public void onEvent(SocketIOClient client, Packet packet) {
...
}
@Override
public void onMessage(SocketIOClient client, Packet packet) {
// get a message
packet.getData().toString();
...
}
@ -33,7 +40,10 @@ Licensed under the Apache License 2.0.
}
@Override
public void onJsonObject(SocketIOClient client, Object obj) {
public void onJsonObject(SocketIOClient client, Packet packet) {
// get a json object
packet.getData();
...
SampleObject obj = new SampleObject();
// send object to socket.io client

5
src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java

@ -35,7 +35,6 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@ -86,7 +85,7 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di
f.addListener(ChannelFutureListener.CLOSE);
return;
}
if (HttpMethod.GET.equals(req.getMethod()) && queryDecoder.getPath().equals(connectPath)) {
if (queryDecoder.getPath().equals(connectPath)) {
authorize(channel, req, queryDecoder.getParameters());
return;
}
@ -104,7 +103,7 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di
String transports = "websocket,xhr-polling";
//String transports = "websocket";
String heartbeatTimeoutVal = String.valueOf(configuration.getHeartbeatTimeout());
if (configuration.getHeartbeatTimeout() == 0) {
if (!configuration.isHeartbeatsEnabled()) {
heartbeatTimeoutVal = "";
}

7
src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java

@ -26,7 +26,7 @@ import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.SchedulerKey;
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
public class HeartbeatHandler {
public class HeartbeatHandler implements Disconnectable {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -62,4 +62,9 @@ public class HeartbeatHandler {
}, configuration.getHeartbeatTimeout(), TimeUnit.SECONDS);
}
@Override
public void onDisconnect(SocketIOClient client) {
scheduler.cancel(new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId()));
}
}

8
src/main/java/com/corundumstudio/socketio/PacketListener.java

@ -32,16 +32,20 @@ public class PacketListener {
public void onPacket(Packet packet, SocketIOClient client) {
switch (packet.getType()) {
case EVENT:
socketIOHandler.onEvent(client, packet);
break;
case HEARTBEAT:
heartbeatHandler.onHeartbeat(client);
break;
case MESSAGE:
socketIOHandler.onMessage(client, packet.getData().toString());
socketIOHandler.onMessage(client, packet);
break;
case JSON:
socketIOHandler.onJsonObject(client, packet.getData());
socketIOHandler.onJsonObject(client, packet);
break;
case DISCONNECT:

8
src/main/java/com/corundumstudio/socketio/SocketIOListener.java

@ -15,14 +15,18 @@
*/
package com.corundumstudio.socketio;
import com.corundumstudio.socketio.parser.Packet;
public interface SocketIOListener {
void onConnect(SocketIOClient client);
void onJsonObject(SocketIOClient client, Object obj);
void onJsonObject(SocketIOClient client, Packet packet);
void onMessage(SocketIOClient client, String message);
void onMessage(SocketIOClient client, Packet packet);
void onDisconnect(SocketIOClient client);
void onEvent(SocketIOClient client, Packet packet);
}

4
src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java

@ -56,6 +56,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
private CancelableScheduler scheduler;
private PacketHandler packetHandler;
private HeartbeatHandler heartbeatHandler;
public void start(Configuration configuration) {
this.socketIOHandler = configuration.getListener();
@ -65,7 +66,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
Encoder encoder = new Encoder(objectMapper);
Decoder decoder = new Decoder(objectMapper);
HeartbeatHandler heartbeatHandler = new HeartbeatHandler(configuration, scheduler);
heartbeatHandler = new HeartbeatHandler(configuration, scheduler);
PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler);
String connectPath = configuration.getContext() + "/" + protocol + "/";
@ -97,6 +98,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
public void onDisconnect(SocketIOClient client) {
log.debug("Client with sessionId: {} disconnected by client request", client.getSessionId());
heartbeatHandler.onDisconnect(client);
xhrPollingTransport.onDisconnect(client);
webSocketTransport.onDisconnect(client);
authorizeHandler.onDisconnect(client);

9
src/main/java/com/corundumstudio/socketio/SocketIOServer.java

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
@ -32,6 +33,7 @@ public class SocketIOServer {
private SocketIOPipelineFactory pipelineFactory = new SocketIOPipelineFactory();
private Channel mainChannel;
private Configuration config;
public SocketIOServer(Configuration configuration) {
@ -40,8 +42,8 @@ public class SocketIOServer {
}
public void setPipelineFactory(SocketIOPipelineFactory pipelineFactory) {
this.pipelineFactory = pipelineFactory;
}
this.pipelineFactory = pipelineFactory;
}
public void start() {
ChannelFactory factory = new NioServerSocketChannelFactory(config.getBossExecutor(), config.getWorkerExecutor());
@ -51,13 +53,14 @@ public class SocketIOServer {
bootstrap.setPipelineFactory(pipelineFactory);
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
bootstrap.bind(new InetSocketAddress(config.getHostname(), config.getPort()));
mainChannel = bootstrap.bind(new InetSocketAddress(config.getHostname(), config.getPort()));
log.info("SocketIO server started at port: {}", config.getPort());
}
public void stop() {
pipelineFactory.stop();
mainChannel.close();
bootstrap.releaseExternalResources();
}

7
src/main/java/com/corundumstudio/socketio/parser/Packet.java

@ -54,6 +54,13 @@ public class Packet {
this.data = data;
}
/**
* Get packet data
* <pre>
* @return <b>json object</b> for {@link PacketType.JSON} type
* <b>message</b> for {@link PacketType.MESSAGE} type
* </pre>
*/
public Object getData() {
return data;
}

25
src/main/java/com/corundumstudio/socketio/transport/BaseClient.java

@ -51,4 +51,29 @@ abstract class BaseClient implements SocketIOClient {
return channel.getRemoteAddress();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
BaseClient other = (BaseClient) obj;
if (sessionId == null) {
if (other.sessionId != null)
return false;
} else if (!sessionId.equals(other.sessionId))
return false;
return true;
}
}

4
src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java

@ -37,11 +37,11 @@ public class WebSocketClient extends BaseClient {
}
public Channel getChannel() {
return channel;
return channel;
}
public ChannelFuture send(Packet packet) {
return channel.write(new WebSocketPacketMessage(sessionId, packet));
return channel.write(new WebSocketPacketMessage(sessionId, packet));
}
public void disconnect() {

12
src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java

@ -24,6 +24,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@ -81,6 +82,17 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements
}
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
WebSocketClient client = channelId2Client.get(ctx.getChannel().getId());
if (client != null) {
disconnectable.onDisconnect(client);
} else {
super.channelDisconnected(ctx, e);
}
}
private void handshake(ChannelHandlerContext ctx, HttpRequest req) {
QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri());
Channel channel = ctx.getChannel();

Loading…
Cancel
Save