Browse Source

PubSub shutdown handling. Issue #66

master
Nikita 12 years ago
parent
commit
6d531b7683
  1. 6
      src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
  2. 6
      src/main/java/com/corundumstudio/socketio/SocketIOServer.java
  3. 2
      src/main/java/com/corundumstudio/socketio/StoreFactory.java
  4. 10
      src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java
  5. 4
      src/main/java/com/corundumstudio/socketio/store/MemoryStoreFactory.java
  6. 9
      src/main/java/com/corundumstudio/socketio/store/RedisStoreFactory.java
  7. 4
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubMemoryStore.java
  8. 14
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubRedisStore.java
  9. 2
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

6
src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java

@ -49,10 +49,6 @@ import com.corundumstudio.socketio.parser.Decoder;
import com.corundumstudio.socketio.parser.Encoder;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.store.pubsub.DispatchMessage;
import com.corundumstudio.socketio.store.pubsub.JoinLeaveMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.transport.FlashPolicyHandler;
import com.corundumstudio.socketio.transport.FlashSocketTransport;
import com.corundumstudio.socketio.transport.MainBaseClient;
@ -207,6 +203,8 @@ public class SocketIOChannelInitializer extends ChannelInitializer<Channel> impl
}
public void stop() {
StoreFactory factory = configuration.getStoreFactory();
factory.shutdown();
scheduler.shutdown();
}

6
src/main/java/com/corundumstudio/socketio/SocketIOServer.java

@ -125,8 +125,10 @@ public class SocketIOServer implements ClientListeners {
* Stop server
*/
public void stop() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully().syncUninterruptibly();
workerGroup.shutdownGracefully().syncUninterruptibly();
pipelineFactory.stop();
}
public SocketIONamespace addNamespace(String name) {

2
src/main/java/com/corundumstudio/socketio/StoreFactory.java

@ -29,4 +29,6 @@ public interface StoreFactory extends Disconnectable {
Store create(UUID sessionId);
void shutdown();
}

10
src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java

@ -22,8 +22,13 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CancelableScheduler {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<SchedulerKey, Future<?>> scheduledFutures = new ConcurrentHashMap<SchedulerKey, Future<?>>();
private final ScheduledExecutorService executorService;
@ -58,6 +63,11 @@ public class CancelableScheduler {
public void shutdown() {
executorService.shutdownNow();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}

4
src/main/java/com/corundumstudio/socketio/store/MemoryStoreFactory.java

@ -36,4 +36,8 @@ public class MemoryStoreFactory extends BaseStoreFactory {
return pubSubMemoryStore;
}
@Override
public void shutdown() {
}
}

9
src/main/java/com/corundumstudio/socketio/store/RedisStoreFactory.java

@ -74,4 +74,13 @@ public class RedisStoreFactory extends BaseStoreFactory {
redisClient.del(client.getSessionId().toString());
}
@Override
public void shutdown() {
pubSubRedisStore.shutdown();
redisClient.disconnect();
redisPub.disconnect();
redisSub.disconnect();
}
}

4
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubMemoryStore.java

@ -29,4 +29,8 @@ public class PubSubMemoryStore implements PubSubStore {
public void unsubscribe(String name) {
}
@Override
public void shutdown() {
}
}

14
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubRedisStore.java

@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,8 +36,7 @@ import com.corundumstudio.socketio.parser.JsonSupport;
public class PubSubRedisStore implements PubSubStore {
// TODO destroy
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private final Logger log = LoggerFactory.getLogger(getClass());
@ -159,4 +159,14 @@ public class PubSubRedisStore implements PubSubStore {
map.remove(name);
}
@Override
public void shutdown() {
executorService.shutdownNow();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}

2
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

@ -31,4 +31,6 @@ public interface PubSubStore {
void unsubscribe(String name);
void shutdown();
}
Loading…
Cancel
Save