Browse Source

Distributed authorization support for websocket transport. Issue #96

master
Nikita 12 years ago
parent
commit
6460f343bc
  1. 2
      .gitignore
  2. 5
      src/main/java/com/corundumstudio/socketio/HandshakeData.java
  3. 7
      src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
  4. 53
      src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
  5. 2
      src/main/java/com/corundumstudio/socketio/store/RedisPubSubStore.java
  6. 5
      src/main/java/com/corundumstudio/socketio/store/RedisStoreFactory.java
  7. 3
      src/main/java/com/corundumstudio/socketio/store/StoreFactory.java
  8. 62
      src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java
  9. 38
      src/main/java/com/corundumstudio/socketio/store/pubsub/ConnectMessage.java
  10. 38
      src/main/java/com/corundumstudio/socketio/store/pubsub/DisconnectMessage.java
  11. 38
      src/main/java/com/corundumstudio/socketio/store/pubsub/HandshakeMessage.java
  12. 7
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

2
.gitignore

@ -3,3 +3,5 @@
/.classpath
/.project
/target
/gnupg

5
src/main/java/com/corundumstudio/socketio/HandshakeData.java

@ -15,12 +15,15 @@
*/
package com.corundumstudio.socketio;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.List;
import java.util.Map;
public final class HandshakeData {
public final class HandshakeData implements Serializable {
private static final long serialVersionUID = 1196350300161819978L;
private final Map<String, List<String>> headers;
private final InetSocketAddress address;

7
src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java

@ -51,6 +51,8 @@ import com.corundumstudio.socketio.parser.Encoder;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.DisconnectMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.transport.FlashPolicyHandler;
import com.corundumstudio.socketio.transport.FlashSocketTransport;
import com.corundumstudio.socketio.transport.MainBaseClient;
@ -123,7 +125,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer<Channel> impl
authorizeHandler = new AuthorizeHandler(connectPath, scheduler, configuration, namespacesHub);
StoreFactory factory = configuration.getStoreFactory();
factory.init(namespacesHub, jsonSupport);
factory.init(namespacesHub, authorizeHandler, jsonSupport);
xhrPollingTransport = new XHRPollingTransport(connectPath, ackManager, this, scheduler, authorizeHandler, configuration);
webSocketTransport = new WebSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory);
@ -201,6 +203,9 @@ public class SocketIOChannelInitializer extends ChannelInitializer<Channel> impl
flashSocketTransport.onDisconnect(client);
authorizeHandler.onDisconnect(client);
configuration.getStoreFactory().onDisconnect(client);
configuration.getStoreFactory().getPubSubStore().publish(PubSubStore.DISCONNECT, new DisconnectMessage(client.getSessionId()));
log.debug("Client with sessionId: {} disconnected", client.getSessionId());
}

53
src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java

