Browse Source

Ack timeout bugs fixed

master
Nikita 13 years ago
parent
commit
3924d2d265
  1. 17
      src/main/java/com/corundumstudio/socketio/AckCallback.java
  2. 1
      src/main/java/com/corundumstudio/socketio/PacketListener.java
  3. 1
      src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java
  4. 4
      src/main/java/com/corundumstudio/socketio/VoidAckCallback.java
  5. 21
      src/main/java/com/corundumstudio/socketio/ack/AckManager.java
  6. 57
      src/main/java/com/corundumstudio/socketio/ack/AckSchedulerKey.java
  7. 2
      src/main/java/com/corundumstudio/socketio/parser/Decoder.java
  8. 2
      src/main/java/com/corundumstudio/socketio/transport/BaseClient.java
  9. 2
      src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java
  10. 2
      src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java
  11. 2
      src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java
  12. 2
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java
  13. 2
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java
  14. 1
      src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java
  15. 2
      src/test/java/com/corundumstudio/socketio/parser/DecoderBaseTest.java

17
src/main/java/com/corundumstudio/socketio/AckCallback.java

@ -25,19 +25,26 @@ package com.corundumstudio.socketio;
*/
public abstract class AckCallback<T> {
protected Class<T> resultClass;
protected int timeout = -1;
protected final Class<T> resultClass;
protected final int timeout;
/**
* Create AckCallback
*
* @param resultClass - result class
*/
public AckCallback(Class<T> resultClass) {
this.resultClass = resultClass;
this(resultClass, -1);
}
/**
* Creates AckCallback
* Creates AckCallback with timeout
*
* @param resultClass - result class
* @param timeout - callback timeout in seconds
*/
public AckCallback(int timeout) {
public AckCallback(Class<T> resultClass, int timeout) {
this.resultClass = resultClass;
this.timeout = timeout;
}

1
src/main/java/com/corundumstudio/socketio/PacketListener.java

@ -15,6 +15,7 @@
*/
package com.corundumstudio.socketio;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.namespace.Namespace;
import com.corundumstudio.socketio.namespace.NamespacesHub;
import com.corundumstudio.socketio.parser.Packet;

1
src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java

@ -34,6 +34,7 @@ import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.handler.AuthorizeHandler;
import com.corundumstudio.socketio.handler.PacketHandler;
import com.corundumstudio.socketio.handler.ResourceHandler;

4
src/main/java/com/corundumstudio/socketio/VoidAckCallback.java

@ -25,6 +25,10 @@ public abstract class VoidAckCallback extends AckCallback<Void> {
super(Void.class);
}
public VoidAckCallback(int timeout) {
super(Void.class, timeout);
}
@Override
public final void onSuccess(Void result) {
onSuccess();

21
src/main/java/com/corundumstudio/socketio/AckManager.java → src/main/java/com/corundumstudio/socketio/ack/AckManager.java

@ -13,9 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
package com.corundumstudio.socketio.ack;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -24,6 +25,10 @@ import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.AckCallback;
import com.corundumstudio.socketio.Disconnectable;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.ack.AckManager.AckEntry;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.SchedulerKey;
@ -43,6 +48,10 @@ public class AckManager implements Disconnectable {
return index;
}
public Set<Long> getAckIndexes() {
return ackCallbacks.keySet();
}
public AckCallback<?> getAckCallback(long index) {
return ackCallbacks.get(index);
}
@ -127,7 +136,7 @@ public class AckManager implements Disconnectable {
if (callback.getTimeout() == -1) {
return;
}
SchedulerKey key = new SchedulerKey(Type.ACK_TIMEOUT, sessionId);
SchedulerKey key = new AckSchedulerKey(Type.ACK_TIMEOUT, sessionId, index);
scheduler.schedule(key, new Runnable() {
@Override
public void run() {
@ -139,7 +148,13 @@ public class AckManager implements Disconnectable {
@Override
public void onDisconnect(BaseClient client) {
ackEntries.remove(client.getSessionId());
AckEntry entry = ackEntries.remove(client.getSessionId());
if (entry != null) {
for (Long index : entry.getAckIndexes()) {
SchedulerKey key = new AckSchedulerKey(Type.ACK_TIMEOUT, client.getSessionId(), index);
scheduler.cancel(key);
}
}
}
}

57
src/main/java/com/corundumstudio/socketio/ack/AckSchedulerKey.java

@ -0,0 +1,57 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.ack;
import java.util.UUID;
import com.corundumstudio.socketio.scheduler.SchedulerKey;
public class AckSchedulerKey extends SchedulerKey {
private final long index;
public AckSchedulerKey(Type type, UUID sessionId, long index) {
super(type, sessionId);
this.index = index;
}
public long getIndex() {
return index;
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (int) (index ^ (index >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (getClass() != obj.getClass())
return false;
AckSchedulerKey other = (AckSchedulerKey) obj;
if (index != other.index)
return false;
return true;
}
}

2
src/main/java/com/corundumstudio/socketio/parser/Decoder.java

@ -25,7 +25,7 @@ import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;
import com.corundumstudio.socketio.AckCallback;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.namespace.Namespace;
public class Decoder {

2
src/main/java/com/corundumstudio/socketio/transport/BaseClient.java

@ -25,9 +25,9 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.namespace.Namespace;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.parser.PacketType;

2
src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java

@ -18,10 +18,10 @@ package com.corundumstudio.socketio.transport;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.HeartbeatHandler;
import com.corundumstudio.socketio.SocketIOPipelineFactory;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.handler.AuthorizeHandler;
@Sharable

2
src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java

@ -20,8 +20,8 @@ import java.util.UUID;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.messages.WebSocketPacketMessage;
import com.corundumstudio.socketio.parser.Packet;

2
src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java

@ -42,13 +42,13 @@ import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFa
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.CompositeIterable;
import com.corundumstudio.socketio.Disconnectable;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.HeartbeatHandler;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOPipelineFactory;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.handler.AuthorizeHandler;
import com.corundumstudio.socketio.messages.PacketsMessage;

2
src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java

@ -20,8 +20,8 @@ import java.util.UUID;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.messages.XHRNewChannelMessage;
import com.corundumstudio.socketio.messages.XHRPacketMessage;
import com.corundumstudio.socketio.parser.Packet;

2
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -40,13 +40,13 @@ import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.CompositeIterable;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.Disconnectable;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOPipelineFactory;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.handler.AuthorizeHandler;
import com.corundumstudio.socketio.messages.PacketsMessage;
import com.corundumstudio.socketio.messages.XHRErrorMessage;

1
src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java

@ -32,6 +32,7 @@ import org.jboss.netty.channel.UpstreamMessageEvent;
import org.junit.Before;
import org.junit.Test;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.handler.PacketHandler;
import com.corundumstudio.socketio.messages.PacketsMessage;
import com.corundumstudio.socketio.namespace.Namespace;

2
src/test/java/com/corundumstudio/socketio/parser/DecoderBaseTest.java

@ -19,8 +19,8 @@ import org.junit.Before;
import mockit.Mocked;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.ack.AckManager;
public class DecoderBaseTest {

Loading…
Cancel
Save