Browse Source

Rooms handling fixed.

master
Nikita 11 years ago
parent
commit
8296295fb8
  1. 22
      src/main/java/com/corundumstudio/socketio/BroadcastOperations.java
  2. 20
      src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
  3. 18
      src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java
  4. 8
      src/main/java/com/corundumstudio/socketio/store/pubsub/DispatchMessage.java
  5. 8
      src/main/java/com/corundumstudio/socketio/store/pubsub/JoinLeaveMessage.java

22
src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

@ -15,11 +15,13 @@
*/
package com.corundumstudio.socketio;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.Map.Entry;
import com.corundumstudio.socketio.misc.IterableCollection;
import com.corundumstudio.socketio.namespace.Namespace;
@ -36,7 +38,7 @@ import com.corundumstudio.socketio.store.pubsub.PubSubStore;
public class BroadcastOperations implements ClientOperations {
private final Iterable<SocketIOClient> clients;
private final Set<String> namespaceRooms = new HashSet<String>();
private final Map<String, List<String>> namespaceRooms = new HashMap<String, List<String>>();
private final StoreFactory storeFactory;
public BroadcastOperations(Iterable<SocketIOClient> clients, StoreFactory storeFactory) {
@ -45,14 +47,22 @@ public class BroadcastOperations implements ClientOperations {
for (SocketIOClient socketIOClient : clients) {
Namespace namespace = (Namespace)socketIOClient.getNamespace();
List<String> rooms = namespace.getRooms(socketIOClient);
namespaceRooms.addAll(rooms);
List<String> roomsList = namespaceRooms.get(namespace.getName());
if (roomsList == null) {
roomsList = new ArrayList<String>();
namespaceRooms.put(namespace.getName(), roomsList);
}
roomsList.addAll(rooms);
}
this.storeFactory = storeFactory;
}
private void dispatch(Packet packet) {
for (String room : namespaceRooms) {
storeFactory.pubSubStore().publish(PubSubStore.DISPATCH, new DispatchMessage(room, packet));
for (Entry<String, List<String>> entry : namespaceRooms.entrySet()) {
for (String room : entry.getValue()) {
storeFactory.pubSubStore().publish(PubSubStore.DISPATCH, new DispatchMessage(room, packet, entry.getKey()));
}
}
}

20
src/main/java/com/corundumstudio/socketio/namespace/Namespace.java

@ -40,7 +40,6 @@ import com.corundumstudio.socketio.listener.DisconnectListener;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.DispatchMessage;
import com.corundumstudio.socketio.store.pubsub.JoinLeaveMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.transport.NamespaceClient;
@ -162,7 +161,7 @@ public class Namespace implements SocketIONamespace {
allClients.remove(client);
leave(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName()));
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
}
@Override
@ -180,7 +179,7 @@ public class Namespace implements SocketIONamespace {
}
join(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(client.getSessionId(), getName()));
storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
}
@Override
@ -235,17 +234,8 @@ public class Namespace implements SocketIONamespace {
}
public void joinRoom(String room, UUID sessionId) {
room += getName() + "/" + room;
join(room, sessionId);
storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(sessionId, room));
}
public void doDispatch(String room, Packet packet) {
if (room != null && !room.isEmpty()) {
room += getName() + "/" + room;
}
storeFactory.pubSubStore().publish(PubSubStore.DISPATCH, new DispatchMessage(room, packet));
storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(sessionId, room, getName()));
}
public void dispatch(String room, Packet packet) {
@ -274,10 +264,8 @@ public class Namespace implements SocketIONamespace {
}
public void leaveRoom(String room, UUID sessionId) {
room += getName() + "/" + room;
leave(room, sessionId);
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(sessionId, room));
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(sessionId, room, getName()));
}
public void leave(String room, UUID sessionId) {

18
src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java

@ -55,8 +55,7 @@ public abstract class BaseStoreFactory implements StoreFactory {
public void onMessage(DispatchMessage msg) {
String name = msg.getRoom();
String namespaceName = extractNamespaceName(name);
namespacesHub.get(namespaceName).dispatch(name, msg.getPacket());
namespacesHub.get(msg.getNamespace()).dispatch(name, msg.getPacket());
log.debug("{} packet: {}", PubSubStore.DISPATCH, msg.getPacket());
}
}, DispatchMessage.class);
@ -66,8 +65,7 @@ public abstract class BaseStoreFactory implements StoreFactory {
public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom();
String namespaceName = extractNamespaceName(name);
namespacesHub.get(namespaceName).join(name, msg.getSessionId());
namespacesHub.get(msg.getNamespace()).join(name, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.JOIN, msg.getSessionId());
}
}, JoinLeaveMessage.class);
@ -77,8 +75,7 @@ public abstract class BaseStoreFactory implements StoreFactory {
public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom();
String namespaceName = extractNamespaceName(name);
namespacesHub.get(namespaceName).leave(name, msg.getSessionId());
namespacesHub.get(msg.getNamespace()).leave(name, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.LEAVE, msg.getSessionId());
}
}, JoinLeaveMessage.class);
@ -91,15 +88,6 @@ public abstract class BaseStoreFactory implements StoreFactory {
public void onDisconnect(MainBaseClient client) {
}
private String extractNamespaceName(String name) {
String[] parts = name.split("/");
String namespaceName = name;
if (parts.length > 1) {
namespaceName = parts[0];
}
return namespaceName;
}
@Override
public String toString() {
return getClass().getSimpleName() + " (distributed session store, distributed publish/subscribe)";

8
src/main/java/com/corundumstudio/socketio/store/pubsub/DispatchMessage.java

@ -22,14 +22,20 @@ public class DispatchMessage extends PubSubMessage {
private static final long serialVersionUID = 6692047718303934349L;
private String room;
private String namespace;
private Packet packet;
public DispatchMessage() {
}
public DispatchMessage(String room, Packet packet) {
public DispatchMessage(String room, Packet packet, String namespace) {
this.room = room;
this.packet = packet;
this.namespace = namespace;
}
public String getNamespace() {
return namespace;
}
public Packet getPacket() {

8
src/main/java/com/corundumstudio/socketio/store/pubsub/JoinLeaveMessage.java

@ -22,15 +22,21 @@ public class JoinLeaveMessage extends PubSubMessage {
private static final long serialVersionUID = -944515928988033174L;
private UUID sessionId;
private String namespace;
private String room;
public JoinLeaveMessage() {
}
public JoinLeaveMessage(UUID id, String room) {
public JoinLeaveMessage(UUID id, String room, String namespace) {
super();
this.sessionId = id;
this.room = room;
this.namespace = namespace;
}
public String getNamespace() {
return namespace;
}
public UUID getSessionId() {

Loading…
Cancel
Save