Browse Source

Hazelcast integration fixed. Issue #66

master
Nikita 12 years ago
parent
commit
6265637e4c
  1. 1
      src/main/java/com/corundumstudio/socketio/SocketIOClient.java
  2. 10
      src/main/java/com/corundumstudio/socketio/parser/Packet.java
  3. 16
      src/main/java/com/corundumstudio/socketio/store/HazelcastStoreFactory.java
  4. 13
      src/main/java/com/corundumstudio/socketio/store/PubSubHazelcastStore.java
  5. 2
      src/main/java/com/corundumstudio/socketio/store/PubSubMemoryStore.java
  6. 5
      src/main/java/com/corundumstudio/socketio/store/PubSubRedisStore.java
  7. 8
      src/main/java/com/corundumstudio/socketio/store/RedisStoreFactory.java
  8. 6
      src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java
  9. 2
      src/main/java/com/corundumstudio/socketio/store/pubsub/DispatchMessage.java
  10. 2
      src/main/java/com/corundumstudio/socketio/store/pubsub/JoinLeaveMessage.java
  11. 6
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubMessage.java
  12. 2
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

1
src/main/java/com/corundumstudio/socketio/SocketIOClient.java

@ -16,7 +16,6 @@
package com.corundumstudio.socketio;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.UUID;

10
src/main/java/com/corundumstudio/socketio/parser/Packet.java

