|
|
@ -53,12 +53,12 @@ public class RedissonPubSubStore implements PubSubStore { |
|
|
|
@Override |
|
|
|
public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) { |
|
|
|
String name = type.toString(); |
|
|
|
RTopic<T> topic = redissonSub.getTopic(name); |
|
|
|
int regId = topic.addListener(new MessageListener<T>() { |
|
|
|
RTopic topic = redissonSub.getTopic(name); |
|
|
|
int regId = topic.addListener(PubSubMessage.class, new MessageListener<PubSubMessage>() { |
|
|
|
@Override |
|
|
|
public void onMessage(CharSequence channel, T msg) { |
|
|
|
public void onMessage(CharSequence channel, PubSubMessage msg) { |
|
|
|
if (!nodeId.equals(msg.getNodeId())) { |
|
|
|
listener.onMessage(msg); |
|
|
|
listener.onMessage((T)msg); |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
@ -78,7 +78,7 @@ public class RedissonPubSubStore implements PubSubStore { |
|
|
|
public void unsubscribe(PubSubType type) { |
|
|
|
String name = type.toString(); |
|
|
|
Queue<Integer> regIds = map.remove(name); |
|
|
|
RTopic<Object> topic = redissonSub.getTopic(name); |
|
|
|
RTopic topic = redissonSub.getTopic(name); |
|
|
|
for (Integer id : regIds) { |
|
|
|
topic.removeListener(id); |
|
|
|
} |
|
|
|