@ -54,6 +54,9 @@ import com.corundumstudio.socketio.parser.PacketType;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.SchedulerKey;
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
import com.corundumstudio.socketio.store.pubsub.ConnectMessage;
import com.corundumstudio.socketio.store.pubsub.HandshakeMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.transport.MainBaseClient;
@Sharable
@ -103,16 +106,6 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di
private void authorize(Channel channel, String origin, Map<String, List<String>> params, FullHttpRequest req)
throws IOException {
UUID sessionId = UUID.randomUUID();
authorizedSessionIds.add(sessionId);
scheduleDisconnect(channel, sessionId);
String heartbeatTimeoutVal = String.valueOf(configuration.getHeartbeatTimeout());
if (!configuration.isHeartbeatsEnabled()) {
heartbeatTimeoutVal = "";
}
Map<String, List<String>> headers = new HashMap<String, List<String>>(req.headers().names().size());
for (String name : req.headers().names()) {
List<String> values = req.headers().getAll(name);
@ -125,15 +118,22 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di
boolean result = configuration.getAuthorizationListener().isAuthorized(data);
if (result) {
String msg = sessionId + ":" + heartbeatTimeoutVal + ":" + configuration.getCloseTimeout() + ":" + configuration.getTransports();
UUID sessionId = UUID.randomUUID();
scheduleDisconnect(channel, sessionId);
String msg = createHandshake(sessionId);
List<String> jsonpParams = params.get("jsonp");
String jsonpParam = null;
if (jsonpParams != null) {
jsonpParam = jsonpParams.get(0);
}
channel.write(new AuthorizeMessage(msg, jsonpParam, origin, sessionId));
handshake(sessionId);
HandshakeMessage message = new HandshakeMessage(sessionId);
configuration.getStoreFactory().getPubSubStore().publish(PubSubStore.HANDSHAKE, message);
log.debug("Handshake authorized for sessionId: {}", sessionId);
} else {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN);
@ -144,6 +144,15 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di
}
}
private String createHandshake(UUID sessionId) {
String heartbeatTimeoutVal = String.valueOf(configuration.getHeartbeatTimeout());
if (!configuration.isHeartbeatsEnabled()) {
heartbeatTimeoutVal = "";
}
String msg = sessionId + ":" + heartbeatTimeoutVal + ":" + configuration.getCloseTimeout() + ":" + configuration.getTransports();
return msg;
}
private void scheduleDisconnect(Channel channel, final UUID sessionId) {
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
@ -164,9 +173,23 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di
return authorizedSessionIds.contains(sessionId);
}
public void connect(MainBaseClient client) {
SchedulerKey key = new SchedulerKey(Type.AUTHORIZE, client.getSessionId());
public void handshake(UUID sessionId) {
authorizedSessionIds.add(sessionId);
}
public void connect(UUID sessionId) {
SchedulerKey key = new SchedulerKey(Type.AUTHORIZE, sessionId);
disconnectScheduler.cancel(key);
}
public void disconnect(UUID sessionId) {
authorizedSessionIds.remove(sessionId);
}
public void connect(MainBaseClient client) {
connect(client.getSessionId());
configuration.getStoreFactory().getPubSubStore().publish(PubSubStore.CONNECT, new ConnectMessage(client.getSessionId()));
client.send(new Packet(PacketType.CONNECT));
Namespace ns = namespacesHub.get(Namespace.DEFAULT_NAME);
@ -176,7 +199,7 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di
@Override
public void onDisconnect(MainBaseClient client) {
authorizedSessionIds.remove(client.getSessionId());
disconnect(client.getSessionId());
}
}

2
src/main/java/com/corundumstudio/socketio/store/RedisPubSubStore.java

@ -39,7 +39,7 @@ import com.corundumstudio.socketio.store.pubsub.PubSubStore;
public class RedisPubSubStore implements PubSubStore {
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private final ExecutorService executorService = Executors.newFixedThreadPool(6);
private final Logger log = LoggerFactory.getLogger(getClass());

5
src/main/java/com/corundumstudio/socketio/store/RedisStoreFactory.java

@ -19,6 +19,7 @@ import java.util.UUID;
import redis.clients.jedis.Jedis;
import com.corundumstudio.socketio.handler.AuthorizeHandler;
import com.corundumstudio.socketio.namespace.NamespacesHub;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.store.pubsub.BaseStoreFactory;
@ -43,14 +44,14 @@ public class RedisStoreFactory extends BaseStoreFactory {
}
@Override
public void init(NamespacesHub namespacesHub, JsonSupport jsonSupport) {
public void init(NamespacesHub namespacesHub, AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) {
pubSubRedisStore = new RedisPubSubStore(redisPub, redisSub, getNodeId(), jsonSupport);
redisClient.connect();
redisPub.connect();
redisSub.connect();
super.init(namespacesHub, jsonSupport);
super.init(namespacesHub, authorizeHandler, jsonSupport);
}

3
src/main/java/com/corundumstudio/socketio/store/StoreFactory.java

@ -18,6 +18,7 @@ package com.corundumstudio.socketio.store;
import java.util.UUID;
import com.corundumstudio.socketio.Disconnectable;
import com.corundumstudio.socketio.handler.AuthorizeHandler;
import com.corundumstudio.socketio.namespace.NamespacesHub;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
@ -31,7 +32,7 @@ public interface StoreFactory extends Disconnectable {
PubSubStore getPubSubStore();
void init(NamespacesHub namespacesHub, JsonSupport jsonSupport);
void init(NamespacesHub namespacesHub, AuthorizeHandler authorizeHandler, JsonSupport jsonSupport);
Store create(UUID sessionId);

62
src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java

@ -15,6 +15,10 @@
*/
package com.corundumstudio.socketio.store.pubsub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.handler.AuthorizeHandler;
import com.corundumstudio.socketio.namespace.NamespacesHub;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.store.StoreFactory;
@ -22,24 +26,47 @@ import com.corundumstudio.socketio.transport.MainBaseClient;
public abstract class BaseStoreFactory implements StoreFactory {
private final Logger log = LoggerFactory.getLogger(getClass());
private Long nodeId = (long) (Math.random() * 1000000);
protected Long getNodeId() {
return nodeId;
}
public void init(final NamespacesHub namespacesHub, JsonSupport jsonSupport) {
public void init(final NamespacesHub namespacesHub, final AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) {
getPubSubStore().subscribe(PubSubStore.DISCONNECT, new PubSubListener<DisconnectMessage>() {
@Override
public void onMessage(DisconnectMessage msg) {
authorizeHandler.disconnect(msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.DISCONNECT, msg.getSessionId());
}
}, DisconnectMessage.class);
getPubSubStore().subscribe(PubSubStore.CONNECT, new PubSubListener<ConnectMessage>() {
@Override
public void onMessage(ConnectMessage msg) {
authorizeHandler.connect(msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.CONNECT, msg.getSessionId());
}
}, ConnectMessage.class);
getPubSubStore().subscribe(PubSubStore.HANDSHAKE, new PubSubListener<HandshakeMessage>() {
@Override
public void onMessage(HandshakeMessage msg) {
authorizeHandler.handshake(msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.HANDSHAKE, msg.getSessionId());
}
}, HandshakeMessage.class);
getPubSubStore().subscribe(PubSubStore.DISPATCH, new PubSubListener<DispatchMessage>() {
@Override
public void onMessage(DispatchMessage msg) {
String name = msg.getRoom();
String[] parts = name.split("/");
String namespaceName = name;
if (parts.length > 1) {
namespaceName = parts[0];
}
String namespaceName = extractNamespaceName(name);
namespacesHub.get(namespaceName).dispatch(name, msg.getPacket());
log.debug("{} packet: {}", PubSubStore.DISPATCH, msg.getPacket());
}
}, DispatchMessage.class);
@ -48,12 +75,9 @@ public abstract class BaseStoreFactory implements StoreFactory {
public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom();
String[] parts = name.split("/");
String namespaceName = name;
if (parts.length > 1) {
namespaceName = parts[0];
}
String namespaceName = extractNamespaceName(name);
namespacesHub.get(namespaceName).join(name, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.JOIN, msg.getSessionId());
}
}, JoinLeaveMessage.class);
@ -62,12 +86,9 @@ public abstract class BaseStoreFactory implements StoreFactory {
public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom();
String[] parts = name.split("/");
String namespaceName = name;
if (parts.length > 1) {
namespaceName = parts[0];
}
String namespaceName = extractNamespaceName(name);
namespacesHub.get(namespaceName).leave(name, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.LEAVE, msg.getSessionId());
}
}, JoinLeaveMessage.class);
}
@ -79,4 +100,13 @@ public abstract class BaseStoreFactory implements StoreFactory {
public void onDisconnect(MainBaseClient client) {
}
private String extractNamespaceName(String name) {
String[] parts = name.split("/");
String namespaceName = name;
if (parts.length > 1) {
namespaceName = parts[0];
}
return namespaceName;
}
}

38
src/main/java/com/corundumstudio/socketio/store/pubsub/ConnectMessage.java

@ -0,0 +1,38 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.store.pubsub;
import java.util.UUID;
public class ConnectMessage extends PubSubMessage {
private static final long serialVersionUID = 3108918714495865101L;
private UUID sessionId;
public ConnectMessage() {
}
public ConnectMessage(UUID sessionId) {
super();
this.sessionId = sessionId;
}
public UUID getSessionId() {
return sessionId;
}
}

38
src/main/java/com/corundumstudio/socketio/store/pubsub/DisconnectMessage.java

@ -0,0 +1,38 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.store.pubsub;
import java.util.UUID;
public class DisconnectMessage extends PubSubMessage {
private static final long serialVersionUID = -2763553673397520368L;
private UUID sessionId;
public DisconnectMessage() {
}
public DisconnectMessage(UUID sessionId) {
super();
this.sessionId = sessionId;
}
public UUID getSessionId() {
return sessionId;
}
}

38
src/main/java/com/corundumstudio/socketio/store/pubsub/HandshakeMessage.java

@ -0,0 +1,38 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.store.pubsub;
import java.util.UUID;
public class HandshakeMessage extends PubSubMessage {
private static final long serialVersionUID = 5767127795325210150L;
private UUID sessionId;
public HandshakeMessage() {
}
public HandshakeMessage(UUID sessionId) {
super();
this.sessionId = sessionId;
}
public UUID getSessionId() {
return sessionId;
}
}

7
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

@ -18,6 +18,13 @@ package com.corundumstudio.socketio.store.pubsub;
public interface PubSubStore {
// TODO refactor to enum
String DISCONNECT = "disconnect";
String CONNECT = "connect";
String HANDSHAKE = "handshake";
String JOIN = "join";
String LEAVE = "leave";

Loading…
Cancel
Save