@ -15,15 +15,17 @@
*/
package com.corundumstudio.socketio.parser;
import io.netty.util.CharsetUtil;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.List;
public class Packet {
public class Packet implements Serializable {
private static final long serialVersionUID = 4560159536486711426L;
public static final char DELIMITER = '\ufffd';
public static final byte[] DELIMITER_BYTES = new String(new char[] {DELIMITER}).getBytes(CharsetUtil.UTF_8);
public static final byte[] DELIMITER_BYTES = new String(new char[] {DELIMITER}).getBytes(Charset.forName("UTF-8"));
public static final byte SEPARATOR = ':';
public static final String ACK_DATA = "data";

16
src/main/java/com/corundumstudio/socketio/store/HazelcastStoreFactory.java

@ -19,16 +19,22 @@ import java.util.UUID;
import com.corundumstudio.socketio.store.pubsub.BaseStoreFactory;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.HazelcastInstance;
public class HazelcastStoreFactory extends BaseStoreFactory {
private HazelcastInstance hazelcastClient;
private HazelcastInstance hazelcastPub;
private HazelcastInstance hazelcastSub;
private final HazelcastInstance hazelcastClient;
private final HazelcastInstance hazelcastPub;
private final HazelcastInstance hazelcastSub;
public HazelcastStoreFactory() {
hazelcastClient = HazelcastClient.newHazelcastClient();
hazelcastPub = hazelcastClient;
hazelcastSub = hazelcastClient;
}
public HazelcastStoreFactory(HazelcastInstance hazelcastClient, HazelcastInstance hazelcastPub, HazelcastInstance hazelcastSub) {
super();
this.hazelcastClient = hazelcastClient;
this.hazelcastPub = hazelcastPub;
this.hazelcastSub = hazelcastSub;
@ -46,7 +52,7 @@ public class HazelcastStoreFactory extends BaseStoreFactory {
@Override
public PubSubStore getPubSubStore() {
return new PubSubHazelcastStore(hazelcastPub, hazelcastSub);
return new PubSubHazelcastStore(hazelcastPub, hazelcastSub, getNodeId());
}
}

13
src/main/java/com/corundumstudio/socketio/store/PubSubHazelcastStore.java

@ -33,27 +33,34 @@ public class PubSubHazelcastStore implements PubSubStore {
private final HazelcastInstance hazelcastPub;
private final HazelcastInstance hazelcastSub;
private final Long nodeId;
private final ConcurrentMap<String, Queue<String>> map =
new ConcurrentHashMap<String, Queue<String>>();
public PubSubHazelcastStore(HazelcastInstance hazelcastPub, HazelcastInstance hazelcastSub) {
public PubSubHazelcastStore(HazelcastInstance hazelcastPub, HazelcastInstance hazelcastSub, Long nodeId) {
this.hazelcastPub = hazelcastPub;
this.hazelcastSub = hazelcastSub;
this.nodeId = nodeId;
}
@Override
public void publish(String name, PubSubMessage msg) {
msg.setNodeId(nodeId);
hazelcastPub.getTopic(name).publish(msg);
}
@Override
public <T> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
ITopic<T> topic = hazelcastSub.getTopic(name);
String regId = topic.addMessageListener(new MessageListener<T>() {
@Override
public void onMessage(Message<T> message) {
listener.onMessage(message.getMessageObject());
PubSubMessage msg = message.getMessageObject();
System.out.println("current nodeId: " + nodeId + " msg: " + msg.getNodeId());
if (!nodeId.equals(msg.getNodeId())) {
listener.onMessage(message.getMessageObject());
}
}
});

2
src/main/java/com/corundumstudio/socketio/store/PubSubMemoryStore.java

@ -26,7 +26,7 @@ public class PubSubMemoryStore implements PubSubStore {
}
@Override
public <T> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz) {
}
@Override

5
src/main/java/com/corundumstudio/socketio/store/PubSubRedisStore.java

@ -127,7 +127,7 @@ public class PubSubRedisStore implements PubSubStore {
}
@Override
public <T> void subscribe(final String name, PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(final String name, PubSubListener<T> listener, Class<T> clazz) {
Queue<PubSubListener> list = map.get(name);
if (list == null) {
list = new ConcurrentLinkedQueue<PubSubListener>();
@ -151,9 +151,6 @@ public class PubSubRedisStore implements PubSubStore {
}
}
});
s.acquireUninterruptibly();
s.release();
}
@Override

8
src/main/java/com/corundumstudio/socketio/store/RedisStoreFactory.java

@ -27,11 +27,9 @@ import com.corundumstudio.socketio.transport.MainBaseClient;
public class RedisStoreFactory extends BaseStoreFactory {
private final Long nodeId = (long) (Math.random() * 1000000);
private Jedis redisClient = new Jedis("127.0.0.1", 6379);
private Jedis redisPub = redisClient;
private Jedis redisSub = redisClient;
private Jedis redisPub = new Jedis("127.0.0.1", 6379);
private Jedis redisSub = new Jedis("127.0.0.1", 6379);
private PubSubRedisStore pubSubRedisStore;
@ -46,7 +44,7 @@ public class RedisStoreFactory extends BaseStoreFactory {
@Override
public void init(NamespacesHub namespacesHub, JsonSupport jsonSupport) {
pubSubRedisStore = new PubSubRedisStore(redisPub, redisSub, nodeId, jsonSupport);
pubSubRedisStore = new PubSubRedisStore(redisPub, redisSub, getNodeId(), jsonSupport);
redisClient.connect();
redisPub.connect();

6
src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java

@ -22,6 +22,12 @@ import com.corundumstudio.socketio.transport.MainBaseClient;
public abstract class BaseStoreFactory implements StoreFactory {
private Long nodeId = (long) (Math.random() * 1000000);
protected Long getNodeId() {
return nodeId;
}
public void init(final NamespacesHub namespacesHub, JsonSupport jsonSupport) {
getPubSubStore().subscribe(PubSubStore.DISPATCH, new PubSubListener<DispatchMessage>() {
@Override

2
src/main/java/com/corundumstudio/socketio/store/pubsub/DispatchMessage.java

@ -19,6 +19,8 @@ import com.corundumstudio.socketio.parser.Packet;
public class DispatchMessage extends PubSubMessage {
private static final long serialVersionUID = 6692047718303934349L;
private String room;
private Packet packet;

2
src/main/java/com/corundumstudio/socketio/store/pubsub/JoinLeaveMessage.java

@ -19,6 +19,8 @@ import java.util.UUID;
public class JoinLeaveMessage extends PubSubMessage {
private static final long serialVersionUID = -944515928988033174L;
private UUID sessionId;
private String room;

6
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubMessage.java

@ -15,7 +15,11 @@
*/
package com.corundumstudio.socketio.store.pubsub;
public abstract class PubSubMessage {
import java.io.Serializable;
public abstract class PubSubMessage implements Serializable {
private static final long serialVersionUID = -8789343104393884987L;
private Long nodeId;

2
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

@ -27,7 +27,7 @@ public interface PubSubStore {
void publish(String name, PubSubMessage msg);
<T> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz);
<T extends PubSubMessage> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz);
void unsubscribe(String name);

Loading…
Cancel
Save