Browse Source

MultiTypeAckCallback added

master
Nikita 11 years ago
parent
commit
a3b296ccd2
  1. 2
      src/main/java/com/corundumstudio/socketio/AckCallback.java
  2. 8
      src/main/java/com/corundumstudio/socketio/JsonSupportWrapper.java
  3. 35
      src/main/java/com/corundumstudio/socketio/MultiTypeAckCallback.java
  4. 37
      src/main/java/com/corundumstudio/socketio/MultiTypeArgs.java
  5. 13
      src/main/java/com/corundumstudio/socketio/ack/AckManager.java
  6. 6
      src/main/java/com/corundumstudio/socketio/parser/AckArgs.java
  7. 2
      src/main/java/com/corundumstudio/socketio/parser/Decoder.java
  8. 2
      src/main/java/com/corundumstudio/socketio/parser/Encoder.java
  9. 6
      src/main/java/com/corundumstudio/socketio/parser/Event.java
  10. 23
      src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java
  11. 4
      src/main/java/com/corundumstudio/socketio/parser/JsonSupport.java
  12. 6
      src/main/java/com/corundumstudio/socketio/parser/Packet.java
  13. 2
      src/test/java/com/corundumstudio/socketio/parser/DecoderAckPacketTest.java
  14. 2
      src/test/java/com/corundumstudio/socketio/parser/EncoderAckPacketTest.java

2
src/main/java/com/corundumstudio/socketio/AckCallback.java

