Browse Source

XHR polling optimization & refactoring.

master
Nikita 12 years ago
parent
commit
104931092a
  1. 1
      src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
  2. 87
      src/main/java/com/corundumstudio/socketio/SocketIOEncoder.java
  3. 26
      src/main/java/com/corundumstudio/socketio/messages/XHRNewChannelMessage.java
  4. 40
      src/main/java/com/corundumstudio/socketio/messages/XHRPacketMessage.java
  5. 21
      src/main/java/com/corundumstudio/socketio/messages/XHRSendPacketsMessage.java
  6. 40
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java

1
src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java

@ -193,7 +193,6 @@ public class SocketIOChannelInitializer extends ChannelInitializer<Channel> impl
webSocketTransport.onDisconnect(client);
flashSocketTransport.onDisconnect(client);
authorizeHandler.onDisconnect(client);
socketIOEncoder.onDisconnect(client);
log.debug("Client with sessionId: {} disconnected", client.getSessionId());
}

87
src/main/java/com/corundumstudio/socketio/SocketIOEncoder.java

@ -36,15 +36,11 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import java.io.IOException;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,81 +51,37 @@ import com.corundumstudio.socketio.messages.HttpMessage;
import com.corundumstudio.socketio.messages.WebSocketPacketMessage;
import com.corundumstudio.socketio.messages.WebsocketErrorMessage;
import com.corundumstudio.socketio.messages.XHRErrorMessage;
import com.corundumstudio.socketio.messages.XHRNewChannelMessage;
import com.corundumstudio.socketio.messages.XHROutMessage;
import com.corundumstudio.socketio.messages.XHRPacketMessage;
import com.corundumstudio.socketio.messages.XHRSendPacketsMessage;
import com.corundumstudio.socketio.parser.Encoder;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.transport.MainBaseClient;
@Sharable
public class SocketIOEncoder extends ChannelOutboundHandlerAdapter implements Disconnectable {
public class SocketIOEncoder extends ChannelOutboundHandlerAdapter {
class XHRClientEntry {
// works faster than locking
final AtomicReference<Channel> lastChannel = new AtomicReference<Channel>();
final Queue<Packet> packets = new ConcurrentLinkedQueue<Packet>();
public void addPacket(Packet packet) {
packets.add(packet);
}
public Queue<Packet> getPackets() {
return packets;
}
/**
* We can write to channel only once.
*
* @param channel
* @return true - can write
*/
public boolean writeOnce(Channel channel) {
Channel prevVal = lastChannel.get();
return !channel.equals(prevVal)
&& lastChannel.compareAndSet(prevVal, channel);
}
}
private static final AttributeKey<Boolean> WRITE_ONCE = AttributeKey.<Boolean>valueOf("writeOnce");
private final Logger log = LoggerFactory.getLogger(getClass());
private final ConcurrentMap<UUID, XHRClientEntry> sessionId2ActiveChannelId = new ConcurrentHashMap<UUID, XHRClientEntry>();
private final Encoder encoder;
public SocketIOEncoder(Encoder encoder) {
this.encoder = encoder;
}
private XHRClientEntry getXHRClientEntry(UUID sessionId) {
XHRClientEntry clientEntry = sessionId2ActiveChannelId.get(sessionId);
if (clientEntry == null) {
clientEntry = new XHRClientEntry();
XHRClientEntry old = sessionId2ActiveChannelId.putIfAbsent(sessionId, clientEntry);
if (old != null) {
clientEntry = old;
}
}
return clientEntry;
}
private void write(HttpMessage xhrMessage, Packet packet,
ChannelHandlerContext ctx, ByteBuf out) throws IOException {
XHRClientEntry clientEntry = getXHRClientEntry(xhrMessage.getSessionId());
if (packet != null) {
clientEntry.addPacket(packet);
}
private void write(XHRSendPacketsMessage msg, ChannelHandlerContext ctx, ByteBuf out) throws IOException {
Channel channel = ctx.channel();
if (!channel.isActive() || clientEntry.getPackets().isEmpty()
|| !clientEntry.writeOnce(channel)) {
Attribute<Boolean> attr = channel.attr(WRITE_ONCE);
if (!channel.isActive()
|| msg.getPacketQueue().isEmpty()
|| !attr.compareAndSet(null, true)) {
out.release();
return;
}
encoder.encodePackets(clientEntry.getPackets(), out, ctx.alloc());
sendMessage(xhrMessage, channel, out);
encoder.encodePackets(msg.getPacketQueue(), out, ctx.alloc());
sendMessage(msg, channel, out);
}
private void sendMessage(HttpMessage msg, Channel channel, ByteBuf out) {
@ -177,12 +129,8 @@ public class SocketIOEncoder extends ChannelOutboundHandlerAdapter implements Di
handle((AuthorizeMessage) msg, ctx.channel(), out);
}
if (msg instanceof XHRNewChannelMessage) {
write((XHRNewChannelMessage) msg, null, ctx, out);
}
if (msg instanceof XHRPacketMessage) {
XHRPacketMessage m = (XHRPacketMessage) msg;
write(m, m.getPacket(), ctx, out);
if (msg instanceof XHRSendPacketsMessage) {
write((XHRSendPacketsMessage) msg, ctx, out);
}
if (msg instanceof XHROutMessage) {
sendMessage((XHROutMessage) msg, ctx.channel(), out);
@ -228,9 +176,4 @@ public class SocketIOEncoder extends ChannelOutboundHandlerAdapter implements Di
channel.writeAndFlush(frame);
}
@Override
public void onDisconnect(MainBaseClient client) {
sessionId2ActiveChannelId.remove(client.getSessionId());
}
}

26
src/main/java/com/corundumstudio/socketio/messages/XHRNewChannelMessage.java

@ -1,26 +0,0 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.messages;
import java.util.UUID;
public class XHRNewChannelMessage extends HttpMessage {
public XHRNewChannelMessage(String origin, UUID sessionId) {
super(origin, sessionId);
}
}

40
src/main/java/com/corundumstudio/socketio/messages/XHRPacketMessage.java

@ -1,40 +0,0 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.messages;
import java.util.UUID;
import com.corundumstudio.socketio.parser.Packet;
public class XHRPacketMessage extends HttpMessage {
private final Packet packet;
public XHRPacketMessage(UUID sessionId, String origin, Packet packet) {
super(origin, sessionId);
this.packet = packet;
}
public Packet getPacket() {
return packet;
}
@Override
public String toString() {
return "XHRPacketMessage [packet=" + packet + "]";
}
}

21
src/main/java/com/corundumstudio/socketio/messages/XHRSendPacketsMessage.java

@ -0,0 +1,21 @@
package com.corundumstudio.socketio.messages;
import java.util.Queue;
import java.util.UUID;
import com.corundumstudio.socketio.parser.Packet;
public class XHRSendPacketsMessage extends HttpMessage {
private final Queue<Packet> packetQueue;
public XHRSendPacketsMessage(UUID sessionId, String origin, Queue<Packet> packetQueue) {
super(origin, sessionId);
this.packetQueue = packetQueue;
}
public Queue<Packet> getPacketQueue() {
return packetQueue;
}
}

40
src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java

@ -17,8 +17,6 @@ package com.corundumstudio.socketio.transport;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Queue;
import java.util.UUID;
@ -28,8 +26,7 @@ import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.StoreFactory;
import com.corundumstudio.socketio.Transport;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.messages.XHRNewChannelMessage;
import com.corundumstudio.socketio.messages.XHRPacketMessage;
import com.corundumstudio.socketio.messages.XHRSendPacketsMessage;
import com.corundumstudio.socketio.parser.Packet;
public class XHRPollingClient extends MainBaseClient {
@ -44,8 +41,7 @@ public class XHRPollingClient extends MainBaseClient {
public void bindChannel(Channel channel, String origin) {
this.origin = origin;
setChannel(channel);
sendPackets();
channel.write(new XHRNewChannelMessage(origin, getSessionId()));
channel.write(new XHRSendPacketsMessage(getSessionId(), origin, packetQueue));
}
public String getOrigin() {
@ -53,36 +49,8 @@ public class XHRPollingClient extends MainBaseClient {
}
public ChannelFuture send(final Packet packet) {
final Channel currChannel = getChannel();
ChannelFuture res = currChannel.write(new XHRPacketMessage(getSessionId(), origin, packet));
res.addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
return;
}
// channel could be closed because new channel was bound
// so we need to resend packet
if (!getChannel().equals(currChannel)) {
send(packet);
sendPackets();
} else {
packetQueue.add(packet);
}
}
});
return res;
packetQueue.add(packet);
return getChannel().write(new XHRSendPacketsMessage(getSessionId(), origin, packetQueue));
}
private void sendPackets() {
while (true) {
Packet p = packetQueue.poll();
if (p == null) {
break;
}
send(p);
}
}
}
Loading…
Cancel
Save