Browse Source

Issue #5 fixed

master
Nikita 13 years ago
parent
commit
097aa99853
  1. 107
      src/main/java/com/corundumstudio/socketio/NullChannelFuture.java
  2. 47
      src/main/java/com/corundumstudio/socketio/PacketHandler.java
  3. 102
      src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java
  4. 7
      src/test/java/com/corundumstudio/socketio/parser/DecoderJsonPacketTest.java

107
src/main/java/com/corundumstudio/socketio/NullChannelFuture.java

@ -1,107 +0,0 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NullChannelFuture implements ChannelFuture {
public static final ChannelFuture INSTANCE = new NullChannelFuture();
private final Logger log = LoggerFactory.getLogger(getClass());
public Channel getChannel() {
throw new UnsupportedOperationException();
}
public boolean isDone() {
return true;
}
public boolean isCancelled() {
return false;
}
public boolean isSuccess() {
return false;
}
public Throwable getCause() {
return null;
}
public boolean cancel() {
return false;
}
public boolean setSuccess() {
return false;
}
public boolean setFailure(Throwable cause) {
return false;
}
public boolean setProgress(long amount, long current, long total) {
return false;
}
public void addListener(ChannelFutureListener listener) {
try {
listener.operationComplete(this);
} catch (Exception e) {
log.error("Can't execute ChannelFutureListener ", e);
}
}
public void removeListener(ChannelFutureListener listener) {
}
public ChannelFuture await() throws InterruptedException {
return this;
}
public ChannelFuture awaitUninterruptibly() {
return this;
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return true;
}
public boolean await(long timeoutMillis) throws InterruptedException {
return true;
}
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
return true;
}
public boolean awaitUninterruptibly(long timeoutMillis) {
return true;
}
public ChannelFuture rethrowIfFailed() throws Exception {
return this;
}
}

47
src/main/java/com/corundumstudio/socketio/PacketHandler.java

