From 2a372bab52382a1b756834900dd5a287c0945545 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sun, 9 Sep 2012 20:16:26 +0400 Subject: [PATCH] Broadcast ack sending fixed. --- .../socketio/BroadcastOperations.java | 62 ++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java index f0189f6..0de6348 100644 --- a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -15,10 +15,56 @@ */ package com.corundumstudio.socketio; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import com.corundumstudio.socketio.parser.Packet; public class BroadcastOperations implements ClientOperations { + private class BroadcastAckCallback extends AckCallback { + + final AtomicBoolean loopFinished = new AtomicBoolean(); + final AtomicInteger counter = new AtomicInteger(); + final AtomicBoolean timeoutExecuted = new AtomicBoolean(); + final AtomicBoolean successExecuted = new AtomicBoolean(); + final AckCallback ackCallback; + + public BroadcastAckCallback(AckCallback ackCallback) { + this.ackCallback = ackCallback; + } + + @Override + public void onSuccess() { + counter.getAndDecrement(); + executeSuccess(); + } + + private void executeSuccess() { + if (loopFinished.get() && counter.get() == 0 && successExecuted.compareAndSet(false, true)) { + ackCallback.onSuccess(); + } + } + + @Override + public void onTimeout() { + // execute onTimeout once + if (timeoutExecuted.compareAndSet(false, true)) { + ackCallback.onTimeout(); + } + } + + void incrementCounter() { + counter.getAndIncrement(); + } + + void loopFinished() { + loopFinished.set(true); + executeSuccess(); + } + + } + private final Iterable clients; public BroadcastOperations(Iterable clients) { @@ -35,9 +81,12 @@ public class BroadcastOperations implements ClientOperations { @Override public void sendMessage(String message, AckCallback ackCallback) { + BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); for (SocketIOClient client : clients) { - client.sendMessage(message, ackCallback); + clientCallback.incrementCounter(); + client.sendMessage(message, clientCallback); } + clientCallback.loopFinished(); } @Override @@ -49,9 +98,12 @@ public class BroadcastOperations implements ClientOperations { @Override public void sendJsonObject(Object object, AckCallback ackCallback) { + BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); for (SocketIOClient client : clients) { - client.sendJsonObject(object, ackCallback); + clientCallback.incrementCounter(); + client.sendJsonObject(object, clientCallback); } + clientCallback.loopFinished(); } @Override @@ -63,9 +115,12 @@ public class BroadcastOperations implements ClientOperations { @Override public void send(Packet packet, AckCallback ackCallback) { + BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); for (SocketIOClient client : clients) { + clientCallback.incrementCounter(); client.send(packet, ackCallback); } + clientCallback.loopFinished(); } @Override @@ -84,9 +139,12 @@ public class BroadcastOperations implements ClientOperations { @Override public void sendEvent(String name, Object data, AckCallback ackCallback) { + BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); for (SocketIOClient client : clients) { + clientCallback.incrementCounter(); client.sendEvent(name, data, ackCallback); } + clientCallback.loopFinished(); } }