diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOEncoder.java b/src/main/java/com/corundumstudio/socketio/SocketIOEncoder.java index 097211e..32407ca 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOEncoder.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOEncoder.java @@ -204,7 +204,7 @@ public class SocketIOEncoder extends ChannelOutboundHandlerAdapter implements Me WebSocketFrame res = new TextWebSocketFrame(message); log.trace("Out message: {} sessionId: {}", new Object[] { message.toString(CharsetUtil.UTF_8), webSocketPacketMessage.getSessionId()}); - if (channel.isOpen()) { + if (channel.isActive()) { channel.write(res); } else { log.debug("Channel was closed, for sessionId: {}", webSocketPacketMessage.getSessionId()); diff --git a/src/main/java/com/corundumstudio/socketio/handler/ResourceHandler.java b/src/main/java/com/corundumstudio/socketio/handler/ResourceHandler.java index 6df6740..5afe00b 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/ResourceHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/ResourceHandler.java @@ -165,10 +165,9 @@ public class ResourceHandler extends ChannelOutboundHandlerAdapter { // No encryption - use zero-copy. final FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength); writeFuture = ch.write(region); - writeFuture.addListener(new GenericFutureListener>() { - + writeFuture.addListener(new ChannelFutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(ChannelFuture future) throws Exception { region.release(); } }); diff --git a/src/main/java/com/corundumstudio/socketio/messages/XHROutMessage.java b/src/main/java/com/corundumstudio/socketio/messages/XHROutMessage.java index 4dd8d47..694ca9d 100644 --- a/src/main/java/com/corundumstudio/socketio/messages/XHROutMessage.java +++ b/src/main/java/com/corundumstudio/socketio/messages/XHROutMessage.java @@ -15,6 +15,8 @@ */ package com.corundumstudio.socketio.messages; +import java.util.UUID; + import io.netty.channel.Channel; import com.corundumstudio.socketio.MessageHandler; @@ -22,15 +24,21 @@ import com.corundumstudio.socketio.MessageHandler; public class XHROutMessage extends BaseMessage { private final String origin; + private final UUID sessionId; - public XHROutMessage(String origin) { + public XHROutMessage(String origin, UUID sessionId) { this.origin = origin; + this.sessionId = sessionId; } public String getOrigin() { return origin; } + public UUID getSessionId() { + return sessionId; + } + @Override public void handleMessage(MessageHandler handler, Channel channel) { handler.handle(this, channel); diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java index 9ccfec4..dfc661c 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java @@ -17,6 +17,8 @@ package com.corundumstudio.socketio.transport; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -26,6 +28,7 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; @@ -96,6 +99,10 @@ public class WebSocketTransport extends BaseTransport { } } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { @@ -108,7 +115,7 @@ public class WebSocketTransport extends BaseTransport { } private void handshake(ChannelHandlerContext ctx, String path, FullHttpRequest req) { - Channel channel = ctx.channel(); + final Channel channel = ctx.channel(); String[] parts = path.split("/"); if (parts.length <= 3) { log.warn("Wrong GET request path: {}, from ip: {}. Channel closed!", @@ -117,14 +124,19 @@ public class WebSocketTransport extends BaseTransport { return; } - UUID sessionId = UUID.fromString(parts[4]); + final UUID sessionId = UUID.fromString(parts[4]); WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory( getWebSocketLocation(req), null, false); WebSocketServerHandshaker handshaker = factory.newHandshaker(req); if (handshaker != null) { - handshaker.handshake(channel, req); - connectClient(channel, sessionId); + ChannelFuture f = handshaker.handshake(channel, req); + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + connectClient(channel, sessionId); + } + }); } else { WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); } @@ -132,7 +144,7 @@ public class WebSocketTransport extends BaseTransport { private void receivePackets(ChannelHandlerContext ctx, ByteBuf channelBuffer) throws IOException { WebSocketClient client = channelId2Client.get(ctx.channel()); - ctx.fireChannelRead(new PacketsMessage(client, channelBuffer)); + ctx.pipeline().fireChannelRead(new PacketsMessage(client, channelBuffer)); } private void connectClient(Channel channel, UUID sessionId) {