Browse Source

disconnected fixed

master
Nikita 11 years ago
parent
commit
5153b51d1d
  1. 9
      src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
  2. 13
      src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java
  3. 16
      src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java

9
src/main/java/com/corundumstudio/socketio/namespace/Namespace.java

@ -178,6 +178,11 @@ public class Namespace implements SocketIONamespace {
} }
public void onDisconnect(SocketIOClient client) { public void onDisconnect(SocketIOClient client) {
allClients.remove(client.getSessionId());
leave(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
for (DisconnectListener listener : disconnectListeners) { for (DisconnectListener listener : disconnectListeners) {
try { try {
listener.onDisconnect(client); listener.onDisconnect(client);
@ -185,10 +190,6 @@ public class Namespace implements SocketIONamespace {
log.error("Can't execute onDisconnect listener", e); log.error("Can't execute onDisconnect listener", e);
} }
} }
allClients.remove(client.getSessionId());
leave(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
} }
@Override @Override

13
src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java

@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.HandshakeData; import com.corundumstudio.socketio.HandshakeData;
@ -49,7 +50,8 @@ public abstract class MainBaseClient {
private final ConcurrentMap<Namespace, SocketIOClient> namespaceClients = new ConcurrentHashMap<Namespace, SocketIOClient>(); private final ConcurrentMap<Namespace, SocketIOClient> namespaceClients = new ConcurrentHashMap<Namespace, SocketIOClient>();
private final Store store; private final Store store;
private final DisconnectableHub disconnectable;
private final AtomicBoolean disconnected = new AtomicBoolean();
private final DisconnectableHub disconnectableHub;
private final AckManager ackManager; private final AckManager ackManager;
private final UUID sessionId; private final UUID sessionId;
private final Transport transport; private final Transport transport;
@ -60,7 +62,7 @@ public abstract class MainBaseClient {
Transport transport, StoreFactory storeFactory, HandshakeData handshakeData) { Transport transport, StoreFactory storeFactory, HandshakeData handshakeData) {
this.sessionId = sessionId; this.sessionId = sessionId;
this.ackManager = ackManager; this.ackManager = ackManager;
this.disconnectable = disconnectable;
this.disconnectableHub = disconnectable;
this.transport = transport; this.transport = transport;
this.store = storeFactory.createStore(sessionId); this.store = storeFactory.createStore(sessionId);
this.handshakeData = handshakeData; this.handshakeData = handshakeData;
@ -75,7 +77,7 @@ public abstract class MainBaseClient {
public void removeChildClient(SocketIOClient client) { public void removeChildClient(SocketIOClient client) {
namespaceClients.remove((Namespace) client.getNamespace()); namespaceClients.remove((Namespace) client.getNamespace());
if (namespaceClients.isEmpty()) { if (namespaceClients.isEmpty()) {
disconnectable.onDisconnect(this);
disconnectableHub.onDisconnect(this);
} }
} }
@ -93,7 +95,12 @@ public abstract class MainBaseClient {
return namespaceClients.values(); return namespaceClients.values();
} }
public boolean isConnected() {
return !disconnected.get();
}
public void onChannelDisconnect() { public void onChannelDisconnect() {
disconnected.set(true);
for (SocketIOClient client : getAllChildClients()) { for (SocketIOClient client : getAllChildClients()) {
((NamespaceClient) client).onDisconnect(); ((NamespaceClient) client).onDisconnect();
} }

16
src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java

@ -19,6 +19,7 @@ import java.net.SocketAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import com.corundumstudio.socketio.AckCallback; import com.corundumstudio.socketio.AckCallback;
import com.corundumstudio.socketio.HandshakeData; import com.corundumstudio.socketio.HandshakeData;
@ -30,6 +31,7 @@ import com.corundumstudio.socketio.parser.PacketType;
public class NamespaceClient implements SocketIOClient { public class NamespaceClient implements SocketIOClient {
private final AtomicBoolean disconnected = new AtomicBoolean();
private final MainBaseClient baseClient; private final MainBaseClient baseClient;
private final Namespace namespace; private final Namespace namespace;
@ -95,8 +97,15 @@ public class NamespaceClient implements SocketIOClient {
send(packet); send(packet);
} }
private boolean isConnected() {
return !disconnected.get() && baseClient.isConnected();
}
@Override @Override
public void send(Packet packet, AckCallback<?> ackCallback) { public void send(Packet packet, AckCallback<?> ackCallback) {
if (!isConnected()) {
return;
}
long index = baseClient.getAckManager().registerAck(getSessionId(), ackCallback); long index = baseClient.getAckManager().registerAck(getSessionId(), ackCallback);
packet.setId(index); packet.setId(index);
if (!ackCallback.getResultClass().equals(Void.class)) { if (!ackCallback.getResultClass().equals(Void.class)) {
@ -107,6 +116,9 @@ public class NamespaceClient implements SocketIOClient {
@Override @Override
public void send(Packet packet) { public void send(Packet packet) {
if (!isConnected()) {
return;
}
packet.setEndpoint(namespace.getName()); packet.setEndpoint(namespace.getName());
baseClient.send(packet); baseClient.send(packet);
} }
@ -119,8 +131,10 @@ public class NamespaceClient implements SocketIOClient {
} }
public void onDisconnect() { public void onDisconnect() {
namespace.onDisconnect(this);
disconnected.set(true);
baseClient.removeChildClient(this); baseClient.removeChildClient(this);
namespace.onDisconnect(this);
} }
@Override @Override

Loading…
Cancel
Save