From 5d3bb2d2ce7a873a3ac1346e7aed396b4f6e1747 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 30 May 2013 21:26:00 +0400 Subject: [PATCH] Room support. Issue #47 --- .../socketio/BroadcastOperations.java | 7 +++ .../socketio/SocketIOClient.java | 4 ++ .../socketio/SocketIOServer.java | 8 ++- .../socketio/misc/IterableCollection.java | 8 ++- .../socketio/namespace/Namespace.java | 50 +++++++++++++++++-- .../socketio/namespace/NamespacesHub.java | 13 +++++ .../socketio/transport/NamespaceClient.java | 10 ++++ 7 files changed, 93 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java index 7f0d3d7..5840739 100644 --- a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -15,6 +15,9 @@ */ package com.corundumstudio.socketio; +import java.util.Collection; + +import com.corundumstudio.socketio.misc.IterableCollection; import com.corundumstudio.socketio.parser.Packet; public class BroadcastOperations implements ClientOperations { @@ -26,6 +29,10 @@ public class BroadcastOperations implements ClientOperations { this.clients = clients; } + public Collection getClients() { + return new IterableCollection(clients); + } + @Override public void sendMessage(String message) { for (SocketIOClient client : clients) { diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java index 0823ec8..1e2d337 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java @@ -91,4 +91,8 @@ public interface SocketIOClient extends ClientOperations { */ boolean isChannelOpen(); + void joinRoom(T roomKey); + + void leaveRoom(T roomKey); + } diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java index 2cc2663..f22abb6 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java @@ -16,6 +16,7 @@ package com.corundumstudio.socketio; import java.net.InetSocketAddress; +import java.util.Collection; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; @@ -61,7 +62,7 @@ public class SocketIOServer implements ClientListeners { * * @return Iterable object with clients */ - public Iterable getAllClients() { + public Collection getAllClients() { return pipelineFactory.getAllClients(); } @@ -69,6 +70,11 @@ public class SocketIOServer implements ClientListeners { return getBroadcastOperations(pipelineFactory.getAllClients()); } + public BroadcastOperations getRoomOperations(T roomKey) { + Iterable clients = namespacesHub.getRoomClients(roomKey); + return new BroadcastOperations(clients); + } + public BroadcastOperations getBroadcastOperations(Iterable clients) { return new BroadcastOperations(clients); } diff --git a/src/main/java/com/corundumstudio/socketio/misc/IterableCollection.java b/src/main/java/com/corundumstudio/socketio/misc/IterableCollection.java index c973427..a0e8e2a 100644 --- a/src/main/java/com/corundumstudio/socketio/misc/IterableCollection.java +++ b/src/main/java/com/corundumstudio/socketio/misc/IterableCollection.java @@ -22,18 +22,22 @@ public class IterableCollection extends AbstractCollection { private final CompositeIterable iterable; + public IterableCollection(Iterable iterable) { + this(new CompositeIterable(iterable)); + } + public IterableCollection(CompositeIterable iterable) { this.iterable = iterable; } @Override public Iterator iterator() { - return new CompositeIterable(iterable).iterator(); + return new CompositeIterable(iterable).iterator(); } @Override public int size() { - Iterator iterator = new CompositeIterable(iterable).iterator(); + Iterator iterator = new CompositeIterable(iterable).iterator(); int count = 0; while (iterator.hasNext()) { iterator.next(); diff --git a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java index 83e492e..91672ed 100644 --- a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java +++ b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java @@ -15,6 +15,7 @@ */ package com.corundumstudio.socketio.namespace; +import java.util.Collections; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -43,7 +44,7 @@ public class Namespace implements SocketIONamespace { public static final String DEFAULT_NAME = ""; - private final Set clients = new ConcurrentHashSet(); + private final Set allClients = new ConcurrentHashSet(); private final ConcurrentMap> eventListeners = new ConcurrentHashMap>(); private final ConcurrentMap, Queue>> jsonObjectListeners = @@ -52,6 +53,8 @@ public class Namespace implements SocketIONamespace { private final Queue connectListeners = new ConcurrentLinkedQueue(); private final Queue disconnectListeners = new ConcurrentLinkedQueue(); + private final ConcurrentMap> roomClients = new ConcurrentHashMap>(); + private final String name; private final JsonSupport jsonSupport; @@ -62,7 +65,7 @@ public class Namespace implements SocketIONamespace { } public void addClient(SocketIOClient client) { - clients.add(client); + allClients.add(client); } public String getName() { @@ -136,7 +139,7 @@ public class Namespace implements SocketIONamespace { for (DisconnectListener listener : disconnectListeners) { listener.onDisconnect(client); } - clients.remove(client); + allClients.remove(client); } @Override @@ -161,7 +164,7 @@ public class Namespace implements SocketIONamespace { @Override public BroadcastOperations getBroadcastOperations() { - return new BroadcastOperations(clients); + return new BroadcastOperations(allClients); } @@ -202,4 +205,43 @@ public class Namespace implements SocketIONamespace { engine.scan(this, listeners, listenersClass); } + public void joinRoom(Object roomKey, SocketIOClient namespaceClient) { + Queue clients = roomClients.get(roomKey); + if (clients == null) { + clients = new ConcurrentLinkedQueue(); + Queue oldClients = roomClients.putIfAbsent(roomKey, clients); + if (oldClients != null) { + clients = oldClients; + } + } + clients.add(namespaceClient); + if (clients != roomClients.get(roomKey)) { + // re-join if queue has been replaced + joinRoom(roomKey, namespaceClient); + } + } + + public void leaveRoom(Object roomKey, SocketIOClient namespaceClient) { + Queue clients = roomClients.get(roomKey); + if (clients == null) { + return; + } + clients.remove(namespaceClient); + if (clients.isEmpty()) { + roomClients.remove(roomKey); + // join which was added after queue deletion + for (SocketIOClient socketIOClient : clients) { + joinRoom(roomKey, socketIOClient); + } + } + } + + public Iterable getRoomClients(Object roomKey) { + Queue clients = roomClients.get(roomKey); + if (clients == null) { + return Collections.emptyList(); + } + return clients; + } + } diff --git a/src/main/java/com/corundumstudio/socketio/namespace/NamespacesHub.java b/src/main/java/com/corundumstudio/socketio/namespace/NamespacesHub.java index 83da5ab..66e68be 100644 --- a/src/main/java/com/corundumstudio/socketio/namespace/NamespacesHub.java +++ b/src/main/java/com/corundumstudio/socketio/namespace/NamespacesHub.java @@ -15,9 +15,13 @@ */ package com.corundumstudio.socketio.namespace; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import com.corundumstudio.socketio.SocketIOClient; +import com.corundumstudio.socketio.misc.CompositeIterable; import com.corundumstudio.socketio.parser.JsonSupport; public class NamespacesHub { @@ -41,6 +45,15 @@ public class NamespacesHub { return namespace; } + public Iterable getRoomClients(T roomKey) { + List> allClients = new ArrayList>(); + for (Namespace namespace : namespaces.values()) { + Iterable clients = namespace.getRoomClients(roomKey); + allClients.add(clients); + } + return new CompositeIterable(allClients); + } + public Namespace get(String name) { return namespaces.get(name); } diff --git a/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java b/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java index bfce26e..dab2075 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java @@ -169,4 +169,14 @@ public class NamespaceClient implements SocketIOClient { return true; } + @Override + public void joinRoom(T roomKey) { + namespace.joinRoom(roomKey, this); + } + + @Override + public void leaveRoom(T roomKey) { + namespace.leaveRoom(roomKey, this); + } + }