Browse Source

Merge branch 'master' of git://github.com/RaduCirstoiu/netty-socketio

master
daedalushammer 13 years ago
parent
commit
2e9b5ef41e
  1. 62
      src/main/java/com/corundumstudio/socketio/BroadcastOperations.java
  2. 11
      src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java
  3. 2
      src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java

62
src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

@ -15,10 +15,56 @@
*/
package com.corundumstudio.socketio;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.corundumstudio.socketio.parser.Packet;
public class BroadcastOperations implements ClientOperations {
private class BroadcastAckCallback extends AckCallback {
final AtomicBoolean loopFinished = new AtomicBoolean();
final AtomicInteger counter = new AtomicInteger();
final AtomicBoolean timeoutExecuted = new AtomicBoolean();
final AtomicBoolean successExecuted = new AtomicBoolean();
final AckCallback ackCallback;
public BroadcastAckCallback(AckCallback ackCallback) {
this.ackCallback = ackCallback;
}
@Override
public void onSuccess() {
counter.getAndDecrement();
executeSuccess();
}
private void executeSuccess() {
if (loopFinished.get() && counter.get() == 0 && successExecuted.compareAndSet(false, true)) {
ackCallback.onSuccess();
}
}
@Override
public void onTimeout() {
// execute onTimeout once
if (timeoutExecuted.compareAndSet(false, true)) {
ackCallback.onTimeout();
}
}
void incrementCounter() {
counter.getAndIncrement();
}
void loopFinished() {
loopFinished.set(true);
executeSuccess();
}
}
private final Iterable<SocketIOClient> clients;
public BroadcastOperations(Iterable<SocketIOClient> clients) {
@ -35,9 +81,12 @@ public class BroadcastOperations implements ClientOperations {
@Override
public void sendMessage(String message, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
for (SocketIOClient client : clients) {
client.sendMessage(message, ackCallback);
clientCallback.incrementCounter();
client.sendMessage(message, clientCallback);
}
clientCallback.loopFinished();
}
@Override
@ -49,9 +98,12 @@ public class BroadcastOperations implements ClientOperations {
@Override
public void sendJsonObject(Object object, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
for (SocketIOClient client : clients) {
client.sendJsonObject(object, ackCallback);
clientCallback.incrementCounter();
client.sendJsonObject(object, clientCallback);
}
clientCallback.loopFinished();
}
@Override
@ -63,9 +115,12 @@ public class BroadcastOperations implements ClientOperations {
@Override
public void send(Packet packet, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
for (SocketIOClient client : clients) {
clientCallback.incrementCounter();
client.send(packet, ackCallback);
}
clientCallback.loopFinished();
}
@Override
@ -84,9 +139,12 @@ public class BroadcastOperations implements ClientOperations {
@Override
public void sendEvent(String name, Object data, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
for (SocketIOClient client : clients) {
clientCallback.incrementCounter();
client.sendEvent(name, data, ackCallback);
}
clientCallback.loopFinished();
}
}

11
src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java

@ -44,17 +44,20 @@ public class HeartbeatHandler implements Disconnectable {
return;
}
scheduler.cancel(new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId()));
final SchedulerKey key = new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId());
// cancel heartbeat check because the client answered
scheduler.cancel(key);
scheduler.schedule(new Runnable() {
public void run() {
client.send(new Packet(PacketType.HEARTBEAT));
scheduleClientHeartbeatCheck(client);
scheduleClientHeartbeatCheck(client, key);
}
}, configuration.getHeartbeatInterval(), TimeUnit.SECONDS);
}
private void scheduleClientHeartbeatCheck(final BaseClient client) {
SchedulerKey key = new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId());
private void scheduleClientHeartbeatCheck(final BaseClient client, SchedulerKey key) {
// cancel previous heartbeat check
scheduler.cancel(key);
scheduler.schedule(key, new Runnable() {
public void run() {
client.disconnect();

2
src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java

@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
public class CancelableScheduler {
private final Map<Object, Future<?>> scheduledFutures = new ConcurrentHashMap<Object, Future<?>>();
private final Map<SchedulerKey, Future<?>> scheduledFutures = new ConcurrentHashMap<SchedulerKey, Future<?>>();
private final ScheduledExecutorService executorService;
public CancelableScheduler(int threadPoolSize) {

Loading…
Cancel
Save