Browse Source

Issue #3 fixed

master
Nikita 13 years ago
parent
commit
9ee72a4090
  1. 60
      src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java
  2. 48
      src/main/java/com/corundumstudio/socketio/CancelableScheduler.java
  3. 39
      src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java
  4. 24
      src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java
  5. 5
      src/main/java/com/corundumstudio/socketio/parser/Decoder.java
  6. 33
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

60
src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java

@ -19,21 +19,20 @@ import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
@ -53,21 +52,21 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di
private final Logger log = LoggerFactory.getLogger(getClass());
// 'UUID' to 'timestamp' mapping
// this map will be always smaller than 'connectedSessionIds'
private final Map<UUID, Long> authorizedSessionIds = new ConcurrentHashMap<UUID, Long>();
private final Set<UUID> connectedSessionIds = Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>());
private final CancelableScheduler<UUID> disconnectScheduler;
private final Set<UUID> authorizedSessionIds =
Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>());
private final String connectPath;
private final Configuration configuration;
private final SocketIOListener socketIOListener;
public AuthorizeHandler(String connectPath, SocketIOListener socketIOListener, Configuration configuration) {
public AuthorizeHandler(String connectPath, SocketIOListener socketIOListener, CancelableScheduler<UUID> scheduler, Configuration configuration) {
super();
this.connectPath = connectPath;
this.socketIOListener = socketIOListener;
this.configuration = configuration;
this.disconnectScheduler = scheduler;
}
@Override
@ -93,10 +92,10 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di
private void authorize(Channel channel, HttpRequest req, Map<String, List<String>> params)
throws IOException {
removeStaleAuthorizedIds();
final UUID sessionId = UUID.randomUUID();
authorizedSessionIds.put(sessionId, System.currentTimeMillis());
authorizedSessionIds.add(sessionId);
scheduleDisconnect(channel, sessionId);
String transports = "xhr-polling,websocket";
//String transports = "websocket";
@ -117,36 +116,35 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di
log.debug("New sessionId: {} authorized", sessionId);
}
public boolean isSessionAuthorized(UUID sessionId) {
return connectedSessionIds.contains(sessionId)
|| authorizedSessionIds.containsKey(sessionId);
}
private void scheduleDisconnect(Channel channel, final UUID sessionId) {
ChannelFuture future = channel.getCloseFuture();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
disconnectScheduler.schedule(sessionId, new Runnable() {
@Override
public void run() {
authorizedSessionIds.remove(sessionId);
log.debug("Authorized sessionId: {} removed due to connection timeout", sessionId);
}
}, configuration.getCloseTimeout(), TimeUnit.SECONDS);
}
});
}
/**
* Remove stale authorized client ids which
* has not connected during some timeout
*/
private void removeStaleAuthorizedIds() {
for (Iterator<Entry<UUID, Long>> iterator = authorizedSessionIds.entrySet().iterator(); iterator.hasNext();) {
Entry<UUID, Long> entry = iterator.next();
if (System.currentTimeMillis() - entry.getValue() > 60*1000) {
iterator.remove();
log.debug("Authorized sessionId: {} cleared due to connection timeout", entry.getKey());
}
}
public boolean isSessionAuthorized(UUID sessionId) {
return authorizedSessionIds.contains(sessionId);
}
public void connect(SocketIOClient client) {
authorizedSessionIds.remove(client.getSessionId());
connectedSessionIds.add(client.getSessionId());
disconnectScheduler.cancel(client.getSessionId());
client.send(new Packet(PacketType.CONNECT));
socketIOListener.onConnect(client);
}
@Override
public void onDisconnect(SocketIOClient client) {
connectedSessionIds.remove(client.getSessionId());
authorizedSessionIds.remove(client.getSessionId());
}
}

48
src/main/java/com/corundumstudio/socketio/CancelableScheduler.java

