Browse Source

Flashsocket support added. Issue #27

master
Nikita 13 years ago
parent
commit
96d571b1d1
  1. 6
      src/main/java/com/corundumstudio/socketio/AckManager.java
  2. 5
      src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java
  3. 33
      src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java
  4. 55
      src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java
  5. 44
      src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java
  6. 30
      src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java
  7. 6
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

6
src/main/java/com/corundumstudio/socketio/AckManager.java

@ -32,7 +32,7 @@ import com.corundumstudio.socketio.transport.BaseClient;
public class AckManager implements Disconnectable {
private class AckEntry {
class AckEntry {
final Map<Long, AckCallback> ackCallbacks = new ConcurrentHashMap<Long, AckCallback>();
final AtomicLong ackIndex = new AtomicLong(-1);
@ -51,10 +51,6 @@ public class AckManager implements Disconnectable {
return ackCallbacks.remove(index);
}
public AtomicLong getAckIndex() {
return ackIndex;
}
public void initAckIndex(long index) {
ackIndex.compareAndSet(-1, index);
}

5
src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java

@ -51,6 +51,9 @@ import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.SchedulerKey;
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
import com.corundumstudio.socketio.transport.BaseClient;
import com.corundumstudio.socketio.transport.FlashSocketTransport;
import com.corundumstudio.socketio.transport.WebSocketTransport;
import com.corundumstudio.socketio.transport.XHRPollingTransport;
@Sharable
public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Disconnectable {
@ -102,7 +105,7 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di
scheduleDisconnect(channel, sessionId);
String transports = "websocket,xhr-polling";
String transports = WebSocketTransport.NAME + "," + FlashSocketTransport.NAME + "," + XHRPollingTransport.NAME;
String heartbeatTimeoutVal = String.valueOf(configuration.getHeartbeatTimeout());
if (!configuration.isHeartbeatsEnabled()) {
heartbeatTimeoutVal = "";

33
src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java

@ -40,20 +40,24 @@ import com.corundumstudio.socketio.parser.Encoder;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.transport.BaseClient;
import com.corundumstudio.socketio.transport.FlashPolicyHandler;
import com.corundumstudio.socketio.transport.FlashSocketTransport;
import com.corundumstudio.socketio.transport.WebSocketTransport;
import com.corundumstudio.socketio.transport.XHRPollingTransport;
public class SocketIOPipelineFactory implements ChannelPipelineFactory, DisconnectableHub {
protected static final String SOCKETIO_ENCODER = "socketioEncoder";
protected static final String WEB_SOCKET_TRANSPORT = "webSocketTransport";
protected static final String XHR_POLLING_TRANSPORT = "xhrPollingTransport";
protected static final String AUTHORIZE_HANDLER = "authorizeHandler";
protected static final String PACKET_HANDLER = "packetHandler";
protected static final String HTTP_ENCODER = "encoder";
protected static final String HTTP_AGGREGATOR = "aggregator";
protected static final String HTTP_REQUEST_DECODER = "decoder";
protected static final String SSL_HANDLER = "ssl";
public static final String SOCKETIO_ENCODER = "socketioEncoder";
public static final String WEB_SOCKET_TRANSPORT = "webSocketTransport";
public static final String FLASH_SOCKET_TRANSPORT = "flashSocketTransport";
public static final String XHR_POLLING_TRANSPORT = "xhrPollingTransport";
public static final String AUTHORIZE_HANDLER = "authorizeHandler";
public static final String PACKET_HANDLER = "packetHandler";
public static final String HTTP_ENCODER = "encoder";
public static final String HTTP_AGGREGATOR = "aggregator";
public static final String HTTP_REQUEST_DECODER = "decoder";
public static final String SSL_HANDLER = "ssl";
public static final String FLASH_POLICY_HANDLER = "flashPolicyHandler";
private final Logger log = LoggerFactory.getLogger(getClass());
@ -64,6 +68,8 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
private AuthorizeHandler authorizeHandler;
private XHRPollingTransport xhrPollingTransport;
private WebSocketTransport webSocketTransport;
private FlashSocketTransport flashSocketTransport;
private final FlashPolicyHandler flashPolicyHandler = new FlashPolicyHandler();
private SocketIOEncoder socketIOEncoder;
private CancelableScheduler scheduler;
@ -99,18 +105,23 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
authorizeHandler = new AuthorizeHandler(connectPath, scheduler, configuration, namespacesHub);
xhrPollingTransport = new XHRPollingTransport(connectPath, ackManager, this, scheduler, authorizeHandler, configuration);
webSocketTransport = new WebSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler);
flashSocketTransport = new FlashSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler);
socketIOEncoder = new SocketIOEncoder(encoder);
}
public Iterable<SocketIOClient> getAllClients() {
// TODO refactor to transport registry
Iterable<SocketIOClient> xhrClients = xhrPollingTransport.getAllClients();
Iterable<SocketIOClient> webSocketClients = webSocketTransport.getAllClients();
return new CompositeIterable<SocketIOClient>(xhrClients, webSocketClients);
Iterable<SocketIOClient> flashSocketClients = flashSocketTransport.getAllClients();
return new CompositeIterable<SocketIOClient>(xhrClients, webSocketClients, flashSocketClients);
}
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
pipeline.addLast(FLASH_POLICY_HANDLER, flashPolicyHandler);
if (sslContext != null) {
SSLEngine engine = sslContext.createSSLEngine();
engine.setUseClientMode(false);
@ -126,6 +137,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
pipeline.addLast(AUTHORIZE_HANDLER, authorizeHandler);
pipeline.addLast(XHR_POLLING_TRANSPORT, xhrPollingTransport);
pipeline.addLast(WEB_SOCKET_TRANSPORT, webSocketTransport);
pipeline.addLast(FLASH_SOCKET_TRANSPORT, flashSocketTransport);
pipeline.addLast(SOCKETIO_ENCODER, socketIOEncoder);
@ -155,6 +167,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
ackManager.onDisconnect(client);
xhrPollingTransport.onDisconnect(client);
webSocketTransport.onDisconnect(client);
flashSocketTransport.onDisconnect(client);
authorizeHandler.onDisconnect(client);
}

55
src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java

@ -0,0 +1,55 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.transport;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.util.CharsetUtil;
import com.corundumstudio.socketio.SocketIOPipelineFactory;
@Sharable
public class FlashPolicyHandler extends SimpleChannelUpstreamHandler {
private final ChannelBuffer requestBuffer = ChannelBuffers.copiedBuffer("<policy-file-request/>", CharsetUtil.UTF_8);
private final ChannelBuffer responseBuffer = ChannelBuffers.copiedBuffer(
"<?xml version=\"1.0\"?>"
+ "<!DOCTYPE cross-domain-policy SYSTEM \"/xml/dtds/cross-domain-policy.dtd\">"
+ "<cross-domain-policy> "
+ " <site-control permitted-cross-domain-policies=\"master-only\"/>"
+ " <allow-access-from domain=\"*\" to-ports=\"*\" />"
+ "</cross-domain-policy>", CharsetUtil.UTF_8);
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer inBuffer = (ChannelBuffer) e.getMessage();
ChannelBuffer data = inBuffer.slice(0, requestBuffer.readableBytes());
if (data.equals(requestBuffer)) {
ChannelFuture f = e.getChannel().write(responseBuffer);
f.addListener(ChannelFutureListener.CLOSE);
ctx.getPipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER);
return;
}
super.messageReceived(ctx, e);
}
}

44
src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java

@ -0,0 +1,44 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.transport;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.AuthorizeHandler;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.HeartbeatHandler;
import com.corundumstudio.socketio.SocketIOPipelineFactory;
@Sharable
public class FlashSocketTransport extends WebSocketTransport {
public static final String NAME = "flashsocket";
public FlashSocketTransport(String connectPath, boolean isSsl, AckManager ackManager,
DisconnectableHub disconnectable, AuthorizeHandler authorizeHandler,
HeartbeatHandler heartbeatHandler) {
super(connectPath, isSsl, ackManager, disconnectable, authorizeHandler, heartbeatHandler);
path = connectPath + NAME;
}
@Override
protected void removeHandler(ChannelPipeline pipeline) {
pipeline.remove(SocketIOPipelineFactory.WEB_SOCKET_TRANSPORT);
}
}

30
src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java

@ -27,6 +27,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
@ -48,11 +49,14 @@ import com.corundumstudio.socketio.Disconnectable;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.HeartbeatHandler;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOPipelineFactory;
import com.corundumstudio.socketio.messages.PacketsMessage;
@Sharable
public class WebSocketTransport extends SimpleChannelUpstreamHandler implements Disconnectable {
public static final String NAME = "websocket";
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<UUID, WebSocketClient> sessionId2Client = new ConcurrentHashMap<UUID, WebSocketClient>();
@ -62,13 +66,13 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements
private final HeartbeatHandler heartbeatHandler;
private final AuthorizeHandler authorizeHandler;
private final DisconnectableHub disconnectableHub;
private final String path;
private final boolean isSsl;
protected String path;
public WebSocketTransport(String connectPath, boolean isSsl, AckManager ackManager, DisconnectableHub disconnectable,
AuthorizeHandler authorizeHandler, HeartbeatHandler heartbeatHandler) {
this.path = connectPath + "websocket";
this.path = connectPath + NAME;
this.isSsl = isSsl;
this.authorizeHandler = authorizeHandler;
this.ackManager = ackManager;
@ -86,7 +90,13 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements
receivePackets(ctx, frame.getBinaryData());
} else if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
handshake(ctx, req);
QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri());
String path = queryDecoder.getPath();
if (path.startsWith(this.path)) {
handshake(ctx, path, req);
} else {
ctx.sendUpstream(e);
}
} else {
ctx.sendUpstream(e);
}
@ -103,14 +113,8 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements
}
}
private void handshake(ChannelHandlerContext ctx, HttpRequest req) {
QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri());
private void handshake(ChannelHandlerContext ctx, String path, HttpRequest req) {
Channel channel = ctx.getChannel();
String path = queryDecoder.getPath();
if (!path.startsWith(this.path)) {
return;
}
String[] parts = path.split("/");
if (parts.length <= 3) {
log.warn("Wrong GET request path: {}, from ip: {}. Channel closed!",
@ -152,6 +156,12 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements
authorizeHandler.connect(client);
heartbeatHandler.onHeartbeat(client);
channel.getPipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER);
removeHandler(channel.getPipeline());
}
protected void removeHandler(ChannelPipeline pipeline) {
pipeline.remove(SocketIOPipelineFactory.FLASH_SOCKET_TRANSPORT);
}
private String getWebSocketLocation(HttpRequest req) {

6
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -47,6 +47,7 @@ import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.Disconnectable;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOPipelineFactory;
import com.corundumstudio.socketio.messages.PacketsMessage;
import com.corundumstudio.socketio.messages.XHRErrorMessage;
import com.corundumstudio.socketio.messages.XHRPostMessage;
@ -61,6 +62,8 @@ import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
@Sharable
public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements Disconnectable {
public static final String NAME = "xhr-polling";
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<UUID, XHRPollingClient> sessionId2Client = new ConcurrentHashMap<UUID, XHRPollingClient>();
@ -74,7 +77,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
public XHRPollingTransport(String connectPath, AckManager ackManager, DisconnectableHub disconnectable, CancelableScheduler scheduler,
AuthorizeHandler authorizeHandler, Configuration configuration) {
this.path = connectPath + "xhr-polling/";
this.path = connectPath + NAME + "/";
this.ackManager = ackManager;
this.authorizeHandler = authorizeHandler;
this.configuration = configuration;
@ -190,6 +193,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
client.update(channel, origin);
authorizeHandler.connect(client);
channel.getPipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER);
log.debug("Client for sessionId: {} was created", sessionId);
return client;
}

Loading…
Cancel
Save