@ -32,6 +32,8 @@ package com.corundumstudio.socketio;
* @param <T> - any serializable type
*
* @see com.corundumstudio.socketio.VoidAckCallback
* @see com.corundumstudio.socketio.MultiTypeAckCallback
*
*/
public abstract class AckCallback<T> {

8
src/main/java/com/corundumstudio/socketio/JsonSupportWrapper.java

@ -36,16 +36,16 @@ class JsonSupportWrapper implements JsonSupport {
this.delegate = delegate;
}
public AckArgs readAckArgs(ByteBufInputStream src, Class<?> argType) throws IOException {
public AckArgs readAckArgs(ByteBufInputStream src, AckCallback<?> callback) throws IOException {
try {
return delegate.readAckArgs(src, argType);
return delegate.readAckArgs(src, callback);
} catch (IOException e) {
src.reset();
log.error("Can't read ack args: " + src.readLine() + " for type: " + argType, e);
log.error("Can't read ack args: " + src.readLine() + " for type: " + callback.getResultClass(), e);
return null;
} catch (RuntimeException e) {
src.reset();
log.error("Can't read ack args: " + src.readLine() + " for type: " + argType, e);
log.error("Can't read ack args: " + src.readLine() + " for type: " + callback.getResultClass(), e);
return null;
}
}

35
src/main/java/com/corundumstudio/socketio/MultiTypeAckCallback.java

@ -0,0 +1,35 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
/**
* Multi type ack callback used in case of multiple ack arguments
*
*/
public abstract class MultiTypeAckCallback extends AckCallback<MultiTypeArgs> {
private Class<?>[] resultClasses;
public MultiTypeAckCallback(Class<?> ... resultClasses) {
super(MultiTypeArgs.class);
this.resultClasses = resultClasses;
}
public Class<?>[] getResultClasses() {
return resultClasses;
}
}

37
src/main/java/com/corundumstudio/socketio/MultiTypeArgs.java

@ -0,0 +1,37 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
import java.util.List;
public class MultiTypeArgs {
private final List<Object> args;
public MultiTypeArgs(List<Object> args) {
super();
this.args = args;
}
public List<Object> getArgs() {
return args;
}
public <T> T get(int index) {
return (T) args.get(0);
}
}

13
src/main/java/com/corundumstudio/socketio/ack/AckManager.java

@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.AckCallback;
import com.corundumstudio.socketio.Disconnectable;
import com.corundumstudio.socketio.MultiTypeAckCallback;
import com.corundumstudio.socketio.MultiTypeArgs;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
@ -98,11 +100,20 @@ public class AckManager implements Disconnectable {
scheduler.cancel(key);
AckCallback callback = removeCallback(client.getSessionId(), packet.getAckId());
if (callback != null) {
if (callback == null) {
return;
}
if (callback instanceof MultiTypeAckCallback) {
callback.onSuccess(new MultiTypeArgs(packet.getArgs()));
} else {
Object param = null;
if (!packet.getArgs().isEmpty()) {
param = packet.getArgs().get(0);
}
if (packet.getArgs().size() > 1) {
log.error("Wrong ack args amount. Should be only one argument, but current amount is: {}. Ack id: {}, sessionId: {}",
packet.getArgs().size(), packet.getAckId(), client.getSessionId());
}
callback.onSuccess(param);
}
}

6
src/main/java/com/corundumstudio/socketio/parser/AckArgs.java

@ -19,14 +19,14 @@ import java.util.List;
public class AckArgs {
private List<?> args;
private List<Object> args;
public AckArgs(List<?> args) {
public AckArgs(List<Object> args) {
super();
this.args = args;
}
public List<?> getArgs() {
public List<Object> getArgs() {
return args;
}

2
src/main/java/com/corundumstudio/socketio/parser/Decoder.java

@ -209,7 +209,7 @@ public class Decoder {
ByteBufInputStream in = new ByteBufInputStream(buffer);
AckCallback<?> callback = ackManager.getCallback(uuid, packet.getAckId());
AckArgs args = jsonSupport.readAckArgs(in, callback.getResultClass());
AckArgs args = jsonSupport.readAckArgs(in, callback);
packet.setArgs(args.getArgs());
}
break;

2
src/main/java/com/corundumstudio/socketio/parser/Encoder.java

@ -193,7 +193,7 @@ public class Encoder {
break;
case EVENT:
List<?> args = packet.getArgs();
List<Object> args = packet.getArgs();
if (args.isEmpty()) {
args = null;
}

6
src/main/java/com/corundumstudio/socketio/parser/Event.java

@ -20,18 +20,18 @@ import java.util.List;
class Event {
private String name;
private List<?> args;
private List<Object> args;
public Event() {
}
public Event(String name, List<?> args) {
public Event(String name, List<Object> args) {
super();
this.name = name;
this.args = args;
}
public List<?> getArgs() {
public List<Object> getArgs() {
return args;
}

23
src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java

@ -28,13 +28,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import com.corundumstudio.socketio.AckCallback;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.MultiTypeAckCallback;
import com.corundumstudio.socketio.misc.ConcurrentHashSet;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
@ -103,15 +104,24 @@ public class JacksonJsonSupport implements JsonSupport {
@Override
public AckArgs deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
JsonProcessingException {
List args = new ArrayList();
List<Object> args = new ArrayList<Object>();
AckArgs result = new AckArgs(args);
ObjectMapper mapper = (ObjectMapper) jp.getCodec();
JsonNode root = mapper.readTree(jp);
Class<?> clazz = currentAckClass.get();
AckCallback<?> callback = currentAckClass.get();
Iterator<JsonNode> iter = root.iterator();
int i = 0;
while (iter.hasNext()) {
Object val;
Class<?> clazz = callback.getResultClass();
if (callback instanceof MultiTypeAckCallback) {
MultiTypeAckCallback multiTypeAckCallback = (MultiTypeAckCallback) callback;
clazz = multiTypeAckCallback.getResultClasses()[i];
break;
}
JsonNode arg = iter.next();
if (arg.isTextual() || arg.isBoolean()) {
clazz = Object.class;
@ -137,6 +147,7 @@ public class JacksonJsonSupport implements JsonSupport {
}
val = mapper.treeToValue(arg, clazz);
args.add(val);
i++;
}
return result;
}
@ -183,7 +194,7 @@ public class JacksonJsonSupport implements JsonSupport {
}
private final ThreadLocal<Class<?>> currentAckClass = new ThreadLocal<Class<?>>();
private final ThreadLocal<AckCallback<?>> currentAckClass = new ThreadLocal<AckCallback<?>>();
private final Configuration configuration;
private final ObjectMapper objectMapper = new ObjectMapper();
private final EventDeserializer eventDeserializer = new EventDeserializer();
@ -243,8 +254,8 @@ public class JacksonJsonSupport implements JsonSupport {
}
@Override
public AckArgs readAckArgs(ByteBufInputStream src, Class<?> argType) throws IOException {
currentAckClass.set(argType);
public AckArgs readAckArgs(ByteBufInputStream src, AckCallback<?> callback) throws IOException {
currentAckClass.set(callback);
return objectMapper.readValue(src, AckArgs.class);
}

4
src/main/java/com/corundumstudio/socketio/parser/JsonSupport.java

@ -20,6 +20,8 @@ import io.netty.buffer.ByteBufOutputStream;
import java.io.IOException;
import com.corundumstudio.socketio.AckCallback;
/**
* JSON infrastructure interface.
* Allows to implement custom realizations
@ -28,7 +30,7 @@ import java.io.IOException;
*/
public interface JsonSupport {
AckArgs readAckArgs(ByteBufInputStream src, Class<?> argType) throws IOException;
AckArgs readAckArgs(ByteBufInputStream src, AckCallback<?> callback) throws IOException;
<T> T readValue(ByteBufInputStream src, Class<T> valueType) throws IOException;

6
src/main/java/com/corundumstudio/socketio/parser/Packet.java

@ -33,7 +33,7 @@ public class Packet implements Serializable {
public static final Packet NULL_INSTANCE = new Packet(null);
private PacketType type;
private List<?> args = Collections.emptyList();
private List<Object> args = Collections.emptyList();
private String qs;
private Object ack;
private Long ackId;
@ -103,11 +103,11 @@ public class Packet implements Serializable {
this.name = name;
}
public List<?> getArgs() {
public List<Object> getArgs() {
return args;
}
public void setArgs(List<?> args) {
public void setArgs(List<Object> args) {
this.args = args;
}

2
src/test/java/com/corundumstudio/socketio/parser/DecoderAckPacketTest.java

@ -44,7 +44,7 @@ public class DecoderAckPacketTest extends DecoderBaseTest {
Packet packet = decoder.decodePacket("6:::12+[\"woot\",\"wa\"]", null);
Assert.assertEquals(PacketType.ACK, packet.getType());
Assert.assertEquals(12, (long)packet.getAckId());
Assert.assertEquals(Arrays.asList("woot", "wa"), packet.getArgs());
Assert.assertEquals(Arrays.<Object>asList("woot", "wa"), packet.getArgs());
}
private void initExpectations() {

2
src/test/java/com/corundumstudio/socketio/parser/EncoderAckPacketTest.java

@ -40,7 +40,7 @@ public class EncoderAckPacketTest extends EncoderBaseTest {
public void testEncodeWithArgs() throws IOException {
Packet packet = new Packet(PacketType.ACK);
packet.setAckId(12L);
packet.setArgs(Arrays.asList("woot", "wa"));
packet.setArgs(Arrays.<Object>asList("woot", "wa"));
ByteBuf result = Unpooled.buffer();
encoder.encodePacket(packet, result);
Assert.assertEquals("6:::12+[\"woot\",\"wa\"]", result.toString(CharsetUtil.UTF_8));

Loading…
Cancel
Save