@ -0,0 +1,48 @@
package com.corundumstudio.socketio;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class CancelableScheduler<T> {
private final Map<T, Future<?>> scheduledFutures = new ConcurrentHashMap<T, Future<?>>();
private final ScheduledExecutorService executorService;
public CancelableScheduler(int threadPoolSize) {
executorService = Executors.newScheduledThreadPool(threadPoolSize);
}
public void cancel(T key) {
Future<?> future = scheduledFutures.remove(key);
if (future != null) {
future.cancel(false);
}
}
public void schedule(Runnable runnable, long delay, TimeUnit unit) {
executorService.schedule(runnable, delay, unit);
}
public void schedule(final T key, final Runnable runnable, long delay, TimeUnit unit) {
Future<?> future = executorService.schedule(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} finally {
scheduledFutures.remove(key);
}
}
}, delay, unit);
scheduledFutures.put(key, future);
}
public void shutdown() {
executorService.shutdownNow();
}
}

39
src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java

@ -15,12 +15,7 @@
*/
package com.corundumstudio.socketio;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@ -33,19 +28,18 @@ public class HeartbeatHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<UUID, Future<?>> scheduledHeartbeatFutures = new ConcurrentHashMap<UUID, Future<?>>();
private final ScheduledExecutorService executorService;
private final CancelableScheduler<UUID> scheduler;
private final Configuration configuration;
public HeartbeatHandler(Configuration configuration) {
this.executorService = Executors.newScheduledThreadPool(configuration.getHeartbeatThreadPoolSize());
public HeartbeatHandler(Configuration configuration, CancelableScheduler<UUID> scheduler) {
this.configuration = configuration;
this.scheduler = scheduler;
}
public void onHeartbeat(final SocketIOClient client) {
cancelClientHeartbeatCheck(client);
scheduler.cancel(client.getSessionId());
executorService.schedule(new Runnable() {
scheduler.schedule(new Runnable() {
public void run() {
sendHeartbeat(client);
}
@ -57,30 +51,13 @@ public class HeartbeatHandler {
scheduleClientHeartbeatCheck(client);
}
private void cancelClientHeartbeatCheck(SocketIOClient client) {
Future<?> future = scheduledHeartbeatFutures.remove(client.getSessionId());
if (future != null) {
future.cancel(false);
}
}
private void scheduleClientHeartbeatCheck(final SocketIOClient client) {
Future<?> future = executorService.schedule(new Runnable() {
scheduler.schedule(new Runnable() {
public void run() {
try {
client.disconnect();
} finally {
UUID sessionId = client.getSessionId();
scheduledHeartbeatFutures.remove(sessionId);
log.debug("Client with sessionId: {} disconnected due to heartbeat timeout", sessionId);
}
client.disconnect();
log.debug("Client with sessionId: {} disconnected due to heartbeat timeout", client.getSessionId());
}
}, configuration.getHeartbeatTimeout(), TimeUnit.SECONDS);
scheduledHeartbeatFutures.put(client.getSessionId(), future);
}
public void shutdown() {
executorService.shutdown();
}
}

24
src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java

@ -17,6 +17,8 @@ package com.corundumstudio.socketio;
import static org.jboss.netty.channel.Channels.pipeline;
import java.util.UUID;
import org.codehaus.jackson.map.ObjectMapper;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@ -38,29 +40,31 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
private final int protocol = 1;
private final AuthorizeHandler authorizeHandler;
private XHRPollingTransport xhrPollingTransport;
private WebSocketTransport webSocketTransport;
private SocketIOEncoder socketIOEncoder;
private final XHRPollingTransport xhrPollingTransport;
private final WebSocketTransport webSocketTransport;
private final SocketIOEncoder socketIOEncoder;
private SocketIOListener socketIOHandler;
private HeartbeatHandler heartbeatHandler;
private final SocketIOListener socketIOHandler;
private final CancelableScheduler<UUID> scheduler;
private PacketHandler packetHandler;
private final PacketHandler packetHandler;
public SocketIOPipelineFactory(Configuration configuration) {
this.socketIOHandler = configuration.getListener();
this.heartbeatHandler = new HeartbeatHandler(configuration);
scheduler = new CancelableScheduler<UUID>(configuration.getHeartbeatThreadPoolSize());
ObjectMapper objectMapper = configuration.getObjectMapper();
Encoder encoder = new Encoder(objectMapper);
Decoder decoder = new Decoder(objectMapper);
HeartbeatHandler heartbeatHandler = new HeartbeatHandler(configuration, scheduler);
PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler);
String connectPath = configuration.getContext() + "/" + protocol + "/";
packetHandler = new PacketHandler(packetListener, decoder);
authorizeHandler = new AuthorizeHandler(connectPath, socketIOHandler, configuration);
xhrPollingTransport = new XHRPollingTransport(connectPath, this, heartbeatHandler, authorizeHandler, configuration);
authorizeHandler = new AuthorizeHandler(connectPath, socketIOHandler, scheduler, configuration);
xhrPollingTransport = new XHRPollingTransport(connectPath, this, scheduler, heartbeatHandler, authorizeHandler, configuration);
webSocketTransport = new WebSocketTransport(connectPath, this, authorizeHandler);
socketIOEncoder = new SocketIOEncoder(objectMapper, encoder);
}
@ -92,7 +96,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
}
public void stop() {
heartbeatHandler.shutdown();
scheduler.shutdown();
}
}

5
src/main/java/com/corundumstudio/socketio/parser/Decoder.java

@ -16,13 +16,10 @@
package com.corundumstudio.socketio.parser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@ -30,7 +27,7 @@ import org.jboss.netty.util.CharsetUtil;
public class Decoder {
private byte separator = (byte)':';
private final byte separator = (byte)':';
private final Pattern packetPattern = Pattern.compile("([^:]+):([0-9]+)?(\\+)?:([^:]+)?:?([\\s\\S]*)?");
private final Pattern ackPattern = Pattern.compile("^([0-9]+)(\\+)?(.*)");

33
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -19,8 +19,11 @@ import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
@ -34,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.AuthorizeHandler;
import com.corundumstudio.socketio.CancelableScheduler;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.Disconnectable;
import com.corundumstudio.socketio.HeartbeatHandler;
@ -52,20 +56,22 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<UUID, XHRPollingClient> sessionId2Client = new ConcurrentHashMap<UUID, XHRPollingClient>();
private final CancelableScheduler<UUID> disconnectScheduler;
private final AuthorizeHandler authorizeHandler;
private final HeartbeatHandler heartbeatHandler;
private final Disconnectable disconnectable;
private final String path;
private final Configuration configuration;
private final String path;
public XHRPollingTransport(String connectPath, Disconnectable disconnectable,
public XHRPollingTransport(String connectPath, Disconnectable disconnectable, CancelableScheduler<UUID> scheduler,
HeartbeatHandler heartbeatHandler, AuthorizeHandler authorizeHandler, Configuration configuration) {
this.path = connectPath + "xhr-polling/";
this.authorizeHandler = authorizeHandler;
this.configuration = configuration;
this.heartbeatHandler = heartbeatHandler;
this.disconnectable = disconnectable;
this.disconnectScheduler = scheduler;
}
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@ -79,6 +85,9 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
String[] parts = queryDecoder.getPath().split("/");
if (parts.length > 3) {
UUID sessionId = UUID.fromString(parts[4]);
scheduleDisconnect(channel, sessionId);
if (HttpMethod.POST.equals(req.getMethod())) {
onPost(sessionId, channel, req);
} else if (HttpMethod.GET.equals(req.getMethod())) {
@ -99,6 +108,26 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
ctx.sendUpstream(e);
}
private void scheduleDisconnect(Channel channel, final UUID sessionId) {
disconnectScheduler.cancel(sessionId);
ChannelFuture future = channel.getCloseFuture();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
disconnectScheduler.schedule(sessionId, new Runnable() {
@Override
public void run() {
XHRPollingClient client = sessionId2Client.get(sessionId);
if (client != null) {
disconnectable.onDisconnect(client);
log.debug("Client: {} disconnected due to connection timeout", sessionId);
}
}
}, configuration.getCloseTimeout(), TimeUnit.SECONDS);
}
});
}
private void onPost(UUID sessionId, Channel channel, HttpRequest req) throws IOException {
XHRPollingClient client = sessionId2Client.get(sessionId);
if (client == null) {

Loading…
Cancel
Save