From 96d571b1d164453e0ecd861466b24de450177bb4 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 25 Sep 2012 18:00:34 +0400 Subject: [PATCH] Flashsocket support added. Issue #27 --- .../corundumstudio/socketio/AckManager.java | 6 +- .../socketio/AuthorizeHandler.java | 5 +- .../socketio/SocketIOPipelineFactory.java | 33 +++++++---- .../transport/FlashPolicyHandler.java | 55 +++++++++++++++++++ .../transport/FlashSocketTransport.java | 44 +++++++++++++++ .../transport/WebSocketTransport.java | 30 ++++++---- .../transport/XHRPollingTransport.java | 6 +- 7 files changed, 152 insertions(+), 27 deletions(-) create mode 100644 src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java create mode 100644 src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java diff --git a/src/main/java/com/corundumstudio/socketio/AckManager.java b/src/main/java/com/corundumstudio/socketio/AckManager.java index 5d76f1c..688bcd3 100644 --- a/src/main/java/com/corundumstudio/socketio/AckManager.java +++ b/src/main/java/com/corundumstudio/socketio/AckManager.java @@ -32,7 +32,7 @@ import com.corundumstudio.socketio.transport.BaseClient; public class AckManager implements Disconnectable { - private class AckEntry { + class AckEntry { final Map ackCallbacks = new ConcurrentHashMap(); final AtomicLong ackIndex = new AtomicLong(-1); @@ -51,10 +51,6 @@ public class AckManager implements Disconnectable { return ackCallbacks.remove(index); } - public AtomicLong getAckIndex() { - return ackIndex; - } - public void initAckIndex(long index) { ackIndex.compareAndSet(-1, index); } diff --git a/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java b/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java index 6ac0503..051b6c8 100644 --- a/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java +++ b/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java @@ -51,6 +51,9 @@ import com.corundumstudio.socketio.scheduler.CancelableScheduler; import com.corundumstudio.socketio.scheduler.SchedulerKey; import com.corundumstudio.socketio.scheduler.SchedulerKey.Type; import com.corundumstudio.socketio.transport.BaseClient; +import com.corundumstudio.socketio.transport.FlashSocketTransport; +import com.corundumstudio.socketio.transport.WebSocketTransport; +import com.corundumstudio.socketio.transport.XHRPollingTransport; @Sharable public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Disconnectable { @@ -102,7 +105,7 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di scheduleDisconnect(channel, sessionId); - String transports = "websocket,xhr-polling"; + String transports = WebSocketTransport.NAME + "," + FlashSocketTransport.NAME + "," + XHRPollingTransport.NAME; String heartbeatTimeoutVal = String.valueOf(configuration.getHeartbeatTimeout()); if (!configuration.isHeartbeatsEnabled()) { heartbeatTimeoutVal = ""; diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java index e003569..eab9936 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java @@ -40,20 +40,24 @@ import com.corundumstudio.socketio.parser.Encoder; import com.corundumstudio.socketio.parser.JsonSupport; import com.corundumstudio.socketio.scheduler.CancelableScheduler; import com.corundumstudio.socketio.transport.BaseClient; +import com.corundumstudio.socketio.transport.FlashPolicyHandler; +import com.corundumstudio.socketio.transport.FlashSocketTransport; import com.corundumstudio.socketio.transport.WebSocketTransport; import com.corundumstudio.socketio.transport.XHRPollingTransport; public class SocketIOPipelineFactory implements ChannelPipelineFactory, DisconnectableHub { - protected static final String SOCKETIO_ENCODER = "socketioEncoder"; - protected static final String WEB_SOCKET_TRANSPORT = "webSocketTransport"; - protected static final String XHR_POLLING_TRANSPORT = "xhrPollingTransport"; - protected static final String AUTHORIZE_HANDLER = "authorizeHandler"; - protected static final String PACKET_HANDLER = "packetHandler"; - protected static final String HTTP_ENCODER = "encoder"; - protected static final String HTTP_AGGREGATOR = "aggregator"; - protected static final String HTTP_REQUEST_DECODER = "decoder"; - protected static final String SSL_HANDLER = "ssl"; + public static final String SOCKETIO_ENCODER = "socketioEncoder"; + public static final String WEB_SOCKET_TRANSPORT = "webSocketTransport"; + public static final String FLASH_SOCKET_TRANSPORT = "flashSocketTransport"; + public static final String XHR_POLLING_TRANSPORT = "xhrPollingTransport"; + public static final String AUTHORIZE_HANDLER = "authorizeHandler"; + public static final String PACKET_HANDLER = "packetHandler"; + public static final String HTTP_ENCODER = "encoder"; + public static final String HTTP_AGGREGATOR = "aggregator"; + public static final String HTTP_REQUEST_DECODER = "decoder"; + public static final String SSL_HANDLER = "ssl"; + public static final String FLASH_POLICY_HANDLER = "flashPolicyHandler"; private final Logger log = LoggerFactory.getLogger(getClass()); @@ -64,6 +68,8 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne private AuthorizeHandler authorizeHandler; private XHRPollingTransport xhrPollingTransport; private WebSocketTransport webSocketTransport; + private FlashSocketTransport flashSocketTransport; + private final FlashPolicyHandler flashPolicyHandler = new FlashPolicyHandler(); private SocketIOEncoder socketIOEncoder; private CancelableScheduler scheduler; @@ -99,18 +105,23 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne authorizeHandler = new AuthorizeHandler(connectPath, scheduler, configuration, namespacesHub); xhrPollingTransport = new XHRPollingTransport(connectPath, ackManager, this, scheduler, authorizeHandler, configuration); webSocketTransport = new WebSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler); + flashSocketTransport = new FlashSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler); socketIOEncoder = new SocketIOEncoder(encoder); } public Iterable getAllClients() { + // TODO refactor to transport registry Iterable xhrClients = xhrPollingTransport.getAllClients(); Iterable webSocketClients = webSocketTransport.getAllClients(); - return new CompositeIterable(xhrClients, webSocketClients); + Iterable flashSocketClients = flashSocketTransport.getAllClients(); + return new CompositeIterable(xhrClients, webSocketClients, flashSocketClients); } public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); + pipeline.addLast(FLASH_POLICY_HANDLER, flashPolicyHandler); + if (sslContext != null) { SSLEngine engine = sslContext.createSSLEngine(); engine.setUseClientMode(false); @@ -126,6 +137,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne pipeline.addLast(AUTHORIZE_HANDLER, authorizeHandler); pipeline.addLast(XHR_POLLING_TRANSPORT, xhrPollingTransport); pipeline.addLast(WEB_SOCKET_TRANSPORT, webSocketTransport); + pipeline.addLast(FLASH_SOCKET_TRANSPORT, flashSocketTransport); pipeline.addLast(SOCKETIO_ENCODER, socketIOEncoder); @@ -155,6 +167,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne ackManager.onDisconnect(client); xhrPollingTransport.onDisconnect(client); webSocketTransport.onDisconnect(client); + flashSocketTransport.onDisconnect(client); authorizeHandler.onDisconnect(client); } diff --git a/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java b/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java new file mode 100644 index 0000000..941e7c0 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java @@ -0,0 +1,55 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.transport; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.ChannelHandler.Sharable; +import org.jboss.netty.util.CharsetUtil; + +import com.corundumstudio.socketio.SocketIOPipelineFactory; + +@Sharable +public class FlashPolicyHandler extends SimpleChannelUpstreamHandler { + + private final ChannelBuffer requestBuffer = ChannelBuffers.copiedBuffer("", CharsetUtil.UTF_8); + private final ChannelBuffer responseBuffer = ChannelBuffers.copiedBuffer( + "" + + "" + + " " + + " " + + " " + + "", CharsetUtil.UTF_8); + + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + ChannelBuffer inBuffer = (ChannelBuffer) e.getMessage(); + ChannelBuffer data = inBuffer.slice(0, requestBuffer.readableBytes()); + if (data.equals(requestBuffer)) { + ChannelFuture f = e.getChannel().write(responseBuffer); + f.addListener(ChannelFutureListener.CLOSE); + ctx.getPipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER); + return; + } + super.messageReceived(ctx, e); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java new file mode 100644 index 0000000..fa67a8e --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java @@ -0,0 +1,44 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.transport; + +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelHandler.Sharable; + +import com.corundumstudio.socketio.AckManager; +import com.corundumstudio.socketio.AuthorizeHandler; +import com.corundumstudio.socketio.DisconnectableHub; +import com.corundumstudio.socketio.HeartbeatHandler; +import com.corundumstudio.socketio.SocketIOPipelineFactory; + +@Sharable +public class FlashSocketTransport extends WebSocketTransport { + + public static final String NAME = "flashsocket"; + + public FlashSocketTransport(String connectPath, boolean isSsl, AckManager ackManager, + DisconnectableHub disconnectable, AuthorizeHandler authorizeHandler, + HeartbeatHandler heartbeatHandler) { + super(connectPath, isSsl, ackManager, disconnectable, authorizeHandler, heartbeatHandler); + path = connectPath + NAME; + } + + @Override + protected void removeHandler(ChannelPipeline pipeline) { + pipeline.remove(SocketIOPipelineFactory.WEB_SOCKET_TRANSPORT); + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java index c3d1d64..a82f000 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java @@ -27,6 +27,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; @@ -48,11 +49,14 @@ import com.corundumstudio.socketio.Disconnectable; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.HeartbeatHandler; import com.corundumstudio.socketio.SocketIOClient; +import com.corundumstudio.socketio.SocketIOPipelineFactory; import com.corundumstudio.socketio.messages.PacketsMessage; @Sharable public class WebSocketTransport extends SimpleChannelUpstreamHandler implements Disconnectable { + public static final String NAME = "websocket"; + private final Logger log = LoggerFactory.getLogger(getClass()); private final Map sessionId2Client = new ConcurrentHashMap(); @@ -62,13 +66,13 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements private final HeartbeatHandler heartbeatHandler; private final AuthorizeHandler authorizeHandler; private final DisconnectableHub disconnectableHub; - private final String path; private final boolean isSsl; + protected String path; public WebSocketTransport(String connectPath, boolean isSsl, AckManager ackManager, DisconnectableHub disconnectable, AuthorizeHandler authorizeHandler, HeartbeatHandler heartbeatHandler) { - this.path = connectPath + "websocket"; + this.path = connectPath + NAME; this.isSsl = isSsl; this.authorizeHandler = authorizeHandler; this.ackManager = ackManager; @@ -86,7 +90,13 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements receivePackets(ctx, frame.getBinaryData()); } else if (msg instanceof HttpRequest) { HttpRequest req = (HttpRequest) msg; - handshake(ctx, req); + QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri()); + String path = queryDecoder.getPath(); + if (path.startsWith(this.path)) { + handshake(ctx, path, req); + } else { + ctx.sendUpstream(e); + } } else { ctx.sendUpstream(e); } @@ -103,14 +113,8 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements } } - private void handshake(ChannelHandlerContext ctx, HttpRequest req) { - QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri()); + private void handshake(ChannelHandlerContext ctx, String path, HttpRequest req) { Channel channel = ctx.getChannel(); - String path = queryDecoder.getPath(); - if (!path.startsWith(this.path)) { - return; - } - String[] parts = path.split("/"); if (parts.length <= 3) { log.warn("Wrong GET request path: {}, from ip: {}. Channel closed!", @@ -152,6 +156,12 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements authorizeHandler.connect(client); heartbeatHandler.onHeartbeat(client); + channel.getPipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER); + removeHandler(channel.getPipeline()); + } + + protected void removeHandler(ChannelPipeline pipeline) { + pipeline.remove(SocketIOPipelineFactory.FLASH_SOCKET_TRANSPORT); } private String getWebSocketLocation(HttpRequest req) { diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java index 3cc89c1..7342d06 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java @@ -47,6 +47,7 @@ import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.Disconnectable; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.SocketIOClient; +import com.corundumstudio.socketio.SocketIOPipelineFactory; import com.corundumstudio.socketio.messages.PacketsMessage; import com.corundumstudio.socketio.messages.XHRErrorMessage; import com.corundumstudio.socketio.messages.XHRPostMessage; @@ -61,6 +62,8 @@ import com.corundumstudio.socketio.scheduler.SchedulerKey.Type; @Sharable public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements Disconnectable { + public static final String NAME = "xhr-polling"; + private final Logger log = LoggerFactory.getLogger(getClass()); private final Map sessionId2Client = new ConcurrentHashMap(); @@ -74,7 +77,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements public XHRPollingTransport(String connectPath, AckManager ackManager, DisconnectableHub disconnectable, CancelableScheduler scheduler, AuthorizeHandler authorizeHandler, Configuration configuration) { - this.path = connectPath + "xhr-polling/"; + this.path = connectPath + NAME + "/"; this.ackManager = ackManager; this.authorizeHandler = authorizeHandler; this.configuration = configuration; @@ -190,6 +193,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements client.update(channel, origin); authorizeHandler.connect(client); + channel.getPipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER); log.debug("Client for sessionId: {} was created", sessionId); return client; }