diff --git a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java index 6fa8214..b83fa2b 100644 --- a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java +++ b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java @@ -178,6 +178,11 @@ public class Namespace implements SocketIONamespace { } public void onDisconnect(SocketIOClient client) { + allClients.remove(client.getSessionId()); + + leave(getName(), client.getSessionId()); + storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName())); + for (DisconnectListener listener : disconnectListeners) { try { listener.onDisconnect(client); @@ -185,10 +190,6 @@ public class Namespace implements SocketIONamespace { log.error("Can't execute onDisconnect listener", e); } } - allClients.remove(client.getSessionId()); - - leave(getName(), client.getSessionId()); - storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName())); } @Override diff --git a/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java b/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java index 47bf231..5fec89c 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.HandshakeData; @@ -49,7 +50,8 @@ public abstract class MainBaseClient { private final ConcurrentMap namespaceClients = new ConcurrentHashMap(); private final Store store; - private final DisconnectableHub disconnectable; + private final AtomicBoolean disconnected = new AtomicBoolean(); + private final DisconnectableHub disconnectableHub; private final AckManager ackManager; private final UUID sessionId; private final Transport transport; @@ -60,7 +62,7 @@ public abstract class MainBaseClient { Transport transport, StoreFactory storeFactory, HandshakeData handshakeData) { this.sessionId = sessionId; this.ackManager = ackManager; - this.disconnectable = disconnectable; + this.disconnectableHub = disconnectable; this.transport = transport; this.store = storeFactory.createStore(sessionId); this.handshakeData = handshakeData; @@ -75,7 +77,7 @@ public abstract class MainBaseClient { public void removeChildClient(SocketIOClient client) { namespaceClients.remove((Namespace) client.getNamespace()); if (namespaceClients.isEmpty()) { - disconnectable.onDisconnect(this); + disconnectableHub.onDisconnect(this); } } @@ -93,7 +95,12 @@ public abstract class MainBaseClient { return namespaceClients.values(); } + public boolean isConnected() { + return !disconnected.get(); + } + public void onChannelDisconnect() { + disconnected.set(true); for (SocketIOClient client : getAllChildClients()) { ((NamespaceClient) client).onDisconnect(); } diff --git a/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java b/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java index 1853951..0f1a2e8 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java @@ -19,6 +19,7 @@ import java.net.SocketAddress; import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import com.corundumstudio.socketio.AckCallback; import com.corundumstudio.socketio.HandshakeData; @@ -30,6 +31,7 @@ import com.corundumstudio.socketio.parser.PacketType; public class NamespaceClient implements SocketIOClient { + private final AtomicBoolean disconnected = new AtomicBoolean(); private final MainBaseClient baseClient; private final Namespace namespace; @@ -95,8 +97,15 @@ public class NamespaceClient implements SocketIOClient { send(packet); } + private boolean isConnected() { + return !disconnected.get() && baseClient.isConnected(); + } + @Override public void send(Packet packet, AckCallback ackCallback) { + if (!isConnected()) { + return; + } long index = baseClient.getAckManager().registerAck(getSessionId(), ackCallback); packet.setId(index); if (!ackCallback.getResultClass().equals(Void.class)) { @@ -107,6 +116,9 @@ public class NamespaceClient implements SocketIOClient { @Override public void send(Packet packet) { + if (!isConnected()) { + return; + } packet.setEndpoint(namespace.getName()); baseClient.send(packet); } @@ -119,8 +131,10 @@ public class NamespaceClient implements SocketIOClient { } public void onDisconnect() { - namespace.onDisconnect(this); + disconnected.set(true); + baseClient.removeChildClient(this); + namespace.onDisconnect(this); } @Override