From 6460f343bc8d3b03548880e1667370d858047456 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 26 Dec 2013 19:45:38 +0400 Subject: [PATCH] Distributed authorization support for websocket transport. Issue #96 --- .gitignore | 2 + .../socketio/HandshakeData.java | 5 +- .../socketio/SocketIOChannelInitializer.java | 7 ++- .../socketio/handler/AuthorizeHandler.java | 53 +++++++++++----- .../socketio/store/RedisPubSubStore.java | 2 +- .../socketio/store/RedisStoreFactory.java | 5 +- .../socketio/store/StoreFactory.java | 3 +- .../store/pubsub/BaseStoreFactory.java | 62 ++++++++++++++----- .../socketio/store/pubsub/ConnectMessage.java | 38 ++++++++++++ .../store/pubsub/DisconnectMessage.java | 38 ++++++++++++ .../store/pubsub/HandshakeMessage.java | 38 ++++++++++++ .../socketio/store/pubsub/PubSubStore.java | 7 +++ 12 files changed, 223 insertions(+), 37 deletions(-) create mode 100644 src/main/java/com/corundumstudio/socketio/store/pubsub/ConnectMessage.java create mode 100644 src/main/java/com/corundumstudio/socketio/store/pubsub/DisconnectMessage.java create mode 100644 src/main/java/com/corundumstudio/socketio/store/pubsub/HandshakeMessage.java diff --git a/.gitignore b/.gitignore index 0f373f7..1e6c912 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ /.classpath /.project /target + +/gnupg diff --git a/src/main/java/com/corundumstudio/socketio/HandshakeData.java b/src/main/java/com/corundumstudio/socketio/HandshakeData.java index 61f887c..e9c1553 100644 --- a/src/main/java/com/corundumstudio/socketio/HandshakeData.java +++ b/src/main/java/com/corundumstudio/socketio/HandshakeData.java @@ -15,12 +15,15 @@ */ package com.corundumstudio.socketio; +import java.io.Serializable; import java.net.InetSocketAddress; import java.util.Date; import java.util.List; import java.util.Map; -public final class HandshakeData { +public final class HandshakeData implements Serializable { + + private static final long serialVersionUID = 1196350300161819978L; private final Map> headers; private final InetSocketAddress address; diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java index 3218485..0ca1f2e 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java @@ -51,6 +51,8 @@ import com.corundumstudio.socketio.parser.Encoder; import com.corundumstudio.socketio.parser.JsonSupport; import com.corundumstudio.socketio.scheduler.CancelableScheduler; import com.corundumstudio.socketio.store.StoreFactory; +import com.corundumstudio.socketio.store.pubsub.DisconnectMessage; +import com.corundumstudio.socketio.store.pubsub.PubSubStore; import com.corundumstudio.socketio.transport.FlashPolicyHandler; import com.corundumstudio.socketio.transport.FlashSocketTransport; import com.corundumstudio.socketio.transport.MainBaseClient; @@ -123,7 +125,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer impl authorizeHandler = new AuthorizeHandler(connectPath, scheduler, configuration, namespacesHub); StoreFactory factory = configuration.getStoreFactory(); - factory.init(namespacesHub, jsonSupport); + factory.init(namespacesHub, authorizeHandler, jsonSupport); xhrPollingTransport = new XHRPollingTransport(connectPath, ackManager, this, scheduler, authorizeHandler, configuration); webSocketTransport = new WebSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory); @@ -201,6 +203,9 @@ public class SocketIOChannelInitializer extends ChannelInitializer impl flashSocketTransport.onDisconnect(client); authorizeHandler.onDisconnect(client); configuration.getStoreFactory().onDisconnect(client); + + configuration.getStoreFactory().getPubSubStore().publish(PubSubStore.DISCONNECT, new DisconnectMessage(client.getSessionId())); + log.debug("Client with sessionId: {} disconnected", client.getSessionId()); } diff --git a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java index ffbf90b..aa28346 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java @@ -54,6 +54,9 @@ import com.corundumstudio.socketio.parser.PacketType; import com.corundumstudio.socketio.scheduler.CancelableScheduler; import com.corundumstudio.socketio.scheduler.SchedulerKey; import com.corundumstudio.socketio.scheduler.SchedulerKey.Type; +import com.corundumstudio.socketio.store.pubsub.ConnectMessage; +import com.corundumstudio.socketio.store.pubsub.HandshakeMessage; +import com.corundumstudio.socketio.store.pubsub.PubSubStore; import com.corundumstudio.socketio.transport.MainBaseClient; @Sharable @@ -103,16 +106,6 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di private void authorize(Channel channel, String origin, Map> params, FullHttpRequest req) throws IOException { - UUID sessionId = UUID.randomUUID(); - authorizedSessionIds.add(sessionId); - - scheduleDisconnect(channel, sessionId); - - String heartbeatTimeoutVal = String.valueOf(configuration.getHeartbeatTimeout()); - if (!configuration.isHeartbeatsEnabled()) { - heartbeatTimeoutVal = ""; - } - Map> headers = new HashMap>(req.headers().names().size()); for (String name : req.headers().names()) { List values = req.headers().getAll(name); @@ -125,15 +118,22 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di boolean result = configuration.getAuthorizationListener().isAuthorized(data); if (result) { - String msg = sessionId + ":" + heartbeatTimeoutVal + ":" + configuration.getCloseTimeout() + ":" + configuration.getTransports(); + UUID sessionId = UUID.randomUUID(); + + scheduleDisconnect(channel, sessionId); + + String msg = createHandshake(sessionId); List jsonpParams = params.get("jsonp"); String jsonpParam = null; if (jsonpParams != null) { jsonpParam = jsonpParams.get(0); } - channel.write(new AuthorizeMessage(msg, jsonpParam, origin, sessionId)); + + handshake(sessionId); + HandshakeMessage message = new HandshakeMessage(sessionId); + configuration.getStoreFactory().getPubSubStore().publish(PubSubStore.HANDSHAKE, message); log.debug("Handshake authorized for sessionId: {}", sessionId); } else { HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN); @@ -144,6 +144,15 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di } } + private String createHandshake(UUID sessionId) { + String heartbeatTimeoutVal = String.valueOf(configuration.getHeartbeatTimeout()); + if (!configuration.isHeartbeatsEnabled()) { + heartbeatTimeoutVal = ""; + } + String msg = sessionId + ":" + heartbeatTimeoutVal + ":" + configuration.getCloseTimeout() + ":" + configuration.getTransports(); + return msg; + } + private void scheduleDisconnect(Channel channel, final UUID sessionId) { channel.closeFuture().addListener(new ChannelFutureListener() { @Override @@ -164,9 +173,23 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di return authorizedSessionIds.contains(sessionId); } - public void connect(MainBaseClient client) { - SchedulerKey key = new SchedulerKey(Type.AUTHORIZE, client.getSessionId()); + public void handshake(UUID sessionId) { + authorizedSessionIds.add(sessionId); + } + + public void connect(UUID sessionId) { + SchedulerKey key = new SchedulerKey(Type.AUTHORIZE, sessionId); disconnectScheduler.cancel(key); + } + + public void disconnect(UUID sessionId) { + authorizedSessionIds.remove(sessionId); + } + + public void connect(MainBaseClient client) { + connect(client.getSessionId()); + configuration.getStoreFactory().getPubSubStore().publish(PubSubStore.CONNECT, new ConnectMessage(client.getSessionId())); + client.send(new Packet(PacketType.CONNECT)); Namespace ns = namespacesHub.get(Namespace.DEFAULT_NAME); @@ -176,7 +199,7 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di @Override public void onDisconnect(MainBaseClient client) { - authorizedSessionIds.remove(client.getSessionId()); + disconnect(client.getSessionId()); } } diff --git a/src/main/java/com/corundumstudio/socketio/store/RedisPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/RedisPubSubStore.java index 998b686..ca65f91 100644 --- a/src/main/java/com/corundumstudio/socketio/store/RedisPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/RedisPubSubStore.java @@ -39,7 +39,7 @@ import com.corundumstudio.socketio.store.pubsub.PubSubStore; public class RedisPubSubStore implements PubSubStore { - private final ExecutorService executorService = Executors.newFixedThreadPool(5); + private final ExecutorService executorService = Executors.newFixedThreadPool(6); private final Logger log = LoggerFactory.getLogger(getClass()); diff --git a/src/main/java/com/corundumstudio/socketio/store/RedisStoreFactory.java b/src/main/java/com/corundumstudio/socketio/store/RedisStoreFactory.java index 9619067..4da726b 100644 --- a/src/main/java/com/corundumstudio/socketio/store/RedisStoreFactory.java +++ b/src/main/java/com/corundumstudio/socketio/store/RedisStoreFactory.java @@ -19,6 +19,7 @@ import java.util.UUID; import redis.clients.jedis.Jedis; +import com.corundumstudio.socketio.handler.AuthorizeHandler; import com.corundumstudio.socketio.namespace.NamespacesHub; import com.corundumstudio.socketio.parser.JsonSupport; import com.corundumstudio.socketio.store.pubsub.BaseStoreFactory; @@ -43,14 +44,14 @@ public class RedisStoreFactory extends BaseStoreFactory { } @Override - public void init(NamespacesHub namespacesHub, JsonSupport jsonSupport) { + public void init(NamespacesHub namespacesHub, AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) { pubSubRedisStore = new RedisPubSubStore(redisPub, redisSub, getNodeId(), jsonSupport); redisClient.connect(); redisPub.connect(); redisSub.connect(); - super.init(namespacesHub, jsonSupport); + super.init(namespacesHub, authorizeHandler, jsonSupport); } diff --git a/src/main/java/com/corundumstudio/socketio/store/StoreFactory.java b/src/main/java/com/corundumstudio/socketio/store/StoreFactory.java index 12c5abb..39cf8ce 100644 --- a/src/main/java/com/corundumstudio/socketio/store/StoreFactory.java +++ b/src/main/java/com/corundumstudio/socketio/store/StoreFactory.java @@ -18,6 +18,7 @@ package com.corundumstudio.socketio.store; import java.util.UUID; import com.corundumstudio.socketio.Disconnectable; +import com.corundumstudio.socketio.handler.AuthorizeHandler; import com.corundumstudio.socketio.namespace.NamespacesHub; import com.corundumstudio.socketio.parser.JsonSupport; import com.corundumstudio.socketio.store.pubsub.PubSubStore; @@ -31,7 +32,7 @@ public interface StoreFactory extends Disconnectable { PubSubStore getPubSubStore(); - void init(NamespacesHub namespacesHub, JsonSupport jsonSupport); + void init(NamespacesHub namespacesHub, AuthorizeHandler authorizeHandler, JsonSupport jsonSupport); Store create(UUID sessionId); diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java index 8d4fb21..497093a 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java @@ -15,6 +15,10 @@ */ package com.corundumstudio.socketio.store.pubsub; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.corundumstudio.socketio.handler.AuthorizeHandler; import com.corundumstudio.socketio.namespace.NamespacesHub; import com.corundumstudio.socketio.parser.JsonSupport; import com.corundumstudio.socketio.store.StoreFactory; @@ -22,24 +26,47 @@ import com.corundumstudio.socketio.transport.MainBaseClient; public abstract class BaseStoreFactory implements StoreFactory { + private final Logger log = LoggerFactory.getLogger(getClass()); + private Long nodeId = (long) (Math.random() * 1000000); protected Long getNodeId() { return nodeId; } - public void init(final NamespacesHub namespacesHub, JsonSupport jsonSupport) { + public void init(final NamespacesHub namespacesHub, final AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) { + getPubSubStore().subscribe(PubSubStore.DISCONNECT, new PubSubListener() { + @Override + public void onMessage(DisconnectMessage msg) { + authorizeHandler.disconnect(msg.getSessionId()); + log.debug("{} sessionId: {}", PubSubStore.DISCONNECT, msg.getSessionId()); + } + }, DisconnectMessage.class); + + getPubSubStore().subscribe(PubSubStore.CONNECT, new PubSubListener() { + @Override + public void onMessage(ConnectMessage msg) { + authorizeHandler.connect(msg.getSessionId()); + log.debug("{} sessionId: {}", PubSubStore.CONNECT, msg.getSessionId()); + } + }, ConnectMessage.class); + + getPubSubStore().subscribe(PubSubStore.HANDSHAKE, new PubSubListener() { + @Override + public void onMessage(HandshakeMessage msg) { + authorizeHandler.handshake(msg.getSessionId()); + log.debug("{} sessionId: {}", PubSubStore.HANDSHAKE, msg.getSessionId()); + } + }, HandshakeMessage.class); + getPubSubStore().subscribe(PubSubStore.DISPATCH, new PubSubListener() { @Override public void onMessage(DispatchMessage msg) { String name = msg.getRoom(); - String[] parts = name.split("/"); - String namespaceName = name; - if (parts.length > 1) { - namespaceName = parts[0]; - } + String namespaceName = extractNamespaceName(name); namespacesHub.get(namespaceName).dispatch(name, msg.getPacket()); + log.debug("{} packet: {}", PubSubStore.DISPATCH, msg.getPacket()); } }, DispatchMessage.class); @@ -48,12 +75,9 @@ public abstract class BaseStoreFactory implements StoreFactory { public void onMessage(JoinLeaveMessage msg) { String name = msg.getRoom(); - String[] parts = name.split("/"); - String namespaceName = name; - if (parts.length > 1) { - namespaceName = parts[0]; - } + String namespaceName = extractNamespaceName(name); namespacesHub.get(namespaceName).join(name, msg.getSessionId()); + log.debug("{} sessionId: {}", PubSubStore.JOIN, msg.getSessionId()); } }, JoinLeaveMessage.class); @@ -62,12 +86,9 @@ public abstract class BaseStoreFactory implements StoreFactory { public void onMessage(JoinLeaveMessage msg) { String name = msg.getRoom(); - String[] parts = name.split("/"); - String namespaceName = name; - if (parts.length > 1) { - namespaceName = parts[0]; - } + String namespaceName = extractNamespaceName(name); namespacesHub.get(namespaceName).leave(name, msg.getSessionId()); + log.debug("{} sessionId: {}", PubSubStore.LEAVE, msg.getSessionId()); } }, JoinLeaveMessage.class); } @@ -79,4 +100,13 @@ public abstract class BaseStoreFactory implements StoreFactory { public void onDisconnect(MainBaseClient client) { } + private String extractNamespaceName(String name) { + String[] parts = name.split("/"); + String namespaceName = name; + if (parts.length > 1) { + namespaceName = parts[0]; + } + return namespaceName; + } + } diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/ConnectMessage.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/ConnectMessage.java new file mode 100644 index 0000000..04d716b --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/ConnectMessage.java @@ -0,0 +1,38 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.store.pubsub; + +import java.util.UUID; + +public class ConnectMessage extends PubSubMessage { + + private static final long serialVersionUID = 3108918714495865101L; + + private UUID sessionId; + + public ConnectMessage() { + } + + public ConnectMessage(UUID sessionId) { + super(); + this.sessionId = sessionId; + } + + public UUID getSessionId() { + return sessionId; + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/DisconnectMessage.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/DisconnectMessage.java new file mode 100644 index 0000000..3067a30 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/DisconnectMessage.java @@ -0,0 +1,38 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.store.pubsub; + +import java.util.UUID; + +public class DisconnectMessage extends PubSubMessage { + + private static final long serialVersionUID = -2763553673397520368L; + + private UUID sessionId; + + public DisconnectMessage() { + } + + public DisconnectMessage(UUID sessionId) { + super(); + this.sessionId = sessionId; + } + + public UUID getSessionId() { + return sessionId; + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/HandshakeMessage.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/HandshakeMessage.java new file mode 100644 index 0000000..2edd257 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/HandshakeMessage.java @@ -0,0 +1,38 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.store.pubsub; + +import java.util.UUID; + +public class HandshakeMessage extends PubSubMessage { + + private static final long serialVersionUID = 5767127795325210150L; + + private UUID sessionId; + + public HandshakeMessage() { + } + + public HandshakeMessage(UUID sessionId) { + super(); + this.sessionId = sessionId; + } + + public UUID getSessionId() { + return sessionId; + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java index 81e9e0c..a78677b 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java @@ -18,6 +18,13 @@ package com.corundumstudio.socketio.store.pubsub; public interface PubSubStore { + // TODO refactor to enum + String DISCONNECT = "disconnect"; + + String CONNECT = "connect"; + + String HANDSHAKE = "handshake"; + String JOIN = "join"; String LEAVE = "leave";