Browse Source

Issue #7 fixed

master
Nikita 13 years ago
parent
commit
9f18780398
  1. 13
      src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java
  2. 10
      src/main/java/com/corundumstudio/socketio/Configuration.java
  3. 24
      src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java
  4. 11
      src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java
  5. 10
      src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java
  6. 61
      src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java
  7. 16
      src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java
  8. 47
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

13
src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java

@ -46,13 +46,16 @@ import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.messages.AuthorizeMessage;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.parser.PacketType;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.SchedulerKey;
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
@Sharable
public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Disconnectable {
private final Logger log = LoggerFactory.getLogger(getClass());
private final CancelableScheduler<UUID> disconnectScheduler;
private final CancelableScheduler disconnectScheduler;
private final Set<UUID> authorizedSessionIds =
Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>());
@ -61,7 +64,7 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di
private final Configuration configuration;
private final SocketIOListener socketIOListener;
public AuthorizeHandler(String connectPath, SocketIOListener socketIOListener, CancelableScheduler<UUID> scheduler, Configuration configuration) {
public AuthorizeHandler(String connectPath, SocketIOListener socketIOListener, CancelableScheduler scheduler, Configuration configuration) {
super();
this.connectPath = connectPath;
this.socketIOListener = socketIOListener;
@ -122,7 +125,8 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
disconnectScheduler.schedule(sessionId, new Runnable() {
SchedulerKey key = new SchedulerKey(Type.AUTHORIZE, sessionId);
disconnectScheduler.schedule(key, new Runnable() {
@Override
public void run() {
authorizedSessionIds.remove(sessionId);
@ -138,7 +142,8 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di
}
public void connect(SocketIOClient client) {
disconnectScheduler.cancel(client.getSessionId());
SchedulerKey key = new SchedulerKey(Type.AUTHORIZE, client.getSessionId());
disconnectScheduler.cancel(key);
client.send(new Packet(PacketType.CONNECT));
socketIOListener.onConnect(client);
}

10
src/main/java/com/corundumstudio/socketio/Configuration.java

@ -28,6 +28,9 @@ public class Configuration {
private Executor workerExecutor = Executors.newCachedThreadPool();
private boolean allowCustomRequests = false;
private int pollingDuration = 20;
private int heartbeatThreadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
private int heartbeatTimeout = 60;
private int heartbeatInterval = 25;
@ -175,4 +178,11 @@ public class Configuration {
this.allowCustomRequests = allowCustomRequests;
}
public int getPollingDuration() {
return pollingDuration;
}
public void setPollingDuration(int pollingDuration) {
this.pollingDuration = pollingDuration;
}
}

24
src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java

@ -15,7 +15,6 @@
*/
package com.corundumstudio.socketio;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@ -23,36 +22,39 @@ import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.parser.PacketType;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.SchedulerKey;
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
public class HeartbeatHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
private final CancelableScheduler<UUID> scheduler;
private final CancelableScheduler scheduler;
private final Configuration configuration;
public HeartbeatHandler(Configuration configuration, CancelableScheduler<UUID> scheduler) {
public HeartbeatHandler(Configuration configuration, CancelableScheduler scheduler) {
this.configuration = configuration;
this.scheduler = scheduler;
}
public void onHeartbeat(final SocketIOClient client) {
scheduler.cancel(client.getSessionId());
if (!configuration.isHeartbeatsEnabled()) {
return;
}
scheduler.cancel(new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId()));
scheduler.schedule(new Runnable() {
public void run() {
sendHeartbeat(client);
client.send(new Packet(PacketType.HEARTBEAT));
scheduleClientHeartbeatCheck(client);
}
}, configuration.getHeartbeatInterval(), TimeUnit.SECONDS);
}
public void sendHeartbeat(SocketIOClient client) {
client.send(new Packet(PacketType.HEARTBEAT));
scheduleClientHeartbeatCheck(client);
}
private void scheduleClientHeartbeatCheck(final SocketIOClient client) {
scheduler.schedule(new Runnable() {
SchedulerKey key = new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId());
scheduler.schedule(key, new Runnable() {
public void run() {
client.disconnect();
log.debug("Client with sessionId: {} disconnected due to heartbeat timeout", client.getSessionId());

11
src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java

@ -17,8 +17,6 @@ package com.corundumstudio.socketio;
import static org.jboss.netty.channel.Channels.pipeline;
import java.util.UUID;
import org.codehaus.jackson.map.ObjectMapper;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@ -30,6 +28,7 @@ import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.parser.Decoder;
import com.corundumstudio.socketio.parser.Encoder;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.transport.WebSocketTransport;
import com.corundumstudio.socketio.transport.XHRPollingTransport;
@ -54,13 +53,13 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
private SocketIOEncoder socketIOEncoder;
private SocketIOListener socketIOHandler;
private CancelableScheduler<UUID> scheduler;
private CancelableScheduler scheduler;
private PacketHandler packetHandler;
public void start(Configuration configuration) {
this.socketIOHandler = configuration.getListener();
scheduler = new CancelableScheduler<UUID>(configuration.getHeartbeatThreadPoolSize());
scheduler = new CancelableScheduler(configuration.getHeartbeatThreadPoolSize());
ObjectMapper objectMapper = configuration.getObjectMapper();
Encoder encoder = new Encoder(objectMapper);
@ -73,8 +72,8 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
packetHandler = new PacketHandler(packetListener, decoder);
authorizeHandler = new AuthorizeHandler(connectPath, socketIOHandler, scheduler, configuration);
xhrPollingTransport = new XHRPollingTransport(connectPath, this, scheduler, heartbeatHandler, authorizeHandler, configuration);
webSocketTransport = new WebSocketTransport(connectPath, this, authorizeHandler);
xhrPollingTransport = new XHRPollingTransport(connectPath, this, scheduler, authorizeHandler, configuration);
webSocketTransport = new WebSocketTransport(connectPath, this, authorizeHandler, heartbeatHandler);
socketIOEncoder = new SocketIOEncoder(objectMapper, encoder);
}

10
src/main/java/com/corundumstudio/socketio/CancelableScheduler.java → src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
package com.corundumstudio.socketio.scheduler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -22,16 +22,16 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class CancelableScheduler<T> {
public class CancelableScheduler {
private final Map<T, Future<?>> scheduledFutures = new ConcurrentHashMap<T, Future<?>>();
private final Map<Object, Future<?>> scheduledFutures = new ConcurrentHashMap<Object, Future<?>>();
private final ScheduledExecutorService executorService;
public CancelableScheduler(int threadPoolSize) {
executorService = Executors.newScheduledThreadPool(threadPoolSize);
}
public void cancel(T key) {
public void cancel(SchedulerKey key) {
Future<?> future = scheduledFutures.remove(key);
if (future != null) {
future.cancel(false);
@ -42,7 +42,7 @@ public class CancelableScheduler<T> {
executorService.schedule(runnable, delay, unit);
}
public void schedule(final T key, final Runnable runnable, long delay, TimeUnit unit) {
public void schedule(final SchedulerKey key, final Runnable runnable, long delay, TimeUnit unit) {
Future<?> future = executorService.schedule(new Runnable() {
@Override
public void run() {

61
src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java

@ -0,0 +1,61 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.scheduler;
import java.util.UUID;
public class SchedulerKey {
public enum Type {NOOP, HEARBEAT_TIMEOUT, CLOSE_TIMEOUT, AUTHORIZE};
private Type type;
private UUID sessionId;
public SchedulerKey(Type type, UUID sessionId) {
this.type = type;
this.sessionId = sessionId;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((sessionId == null) ? 0 : sessionId.hashCode());
result = prime * result + ((type == null) ? 0 : type.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SchedulerKey other = (SchedulerKey) obj;
if (sessionId == null) {
if (other.sessionId != null)
return false;
} else if (!sessionId.equals(other.sessionId))
return false;
if (type != other.type)
return false;
return true;
}
}

16
src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java

@ -22,11 +22,11 @@ import java.util.concurrent.ConcurrentHashMap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
@ -34,12 +34,12 @@ import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import org.jboss.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.AuthorizeHandler;
import com.corundumstudio.socketio.Disconnectable;
import com.corundumstudio.socketio.HeartbeatHandler;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.messages.PacketsMessage;
@ -51,15 +51,18 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements
private final Map<UUID, WebSocketClient> sessionId2Client = new ConcurrentHashMap<UUID, WebSocketClient>();
private final Map<Integer, WebSocketClient> channelId2Client = new ConcurrentHashMap<Integer, WebSocketClient>();
private final HeartbeatHandler heartbeatHandler;
private final AuthorizeHandler authorizeHandler;
private final Disconnectable disconnectable;
private final String path;
public WebSocketTransport(String connectPath, Disconnectable disconnectable, AuthorizeHandler authorizeHandler) {
public WebSocketTransport(String connectPath, Disconnectable disconnectable,
AuthorizeHandler authorizeHandler, HeartbeatHandler heartbeatHandler) {
this.path = connectPath + "websocket";
this.authorizeHandler = authorizeHandler;
this.disconnectable = disconnectable;
this.heartbeatHandler = heartbeatHandler;
}
@Override
@ -109,11 +112,6 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements
private void receivePackets(ChannelHandlerContext ctx,
ChannelBuffer channelBuffer) throws IOException {
WebSocketClient client = channelId2Client.get(ctx.getChannel().getId());
if (log.isTraceEnabled()) {
String content = channelBuffer.toString(CharsetUtil.UTF_8);
log.trace("In message: {} sessionId: {}", new Object[] {content, client.getSessionId()});
}
Channels.fireMessageReceived(ctx.getChannel(), new PacketsMessage(client, channelBuffer));
}
@ -129,6 +127,8 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements
channelId2Client.put(channel.getId(), client);
sessionId2Client.put(sessionId, client);
authorizeHandler.connect(client);
heartbeatHandler.onHeartbeat(client);
}
private String getWebSocketLocation(HttpRequest req) {

47
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -24,11 +24,11 @@ import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
@ -37,10 +37,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.AuthorizeHandler;
import com.corundumstudio.socketio.CancelableScheduler;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.Disconnectable;
import com.corundumstudio.socketio.HeartbeatHandler;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.messages.PacketsMessage;
import com.corundumstudio.socketio.messages.XHRErrorMessage;
@ -49,6 +47,9 @@ import com.corundumstudio.socketio.parser.ErrorAdvice;
import com.corundumstudio.socketio.parser.ErrorReason;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.parser.PacketType;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.SchedulerKey;
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
@Sharable
public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements Disconnectable {
@ -56,22 +57,20 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<UUID, XHRPollingClient> sessionId2Client = new ConcurrentHashMap<UUID, XHRPollingClient>();
private final CancelableScheduler<UUID> disconnectScheduler;
private final CancelableScheduler scheduler;
private final AuthorizeHandler authorizeHandler;
private final HeartbeatHandler heartbeatHandler;
private final Disconnectable disconnectable;
private final Configuration configuration;
private final String path;
public XHRPollingTransport(String connectPath, Disconnectable disconnectable, CancelableScheduler<UUID> scheduler,
HeartbeatHandler heartbeatHandler, AuthorizeHandler authorizeHandler, Configuration configuration) {
public XHRPollingTransport(String connectPath, Disconnectable disconnectable, CancelableScheduler scheduler,
AuthorizeHandler authorizeHandler, Configuration configuration) {
this.path = connectPath + "xhr-polling/";
this.authorizeHandler = authorizeHandler;
this.configuration = configuration;
this.heartbeatHandler = heartbeatHandler;
this.disconnectable = disconnectable;
this.disconnectScheduler = scheduler;
this.scheduler = scheduler;
}
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@ -86,8 +85,6 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
if (parts.length > 3) {
UUID sessionId = UUID.fromString(parts[4]);
scheduleDisconnect(channel, sessionId);
if (HttpMethod.POST.equals(req.getMethod())) {
onPost(sessionId, channel, req);
} else if (HttpMethod.GET.equals(req.getMethod())) {
@ -108,13 +105,28 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
ctx.sendUpstream(e);
}
private void scheduleNoop(Channel channel, final UUID sessionId) {
SchedulerKey key = new SchedulerKey(Type.NOOP, sessionId);
scheduler.cancel(key);
scheduler.schedule(key, new Runnable() {
@Override
public void run() {
XHRPollingClient client = sessionId2Client.get(sessionId);
if (client != null) {
client.send(new Packet(PacketType.NOOP));
}
}
}, configuration.getPollingDuration(), TimeUnit.SECONDS);
}
private void scheduleDisconnect(Channel channel, final UUID sessionId) {
disconnectScheduler.cancel(sessionId);
final SchedulerKey key = new SchedulerKey(Type.CLOSE_TIMEOUT, sessionId);
scheduler.cancel(key);
ChannelFuture future = channel.getCloseFuture();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
disconnectScheduler.schedule(sessionId, new Runnable() {
scheduler.schedule(key, new Runnable() {
@Override
public void run() {
XHRPollingClient client = sessionId2Client.get(sessionId);
@ -146,6 +158,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
sendError(channel, req, sessionId);
return;
}
String origin = req.getHeader(HttpHeaders.Names.ORIGIN);
XHRPollingClient client = sessionId2Client.get(sessionId);
if (client == null) {
@ -153,17 +166,17 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
}
client.update(channel, origin);
scheduleDisconnect(channel, sessionId);
scheduleNoop(channel, sessionId);
}
private XHRPollingClient createClient(String origin, Channel channel, UUID sessionId) {
private XHRPollingClient createClient(String origin, Channel channel, UUID sessionId) {
XHRPollingClient client = new XHRPollingClient(authorizeHandler, sessionId);
sessionId2Client.put(sessionId, client);
client.update(channel, origin);
authorizeHandler.connect(client);
if (configuration.isHeartbeatsEnabled()) {
heartbeatHandler.sendHeartbeat(client);
}
log.debug("Client for sessionId: {} was created", sessionId);
return client;
}

Loading…
Cancel
Save