Browse Source

JSON typed serialization support. Issue #23

master
Nikita 13 years ago
parent
commit
61b33edb7b
  1. 10
      README.md
  2. 34
      src/main/java/com/corundumstudio/socketio/Configuration.java
  3. 10
      src/main/java/com/corundumstudio/socketio/SocketIOServer.java
  4. 2
      src/main/java/com/corundumstudio/socketio/listener/ClientListeners.java
  5. 35
      src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
  6. 10
      src/main/java/com/corundumstudio/socketio/parser/Decoder.java
  7. 70
      src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java
  8. 31
      src/main/java/com/corundumstudio/socketio/parser/JsonObject.java
  9. 8
      src/main/java/com/corundumstudio/socketio/parser/JsonSupport.java
  10. 2
      src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java
  11. 4
      src/test/java/com/corundumstudio/socketio/parser/DecoderBaseTest.java
  12. 4
      src/test/java/com/corundumstudio/socketio/parser/DecoderEventPacketTest.java
  13. 1
      src/test/java/com/corundumstudio/socketio/parser/DecoderJsonPacketTest.java
  14. 4
      src/test/java/com/corundumstudio/socketio/parser/EncoderBaseTest.java
  15. 7
      src/test/java/com/corundumstudio/socketio/parser/PayloadTest.java

10
README.md

