diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java index f0189f6..0de6348 100644 --- a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -15,10 +15,56 @@ */ package com.corundumstudio.socketio; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import com.corundumstudio.socketio.parser.Packet; public class BroadcastOperations implements ClientOperations { + private class BroadcastAckCallback extends AckCallback { + + final AtomicBoolean loopFinished = new AtomicBoolean(); + final AtomicInteger counter = new AtomicInteger(); + final AtomicBoolean timeoutExecuted = new AtomicBoolean(); + final AtomicBoolean successExecuted = new AtomicBoolean(); + final AckCallback ackCallback; + + public BroadcastAckCallback(AckCallback ackCallback) { + this.ackCallback = ackCallback; + } + + @Override + public void onSuccess() { + counter.getAndDecrement(); + executeSuccess(); + } + + private void executeSuccess() { + if (loopFinished.get() && counter.get() == 0 && successExecuted.compareAndSet(false, true)) { + ackCallback.onSuccess(); + } + } + + @Override + public void onTimeout() { + // execute onTimeout once + if (timeoutExecuted.compareAndSet(false, true)) { + ackCallback.onTimeout(); + } + } + + void incrementCounter() { + counter.getAndIncrement(); + } + + void loopFinished() { + loopFinished.set(true); + executeSuccess(); + } + + } + private final Iterable clients; public BroadcastOperations(Iterable clients) { @@ -35,9 +81,12 @@ public class BroadcastOperations implements ClientOperations { @Override public void sendMessage(String message, AckCallback ackCallback) { + BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); for (SocketIOClient client : clients) { - client.sendMessage(message, ackCallback); + clientCallback.incrementCounter(); + client.sendMessage(message, clientCallback); } + clientCallback.loopFinished(); } @Override @@ -49,9 +98,12 @@ public class BroadcastOperations implements ClientOperations { @Override public void sendJsonObject(Object object, AckCallback ackCallback) { + BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); for (SocketIOClient client : clients) { - client.sendJsonObject(object, ackCallback); + clientCallback.incrementCounter(); + client.sendJsonObject(object, clientCallback); } + clientCallback.loopFinished(); } @Override @@ -63,9 +115,12 @@ public class BroadcastOperations implements ClientOperations { @Override public void send(Packet packet, AckCallback ackCallback) { + BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); for (SocketIOClient client : clients) { + clientCallback.incrementCounter(); client.send(packet, ackCallback); } + clientCallback.loopFinished(); } @Override @@ -84,9 +139,12 @@ public class BroadcastOperations implements ClientOperations { @Override public void sendEvent(String name, Object data, AckCallback ackCallback) { + BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); for (SocketIOClient client : clients) { + clientCallback.incrementCounter(); client.sendEvent(name, data, ackCallback); } + clientCallback.loopFinished(); } } diff --git a/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java b/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java index b54dfc5..c2a7fd5 100644 --- a/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java +++ b/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java @@ -44,17 +44,20 @@ public class HeartbeatHandler implements Disconnectable { return; } - scheduler.cancel(new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId())); + final SchedulerKey key = new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId()); + // cancel heartbeat check because the client answered + scheduler.cancel(key); scheduler.schedule(new Runnable() { public void run() { client.send(new Packet(PacketType.HEARTBEAT)); - scheduleClientHeartbeatCheck(client); + scheduleClientHeartbeatCheck(client, key); } }, configuration.getHeartbeatInterval(), TimeUnit.SECONDS); } - private void scheduleClientHeartbeatCheck(final BaseClient client) { - SchedulerKey key = new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId()); + private void scheduleClientHeartbeatCheck(final BaseClient client, SchedulerKey key) { + // cancel previous heartbeat check + scheduler.cancel(key); scheduler.schedule(key, new Runnable() { public void run() { client.disconnect(); diff --git a/src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java b/src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java index ede3d42..71c68c8 100644 --- a/src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java +++ b/src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; public class CancelableScheduler { - private final Map> scheduledFutures = new ConcurrentHashMap>(); + private final Map> scheduledFutures = new ConcurrentHashMap>(); private final ScheduledExecutorService executorService; public CancelableScheduler(int threadPoolSize) {