Browse Source

AckCallback with timeout introduced

master
Nikita 13 years ago
parent
commit
3baf706a41
  1. 48
      src/main/java/com/corundumstudio/socketio/AckCallback.java
  2. 49
      src/main/java/com/corundumstudio/socketio/AckManager.java
  3. 8
      src/main/java/com/corundumstudio/socketio/SocketIOClient.java
  4. 3
      src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java
  5. 2
      src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java
  6. 9
      src/main/java/com/corundumstudio/socketio/transport/BaseClient.java
  7. 4
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

48
src/main/java/com/corundumstudio/socketio/AckCallback.java

@ -0,0 +1,48 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
public abstract class AckCallback {
private int timeout = -1;
public AckCallback() {
}
/**
* Creates AckCallback
*
* @param timeout - callback timeout in seconds
*/
public AckCallback(int timeout) {
this.timeout = timeout;
}
public int getTimeout() {
return timeout;
}
public abstract void onSuccess();
/**
* Invoked only once then <code>timeout</code> defined
*
*/
public void onTimeout() {
}
}

49
src/main/java/com/corundumstudio/socketio/AckManager.java

@ -21,28 +21,48 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.SchedulerKey;
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
public class AckManager implements Disconnectable {
private final AtomicLong ackIndex = new AtomicLong();
private final Map<Long, Runnable> ackCallbacks = new ConcurrentHashMap<Long, Runnable>();
private final Map<Long, AckCallback> ackCallbacks = new ConcurrentHashMap<Long, AckCallback>();
private final ConcurrentMap<UUID, Set<Long>> clientCallbackIds = new ConcurrentHashMap<UUID, Set<Long>>();
private final CancelableScheduler scheduler;
public AckManager(CancelableScheduler scheduler) {
super();
this.scheduler = scheduler;
}
public void onAck(SocketIOClient client, Packet packet) {
Runnable callback = ackCallbacks.remove(packet.getAckId());
SchedulerKey key = new SchedulerKey(Type.ACK_TIMEOUT, client.getSessionId());
scheduler.cancel(key);
AckCallback callback = removeCallback(client.getSessionId(), packet.getAckId());
if (callback != null) {
callback.onSuccess();
}
}
private AckCallback removeCallback(UUID sessionId, long index) {
AckCallback callback = ackCallbacks.remove(index);
if (callback != null) {
Set<Long> callbackIds = clientCallbackIds.get(client.getSessionId());
Set<Long> callbackIds = clientCallbackIds.get(sessionId);
if (callbackIds != null) {
callbackIds.remove(packet.getAckId());
callbackIds.remove(index);
}
callback.run();
}
return callback;
}
public long registerAck(UUID sessionId, Runnable callback) {
public long registerAck(UUID sessionId, final AckCallback callback) {
Set<Long> callbackIds = clientCallbackIds.get(sessionId);
if (callbackIds == null) {
callbackIds = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
@ -54,9 +74,26 @@ public class AckManager implements Disconnectable {
long index = ackIndex.incrementAndGet();
callbackIds.add(index);
ackCallbacks.put(index, callback);
scheduleTimeout(index, sessionId, callback);
return index;
}
private void scheduleTimeout(final long index, final UUID sessionId, final AckCallback callback) {
if (callback.getTimeout() == -1) {
return;
}
SchedulerKey key = new SchedulerKey(Type.ACK_TIMEOUT, sessionId);
scheduler.schedule(key, new Runnable() {
@Override
public void run() {
removeCallback(sessionId, index);
callback.onTimeout();
}
}, callback.getTimeout(), TimeUnit.SECONDS);
}
@Override
public void onDisconnect(SocketIOClient client) {
Set<Long> callbackIds = clientCallbackIds.remove(client.getSessionId());

8
src/main/java/com/corundumstudio/socketio/SocketIOClient.java

@ -42,7 +42,7 @@ public interface SocketIOClient {
* @param message - message to send
* @param ackCallback - ack callback
*/
void sendMessage(String message, Runnable ackCallback);
void sendMessage(String message, AckCallback ackCallback);
/**
* Send object. Object will be encoded to json-format.
@ -57,7 +57,7 @@ public interface SocketIOClient {
* @param object - object to send
* @param ackCallback - ack callback
*/
void sendJsonObject(Object object, Runnable ackCallback);
void sendJsonObject(Object object, AckCallback ackCallback);
/**
* Send packet
@ -72,7 +72,7 @@ public interface SocketIOClient {
* @param packet - packet to send
* @param ackCallback - ack callback
*/
void send(Packet packet, Runnable ackCallback);
void send(Packet packet, AckCallback ackCallback);
/**
* Disconnect client
@ -95,7 +95,7 @@ public interface SocketIOClient {
* @param data - event data
* @param ackCallback - ack callback
*/
void sendEvent(String name, Object data, Runnable ackCallback);
void sendEvent(String name, Object data, AckCallback ackCallback);
SocketAddress getRemoteAddress();

3
src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java

@ -47,7 +47,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
private final int protocol = 1;
private AckManager ackManager = new AckManager();
private AckManager ackManager;
private AuthorizeHandler authorizeHandler;
private XHRPollingTransport xhrPollingTransport;
@ -68,6 +68,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
Encoder encoder = new Encoder(objectMapper);
Decoder decoder = new Decoder(objectMapper);
ackManager = new AckManager(scheduler);
heartbeatHandler = new HeartbeatHandler(configuration, scheduler);
PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler, ackManager);

2
src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java

@ -19,7 +19,7 @@ import java.util.UUID;
public class SchedulerKey {
public enum Type {NOOP, HEARBEAT_TIMEOUT, CLOSE_TIMEOUT, AUTHORIZE};
public enum Type {POLLING, HEARBEAT_TIMEOUT, CLOSE_TIMEOUT, AUTHORIZE, ACK_TIMEOUT};
private Type type;
private UUID sessionId;

9
src/main/java/com/corundumstudio/socketio/transport/BaseClient.java

@ -21,6 +21,7 @@ import java.util.UUID;
import org.jboss.netty.channel.Channel;
import com.corundumstudio.socketio.AckCallback;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.parser.Packet;
@ -51,7 +52,7 @@ abstract class BaseClient implements SocketIOClient {
}
@Override
public void sendEvent(String name, Object data, Runnable ackCallback) {
public void sendEvent(String name, Object data, AckCallback ackCallback) {
Packet packet = new Packet(PacketType.EVENT);
packet.setName(name);
packet.setArgs(Collections.singletonList(data));
@ -59,7 +60,7 @@ abstract class BaseClient implements SocketIOClient {
}
@Override
public void sendMessage(String message, Runnable ackCallback) {
public void sendMessage(String message, AckCallback ackCallback) {
Packet packet = new Packet(PacketType.MESSAGE);
packet.setData(message);
send(packet, ackCallback);
@ -85,14 +86,14 @@ abstract class BaseClient implements SocketIOClient {
}
@Override
public void send(Packet packet, Runnable ackCallback) {
public void send(Packet packet, AckCallback ackCallback) {
long index = ackManager.registerAck(sessionId, ackCallback);
packet.setId(index);
send(packet);
}
@Override
public void sendJsonObject(Object object, Runnable ackCallback) {
public void sendJsonObject(Object object, AckCallback ackCallback) {
Packet packet = new Packet(PacketType.JSON);
packet.setData(object);
send(packet, ackCallback);

4
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -109,7 +109,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
}
private void scheduleNoop(Channel channel, final UUID sessionId) {
SchedulerKey key = new SchedulerKey(Type.NOOP, sessionId);
SchedulerKey key = new SchedulerKey(Type.POLLING, sessionId);
scheduler.cancel(key);
scheduler.schedule(key, new Runnable() {
@Override
@ -199,7 +199,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
UUID sessionId = xhrClient.getSessionId();
sessionId2Client.remove(sessionId);
SchedulerKey noopKey = new SchedulerKey(Type.NOOP, sessionId);
SchedulerKey noopKey = new SchedulerKey(Type.POLLING, sessionId);
scheduler.cancel(noopKey);
SchedulerKey closeTimeoutKey = new SchedulerKey(Type.CLOSE_TIMEOUT, sessionId);
scheduler.cancel(closeTimeoutKey);

Loading…
Cancel
Save