Browse Source

Broadcast ack sending fixed.

master
Nikita 13 years ago
parent
commit
2a372bab52
  1. 62
      src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

62
src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

@ -15,10 +15,56 @@
*/
package com.corundumstudio.socketio;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.corundumstudio.socketio.parser.Packet;
public class BroadcastOperations implements ClientOperations {
private class BroadcastAckCallback extends AckCallback {
final AtomicBoolean loopFinished = new AtomicBoolean();
final AtomicInteger counter = new AtomicInteger();
final AtomicBoolean timeoutExecuted = new AtomicBoolean();
final AtomicBoolean successExecuted = new AtomicBoolean();
final AckCallback ackCallback;
public BroadcastAckCallback(AckCallback ackCallback) {
this.ackCallback = ackCallback;
}
@Override
public void onSuccess() {
counter.getAndDecrement();
executeSuccess();
}
private void executeSuccess() {
if (loopFinished.get() && counter.get() == 0 && successExecuted.compareAndSet(false, true)) {
ackCallback.onSuccess();
}
}
@Override
public void onTimeout() {
// execute onTimeout once
if (timeoutExecuted.compareAndSet(false, true)) {
ackCallback.onTimeout();
}
}
void incrementCounter() {
counter.getAndIncrement();
}
void loopFinished() {
loopFinished.set(true);
executeSuccess();
}
}
private final Iterable<SocketIOClient> clients;
public BroadcastOperations(Iterable<SocketIOClient> clients) {
@ -35,9 +81,12 @@ public class BroadcastOperations implements ClientOperations {
@Override
public void sendMessage(String message, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
for (SocketIOClient client : clients) {
client.sendMessage(message, ackCallback);
clientCallback.incrementCounter();
client.sendMessage(message, clientCallback);
}
clientCallback.loopFinished();
}
@Override
@ -49,9 +98,12 @@ public class BroadcastOperations implements ClientOperations {
@Override
public void sendJsonObject(Object object, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
for (SocketIOClient client : clients) {
client.sendJsonObject(object, ackCallback);
clientCallback.incrementCounter();
client.sendJsonObject(object, clientCallback);
}
clientCallback.loopFinished();
}
@Override
@ -63,9 +115,12 @@ public class BroadcastOperations implements ClientOperations {
@Override
public void send(Packet packet, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
for (SocketIOClient client : clients) {
clientCallback.incrementCounter();
client.send(packet, ackCallback);
}
clientCallback.loopFinished();
}
@Override
@ -84,9 +139,12 @@ public class BroadcastOperations implements ClientOperations {
@Override
public void sendEvent(String name, Object data, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
for (SocketIOClient client : clients) {
clientCallback.incrementCounter();
client.sendEvent(name, data, ackCallback);
}
clientCallback.loopFinished();
}
}
Loading…
Cancel
Save