@ -54,9 +54,15 @@ Licensed under the Apache License 2.0.
}
});
server.addJsonObjectListener(new DataListener<Object>() {
// Don't forget to include type field, it named '@class' by default,
// with class full name.
//
// TIP: you can customize type name field via Configuration.jsonTypeFieldName property
server.addJsonObjectListener(SomeClass.class, new DataListener<SomeClass>() {
@Override
public void onData(SocketIOClient client, Object data) {
public void onData(SocketIOClient client, SomeClass data) {
...

34
src/main/java/com/corundumstudio/socketio/Configuration.java

@ -23,6 +23,7 @@ import com.corundumstudio.socketio.parser.JsonSupport;
public class Configuration {
private String jsonTypeFieldName = "@class";
private String context = "/socket.io";
private Executor bossExecutor = Executors.newCachedThreadPool();
@ -40,7 +41,7 @@ public class Configuration {
private String hostname;
private int port;
private JsonSupport jsonSupport = new JacksonJsonSupport();
private JsonSupport jsonSupport = new JacksonJsonSupport(this);
public Configuration() {
}
@ -50,7 +51,7 @@ public class Configuration {
*
* @param configuration - Configuration object to clone
*/
public Configuration(Configuration conf) {
Configuration(Configuration conf) {
setBossExecutor(conf.getBossExecutor());
setCloseTimeout(conf.getCloseTimeout());
setHeartbeatInterval(conf.getHeartbeatInterval());
@ -63,11 +64,28 @@ public class Configuration {
setContext(conf.getContext());
setAllowCustomRequests(conf.isAllowCustomRequests());
setPollingDuration(conf.getPollingDuration());
setJsonTypeFieldName(conf.getJsonTypeFieldName());
}
public String getJsonTypeFieldName() {
return jsonTypeFieldName;
}
public void setJsonTypeFieldName(String jsonTypeFieldName) {
this.jsonTypeFieldName = jsonTypeFieldName;
}
public JsonSupport getJsonSupport() {
return jsonSupport;
}
/**
* Allows to setup custom implementation of
* JSON serialization/deserialization
*
* @param jsonSupport
*
* @see JsonSupport
*/
public void setJsonSupport(JsonSupport jsonSupport) {
this.jsonSupport = jsonSupport;
}
@ -99,6 +117,7 @@ public class Configuration {
public void setWorkerExecutor(Executor workerExecutor) {
this.workerExecutor = workerExecutor;
}
/**
* Heartbeat interval
*
@ -142,6 +161,11 @@ public class Configuration {
return heartbeatThreadPoolSize;
}
/**
* Channel close timeout due inactivity
*
* @param closeTimeout - time in seconds
*/
public void setCloseTimeout(int closeTimeout) {
this.closeTimeout = closeTimeout;
}
@ -174,6 +198,12 @@ public class Configuration {
public int getPollingDuration() {
return pollingDuration;
}
/**
* Polling interval for XHR transport
*
* @param pollingDuration - time in seconds
*/
public void setPollingDuration(int pollingDuration) {
this.pollingDuration = pollingDuration;
}

10
src/main/java/com/corundumstudio/socketio/SocketIOServer.java

@ -69,6 +69,9 @@ public class SocketIOServer implements ClientListeners {
return new BroadcastOperations(pipelineFactory.getAllClients());
}
/**
* Start server
*/
public void start() {
ChannelFactory factory = new NioServerSocketChannelFactory(config.getBossExecutor(), config.getWorkerExecutor());
bootstrap = new ServerBootstrap(factory);
@ -83,6 +86,9 @@ public class SocketIOServer implements ClientListeners {
log.info("SocketIO server started at port: {}", config.getPort());
}
/**
* Stop server
*/
public void stop() {
pipelineFactory.stop();
mainChannel.close();
@ -108,8 +114,8 @@ public class SocketIOServer implements ClientListeners {
}
@Override
public void addJsonObjectListener(DataListener<Object> listener) {
mainNamespace.addJsonObjectListener(listener);
public <T> void addJsonObjectListener(Class<T> clazz, DataListener<T> listener) {
mainNamespace.addJsonObjectListener(clazz, listener);
}
@Override

2
src/main/java/com/corundumstudio/socketio/listener/ClientListeners.java

@ -19,7 +19,7 @@ public interface ClientListeners {
<T> void addEventListener(String eventName, Class<T> eventClass, DataListener<T> listener);
void addJsonObjectListener(DataListener<Object> listener);
<T> void addJsonObjectListener(Class<T> clazz, DataListener<T> listener);
void addDisconnectListener(DisconnectListener listener);

35
src/main/java/com/corundumstudio/socketio/namespace/Namespace.java

@ -39,7 +39,8 @@ public class Namespace implements SocketIONamespace {
private final Set<SocketIOClient> clients = Collections.newSetFromMap(new ConcurrentHashMap<SocketIOClient, Boolean>());
private final ConcurrentMap<String, EventEntry<?>> eventListeners =
new ConcurrentHashMap<String, EventEntry<?>>();
private final Queue<DataListener<Object>> jsonObjectListeners = new ConcurrentLinkedQueue<DataListener<Object>>();
private final ConcurrentMap<Class<?>, Queue<DataListener<?>>> jsonObjectListeners =
new ConcurrentHashMap<Class<?>, Queue<DataListener<?>>>();
private final Queue<DataListener<String>> messageListeners = new ConcurrentLinkedQueue<DataListener<String>>();
private final Queue<ConnectListener> connectListeners = new ConcurrentLinkedQueue<ConnectListener>();
private final Queue<DisconnectListener> disconnectListeners = new ConcurrentLinkedQueue<DisconnectListener>();
@ -79,6 +80,9 @@ public class Namespace implements SocketIONamespace {
@SuppressWarnings({"rawtypes", "unchecked"})
public void onEvent(SocketIOClient client, String eventName, Object data) {
EventEntry entry = eventListeners.get(eventName);
if (entry == null) {
return;
}
Queue<DataListener> listeners = entry.getListeners();
for (DataListener dataListener : listeners) {
dataListener.onData(client, data);
@ -86,12 +90,27 @@ public class Namespace implements SocketIONamespace {
}
@Override
public void addJsonObjectListener(DataListener<Object> listener) {
jsonObjectListeners.add(listener);
public <T> void addJsonObjectListener(Class<T> clazz, DataListener<T> listener) {
Queue<DataListener<?>> queue = jsonObjectListeners.get(clazz);
if (queue == null) {
queue = new ConcurrentLinkedQueue<DataListener<?>>();
Queue<DataListener<?>> oldQueue = jsonObjectListeners.putIfAbsent(clazz, queue);
if (oldQueue != null) {
queue = oldQueue;
}
}
queue.add(listener);
jsonSupport.addJsonClass(clazz);
}
public Queue<DataListener<Object>> getJsonObjectListeners() {
return jsonObjectListeners;
public void onJsonObject(SocketIOClient client, Object data) {
Queue<DataListener<?>> queue = jsonObjectListeners.get(data.getClass());
if (queue == null) {
return;
}
for (DataListener dataListener : queue) {
dataListener.onData(client, data);
}
}
@Override
@ -133,12 +152,6 @@ public class Namespace implements SocketIONamespace {
}
}
public void onJsonObject(SocketIOClient client, Object data) {
for (DataListener<Object> listener : jsonObjectListeners) {
listener.onData(client, data);
}
}
@Override
public ClientOperations getBroadcastOperations() {
return new BroadcastOperations(clients);

10
src/main/java/com/corundumstudio/socketio/parser/Decoder.java

@ -163,8 +163,14 @@ public class Decoder {
case JSON: {
ChannelBufferInputStream in = new ChannelBufferInputStream(buffer);
Object obj = jsonSupport.readValue(in, Object.class);
packet.setData(obj);
JsonObject obj = jsonSupport.readValue(in, JsonObject.class);
if (obj != null) {
packet.setData(obj.getObject());
} else {
in.reset();
Object object = jsonSupport.readValue(in, Object.class);
packet.setData(object);
}
break;
}

70
src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java

@ -19,27 +19,73 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.annotate.JsonTypeInfo;
import org.codehaus.jackson.map.DeserializationContext;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectMapper.DefaultTypeResolverBuilder;
import org.codehaus.jackson.map.ObjectMapper.DefaultTyping;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.map.deser.BeanDeserializer;
import org.codehaus.jackson.map.deser.std.StdDeserializer;
import org.codehaus.jackson.map.jsontype.TypeResolverBuilder;
import org.codehaus.jackson.map.module.SimpleModule;
import org.codehaus.jackson.node.ObjectNode;
import com.corundumstudio.socketio.Configuration;
public class JacksonJsonSupport implements JsonSupport {
private class JsonObjectDeserializer extends StdDeserializer<JsonObject> {
final Set<Class<?>> classes = Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>());
protected JsonObjectDeserializer() {
super(JsonObject.class);
}
@Override
public JsonObject deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
JsonProcessingException {
ObjectMapper mapper = (ObjectMapper) jp.getCodec();
JsonNode rootNode = mapper.readTree(jp);
if (!rootNode.isObject()) {
return null;
}
Class<?> clazz = Object.class;
ObjectNode root = (ObjectNode) rootNode;
JsonNode node = root.remove(configuration.getJsonTypeFieldName());
if (node != null) {
try {
String typeName = node.asText();
Class<?> supportClazz = Class.forName(typeName);
if (classes.contains(supportClazz)) {
clazz = supportClazz;
}
} catch (ClassNotFoundException e) {
// skip it
}
}
Object val = mapper.readValue(root, clazz);
return new JsonObject(val);
}
}
private class EventDeserializer extends StdDeserializer<Event> {
Map<String, Class<?>> eventMapping = new ConcurrentHashMap<String, Class<?>>();
final Map<String, Class<?>> eventMapping = new ConcurrentHashMap<String, Class<?>>();
protected EventDeserializer() {
super(Event.class);
@ -74,20 +120,40 @@ public class JacksonJsonSupport implements JsonSupport {
}
private final Configuration configuration;
private final ObjectMapper objectMapper = new ObjectMapper();
private final EventDeserializer eventDeserializer = new EventDeserializer();
private final JsonObjectDeserializer jsonObjectDeserializer = new JsonObjectDeserializer();
public JacksonJsonSupport(Configuration configuration) {
this.configuration = configuration;
public JacksonJsonSupport() {
SimpleModule module = new SimpleModule("EventDeserializerModule", new Version(1, 0, 0, null));
module.addDeserializer(Event.class, eventDeserializer);
module.addDeserializer(JsonObject.class, jsonObjectDeserializer);
objectMapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
objectMapper.registerModule(module);
// TODO If jsonObjectDeserializer will be not enough
// TypeResolverBuilder<?> typer = new DefaultTypeResolverBuilder(DefaultTyping.NON_FINAL);
// typer.init(JsonTypeInfo.Id.CLASS, null);
// typer.inclusion(JsonTypeInfo.As.PROPERTY);
// typer.typeProperty(configuration.getJsonTypeFieldName());
// objectMapper.setDefaultTyping(typer);
}
@Override
public void addEventMapping(String eventName, Class<?> eventClass) {
eventDeserializer.eventMapping.put(eventName, eventClass);
}
@Override
public void addJsonClass(Class<?> clazz) {
jsonObjectDeserializer.classes.add(clazz);
}
@Override
public void removeEventMapping(String eventName) {
eventDeserializer.eventMapping.remove(eventName);
}

31
src/main/java/com/corundumstudio/socketio/parser/JsonObject.java

@ -0,0 +1,31 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.parser;
public class JsonObject {
private final Object object;
public JsonObject(Object object) {
super();
this.object = object;
}
public Object getObject() {
return object;
}
}

8
src/main/java/com/corundumstudio/socketio/parser/JsonSupport.java

@ -19,6 +19,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* JSON infrastructure interface.
* Allows to implement custom realizations
* to JSON support operations.
*
*/
public interface JsonSupport {
<T> T readValue(InputStream src, Class<T> valueType) throws IOException;
@ -29,6 +35,8 @@ public interface JsonSupport {
void addEventMapping(String eventName, Class<?> eventClass);
void addJsonClass(Class<?> clazz);
void removeEventMapping(String eventName);
}

2
src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java

@ -45,7 +45,7 @@ import com.corundumstudio.socketio.transport.BaseClient;
public class PacketHandlerTest {
private JsonSupport map = new JacksonJsonSupport();
private JsonSupport map = new JacksonJsonSupport(new Configuration());
private Decoder decoder = new Decoder(map);
private Encoder encoder = new Encoder(map);
private NamespacesHub namespacesHub = new NamespacesHub(map);

4
src/test/java/com/corundumstudio/socketio/parser/DecoderBaseTest.java

@ -15,9 +15,11 @@
*/
package com.corundumstudio.socketio.parser;
import com.corundumstudio.socketio.Configuration;
public class DecoderBaseTest {
protected final Decoder decoder = new Decoder(new JacksonJsonSupport());
protected final Decoder decoder = new Decoder(new JacksonJsonSupport(new Configuration()));
}

4
src/test/java/com/corundumstudio/socketio/parser/DecoderEventPacketTest.java

@ -22,6 +22,8 @@ import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import com.corundumstudio.socketio.Configuration;
public class DecoderEventPacketTest extends DecoderBaseTest {
@Test
@ -42,7 +44,7 @@ public class DecoderEventPacketTest extends DecoderBaseTest {
@Test
public void testDecodeWithData() throws IOException {
JacksonJsonSupport jsonSupport = new JacksonJsonSupport();
JacksonJsonSupport jsonSupport = new JacksonJsonSupport(new Configuration());
Decoder decoder = new Decoder(jsonSupport);
jsonSupport.addEventMapping("edwald", HashMap.class);

1
src/test/java/com/corundumstudio/socketio/parser/DecoderJsonPacketTest.java

@ -43,6 +43,7 @@ public class DecoderJsonPacketTest extends DecoderBaseTest {
Assert.assertEquals(PacketType.JSON, packet.getType());
Assert.assertEquals(1, (long)packet.getId());
Assert.assertEquals("data", packet.getAck());
Map obj = (Map) packet.getData();
Assert.assertEquals("b", obj.get("a"));
Assert.assertEquals(1, obj.size());

4
src/test/java/com/corundumstudio/socketio/parser/EncoderBaseTest.java

@ -15,8 +15,10 @@
*/
package com.corundumstudio.socketio.parser;
import com.corundumstudio.socketio.Configuration;
public class EncoderBaseTest {
protected final Encoder encoder = new Encoder(new JacksonJsonSupport());
protected final Encoder encoder = new Encoder(new JacksonJsonSupport(new Configuration()));
}

7
src/test/java/com/corundumstudio/socketio/parser/PayloadTest.java

@ -27,10 +27,13 @@ import org.jboss.netty.util.CharsetUtil;
import org.junit.Assert;
import org.junit.Test;
import com.corundumstudio.socketio.Configuration;
public class PayloadTest {
private final Decoder decoder = new Decoder(new JacksonJsonSupport());
private final Encoder encoder = new Encoder(new JacksonJsonSupport());
private final JacksonJsonSupport support = new JacksonJsonSupport(new Configuration());
private final Decoder decoder = new Decoder(support);
private final Encoder encoder = new Encoder(support);
@Test
public void testPayloadDecode() throws IOException {

Loading…
Cancel
Save