@ -18,6 +18,8 @@ package com.corundumstudio.socketio;
import java.io.IOException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferIndexFinder;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.messages.PacketsMessage;
import com.corundumstudio.socketio.parser.Decoder;
import com.corundumstudio.socketio.parser.DecoderException;
import com.corundumstudio.socketio.parser.Packet;
@Sharable
@ -35,6 +38,13 @@ public class PacketHandler extends SimpleChannelUpstreamHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ChannelBufferIndexFinder delimiterFinder = new ChannelBufferIndexFinder() {
@Override
public boolean find(ChannelBuffer buffer, int guessedIndex) {
return isCurrentDelimiter(buffer, guessedIndex);
}
};
private final PacketListener packetListener;
private final Decoder decoder;
@ -56,7 +66,7 @@ public class PacketHandler extends SimpleChannelUpstreamHandler {
log.trace("In message: {} sessionId: {}", new Object[] {content.toString(CharsetUtil.UTF_8), message.getClient().getSessionId()});
}
while (content.readable()) {
Packet packet = decode(content);
Packet packet = decodePacket(content);
packetListener.onPacket(packet, message.getClient());
}
} else {
@ -64,11 +74,11 @@ public class PacketHandler extends SimpleChannelUpstreamHandler {
}
}
private Packet decode(ChannelBuffer buffer) throws IOException {
private Packet decodePacket(ChannelBuffer buffer) throws IOException {
if (isCurrentDelimiter(buffer, buffer.readerIndex())) {
buffer.readerIndex(buffer.readerIndex() + Packet.DELIMITER_BYTES.length);
Integer len = parseLength(buffer);
Integer len = extractLength(buffer);
ChannelBuffer frame = buffer.slice(buffer.readerIndex(), len);
Packet packet = decoder.decodePacket(frame);
@ -81,15 +91,30 @@ public class PacketHandler extends SimpleChannelUpstreamHandler {
}
}
private Integer parseLength(ChannelBuffer buffer) {
byte[] digits = null;
for (int i = buffer.readerIndex(); i < buffer.readerIndex() + buffer.readableBytes(); i++) {
if (isCurrentDelimiter(buffer, i)) {
digits = new byte[i - buffer.readerIndex()];
buffer.getBytes(buffer.readerIndex(), digits);
break;
}
private Integer extractLength(ChannelBuffer buffer) {
Integer len = parseLengthHeader(buffer);
// to read utf8 symbols
if (buffer.capacity() > buffer.readerIndex() + len
&& !isCurrentDelimiter(buffer, buffer.readerIndex() + len)) {
int index = ChannelBuffers.indexOf(buffer, buffer.readerIndex() + len, buffer.capacity(), delimiterFinder);
if (index != -1) {
len = index - buffer.readerIndex();
} else {
len = buffer.capacity() - buffer.readerIndex();
}
}
return len;
}
private Integer parseLengthHeader(ChannelBuffer buffer) {
int delimiterIndex = ChannelBuffers.indexOf(buffer, buffer.readerIndex(), buffer.capacity(), delimiterFinder);
if (delimiterIndex == -1) {
throw new DecoderException("Can't find tail delimiter");
}
byte[] digits = new byte[delimiterIndex - buffer.readerIndex()];;
buffer.getBytes(buffer.readerIndex(), digits);
buffer.readerIndex(buffer.readerIndex() + digits.length + Packet.DELIMITER_BYTES.length);
return decoder.parseInt(digits);
}

102
src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java

@ -17,10 +17,7 @@ package com.corundumstudio.socketio;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@ -31,6 +28,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.UpstreamMessageEvent;
import org.junit.Before;
import org.junit.Test;
import com.corundumstudio.socketio.messages.PacketsMessage;
@ -48,69 +46,77 @@ public class PacketHandlerTest {
private Channel channel;
@Mocked
private SocketIOClient client;
private final AtomicInteger invocations = new AtomicInteger();
@Test
public void testOnePacket() throws Exception {
final AtomicInteger invocations = new AtomicInteger();
@Before
public void before() {
invocations.set(0);
}
private PacketListener createTestListener(final List<Packet> packets) {
PacketListener listener = new PacketListener(null, null, null) {
@Override
public void onPacket(Packet packet, SocketIOClient client) {
invocations.incrementAndGet();
Assert.assertEquals(PacketType.JSON, packet.getType());
Map<String, String> map = (Map<String, String>) packet.getData();
Assert.assertTrue(map.keySet().size() == 1);
Assert.assertTrue(map.keySet().contains("test1"));
int index = invocations.incrementAndGet();
Packet currentPacket = packets.get(index-1);
Assert.assertEquals(currentPacket.getType(), packet.getType());
Assert.assertEquals(currentPacket.getData(), packet.getData());
}
};
PacketHandler handler = new PacketHandler(listener, decoder);
return listener;
}
List<Packet> packets = new ArrayList<Packet>();
Packet packet = new Packet(PacketType.JSON);
packet.setData(Collections.singletonMap("test1", "test2"));
packets.add(packet);
@Test
public void testOnePacket() throws Exception {
List<Packet> packets = new ArrayList<Packet>();
Packet packet = new Packet(PacketType.JSON);
packet.setData(Collections.singletonMap("test1", "test2"));
packets.add(packet);
testHandler(invocations, handler, packets);
PacketListener listener = createTestListener(packets);
PacketHandler handler = new PacketHandler(listener, decoder);
testHandler(handler, packets);
}
@Test
public void testMultiplePackets() throws Exception {
final AtomicInteger invocations = new AtomicInteger();
PacketListener listener = new PacketListener(null, null, null) {
@Override
public void onPacket(Packet packet, SocketIOClient client) {
if (packet.getType() == PacketType.CONNECT) {
invocations.incrementAndGet();
return;
}
Assert.assertEquals(PacketType.JSON, packet.getType());
Map<String, String> map = (Map<String, String>) packet.getData();
Set<String> keys = new HashSet<String>();
keys.add("test1");
keys.add("fsdfdf");
Assert.assertTrue(map.keySet().size() == 1);
Assert.assertTrue(map.keySet().removeAll(keys));
invocations.incrementAndGet();
}
};
public void testUTF8MultiplePackets() throws Exception {
List<Packet> packets = new ArrayList<Packet>();
Packet packet3 = new Packet(PacketType.CONNECT);
packets.add(packet3);
Packet packet = new Packet(PacketType.JSON);
packet.setData(Collections.singletonMap("test1", "Данные"));
packets.add(packet);
Packet packet1 = new Packet(PacketType.JSON);
packet1.setData(Collections.singletonMap("Привет", "wqeq"));
packets.add(packet1);
PacketListener listener = createTestListener(packets);
PacketHandler handler = new PacketHandler(listener, decoder);
testHandler(handler, packets);
}
List<Packet> packets = new ArrayList<Packet>();
Packet packet3 = new Packet(PacketType.CONNECT);
packets.add(packet3);
@Test
public void testMultiplePackets() throws Exception {
List<Packet> packets = new ArrayList<Packet>();
Packet packet3 = new Packet(PacketType.CONNECT);
packets.add(packet3);
Packet packet = new Packet(PacketType.JSON);
packet.setData(Collections.singletonMap("test1", "test2"));
packets.add(packet);
Packet packet = new Packet(PacketType.JSON);
packet.setData(Collections.singletonMap("test1", "test2"));
packets.add(packet);
Packet packet1 = new Packet(PacketType.JSON);
packet1.setData(Collections.singletonMap("fsdfdf", "wqeq"));
packets.add(packet1);
Packet packet1 = new Packet(PacketType.JSON);
packet1.setData(Collections.singletonMap("fsdfdf", "wqeq"));
packets.add(packet1);
testHandler(invocations, handler, packets);
PacketListener listener = createTestListener(packets);
PacketHandler handler = new PacketHandler(listener, decoder);
testHandler(handler, packets);
}
private void testHandler(final AtomicInteger invocations,
PacketHandler handler, List<Packet> packets) throws Exception {
private void testHandler(PacketHandler handler, List<Packet> packets) throws Exception {
String str = encoder.encodePackets(packets);
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(str.getBytes());
handler.messageReceived(null, new UpstreamMessageEvent(channel, new PacketsMessage(client, buffer), null));

7
src/test/java/com/corundumstudio/socketio/parser/DecoderJsonPacketTest.java

@ -26,6 +26,13 @@ public class DecoderJsonPacketTest {
private final Decoder decoder = new Decoder(new ObjectMapper());
@Test
public void testUTF8Decode() throws IOException {
Packet packet = decoder.decodePacket("4:::\"Привет\"");
Assert.assertEquals(PacketType.JSON, packet.getType());
Assert.assertEquals("Привет", packet.getData());
}
@Test
public void testDecode() throws IOException {
Packet packet = decoder.decodePacket("4:::\"2\"");

Loading…
Cancel
Save