Browse Source

XHR-pooling packet resend improvements.

master
Nikita 12 years ago
parent
commit
b1843bc6ad
  1. 48
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java

48
src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java

@ -17,11 +17,12 @@ package com.corundumstudio.socketio.transport;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.StoreFactory;
@ -30,9 +31,11 @@ import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.messages.XHRNewChannelMessage;
import com.corundumstudio.socketio.messages.XHRPacketMessage;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.parser.PacketType;
public class XHRPollingClient extends MainBaseClient {
private final Queue<Packet> packetQueue = new ConcurrentLinkedQueue<Packet>();
private String origin;
public XHRPollingClient(AckManager ackManager, DisconnectableHub disconnectable, UUID sessionId, Transport transport, StoreFactory storeFactory) {
@ -42,6 +45,7 @@ public class XHRPollingClient extends MainBaseClient {
public void bindChannel(Channel channel, String origin) {
this.origin = origin;
setChannel(channel);
sendPackets();
channel.write(new XHRNewChannelMessage(origin, getSessionId()));
}
@ -50,20 +54,38 @@ public class XHRPollingClient extends MainBaseClient {
}
public ChannelFuture send(final Packet packet) {
Channel currChannel = getChannel();
ChannelPromise promise = currChannel.newPromise();
promise.addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
// channel could be closed because new channel was bound
// so we need to resend packet
if (!future.isSuccess()) {
send(packet);
final Channel currChannel = getChannel();
ChannelFuture res = currChannel.write(new XHRPacketMessage(getSessionId(), origin, packet));
if (!packet.getType().equals(PacketType.NOOP)) {
res.addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
return;
}
// channel could be closed because new channel was bound
// so we need to resend packet
if (!getChannel().equals(currChannel)) {
send(packet);
sendPackets();
} else {
packetQueue.add(packet);
}
}
}
});
});
}
return currChannel.write(new XHRPacketMessage(getSessionId(), origin, packet), promise);
return res;
}
private void sendPackets() {
while (true) {
Packet p = packetQueue.poll();
if (p == null) {
break;
}
send(p);
}
}
}
Loading…
Cancel
Save