Browse Source

WebSocket transport fixed

master
Nikita 12 years ago
parent
commit
0bc083ced7
  1. 2
      src/main/java/com/corundumstudio/socketio/SocketIOEncoder.java
  2. 5
      src/main/java/com/corundumstudio/socketio/handler/ResourceHandler.java
  3. 10
      src/main/java/com/corundumstudio/socketio/messages/XHROutMessage.java
  4. 22
      src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java

2
src/main/java/com/corundumstudio/socketio/SocketIOEncoder.java

@ -204,7 +204,7 @@ public class SocketIOEncoder extends ChannelOutboundHandlerAdapter implements Me
WebSocketFrame res = new TextWebSocketFrame(message);
log.trace("Out message: {} sessionId: {}", new Object[] {
message.toString(CharsetUtil.UTF_8), webSocketPacketMessage.getSessionId()});
if (channel.isOpen()) {
if (channel.isActive()) {
channel.write(res);
} else {
log.debug("Channel was closed, for sessionId: {}", webSocketPacketMessage.getSessionId());

5
src/main/java/com/corundumstudio/socketio/handler/ResourceHandler.java

@ -165,10 +165,9 @@ public class ResourceHandler extends ChannelOutboundHandlerAdapter {
// No encryption - use zero-copy.
final FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength);
writeFuture = ch.write(region);
writeFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
public void operationComplete(ChannelFuture future) throws Exception {
region.release();
}
});

10
src/main/java/com/corundumstudio/socketio/messages/XHROutMessage.java

@ -15,6 +15,8 @@
*/
package com.corundumstudio.socketio.messages;
import java.util.UUID;
import io.netty.channel.Channel;
import com.corundumstudio.socketio.MessageHandler;
@ -22,15 +24,21 @@ import com.corundumstudio.socketio.MessageHandler;
public class XHROutMessage extends BaseMessage {
private final String origin;
private final UUID sessionId;
public XHROutMessage(String origin) {
public XHROutMessage(String origin, UUID sessionId) {
this.origin = origin;
this.sessionId = sessionId;
}
public String getOrigin() {
return origin;
}
public UUID getSessionId() {
return sessionId;
}
@Override
public void handleMessage(MessageHandler handler, Channel channel) {
handler.handle(this, channel);

22
src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java

@ -17,6 +17,8 @@ package com.corundumstudio.socketio.transport;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -26,6 +28,7 @@ import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
@ -96,6 +99,10 @@ public class WebSocketTransport extends BaseTransport {
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@ -108,7 +115,7 @@ public class WebSocketTransport extends BaseTransport {
}
private void handshake(ChannelHandlerContext ctx, String path, FullHttpRequest req) {
Channel channel = ctx.channel();
final Channel channel = ctx.channel();
String[] parts = path.split("/");
if (parts.length <= 3) {
log.warn("Wrong GET request path: {}, from ip: {}. Channel closed!",
@ -117,14 +124,19 @@ public class WebSocketTransport extends BaseTransport {
return;
}
UUID sessionId = UUID.fromString(parts[4]);
final UUID sessionId = UUID.fromString(parts[4]);
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(req), null, false);
WebSocketServerHandshaker handshaker = factory.newHandshaker(req);
if (handshaker != null) {
handshaker.handshake(channel, req);
connectClient(channel, sessionId);
ChannelFuture f = handshaker.handshake(channel, req);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
connectClient(channel, sessionId);
}
});
} else {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
}
@ -132,7 +144,7 @@ public class WebSocketTransport extends BaseTransport {
private void receivePackets(ChannelHandlerContext ctx, ByteBuf channelBuffer) throws IOException {
WebSocketClient client = channelId2Client.get(ctx.channel());
ctx.fireChannelRead(new PacketsMessage(client, channelBuffer));
ctx.pipeline().fireChannelRead(new PacketsMessage(client, channelBuffer));
}
private void connectClient(Channel channel, UUID sessionId) {

Loading…
Cancel
Save