Browse Source

hazelcast support added. Issue #66

master
Nikita 12 years ago
parent
commit
c3e69c7202
  1. 6
      pom.xml
  2. 52
      src/main/java/com/corundumstudio/socketio/store/HazelcastStore.java
  3. 52
      src/main/java/com/corundumstudio/socketio/store/HazelcastStoreFactory.java
  4. 84
      src/main/java/com/corundumstudio/socketio/store/PubSubHazelcastStore.java
  5. 2
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubMessage.java

6
pom.xml

@ -159,6 +159,12 @@
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-client</artifactId>
<version>3.1.3</version>
<scope>provided</scope>
</dependency>
</dependencies>

52
src/main/java/com/corundumstudio/socketio/store/HazelcastStore.java

@ -0,0 +1,52 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.store;
import java.util.UUID;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
public class HazelcastStore implements Store {
private final IMap<String, String> map;
public HazelcastStore(UUID sessionId, HazelcastInstance hazelcastInstance) {
map = hazelcastInstance.getMap(sessionId.toString());
}
@Override
public void set(String key, String val) {
map.put(key, val);
}
@Override
public String get(String key) {
return map.get(key);
}
@Override
public boolean has(String key) {
return map.containsKey(key);
}
@Override
public void del(String key) {
map.delete(key);
}
}

52
src/main/java/com/corundumstudio/socketio/store/HazelcastStoreFactory.java

@ -0,0 +1,52 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.store;
import java.util.UUID;
import com.corundumstudio.socketio.store.pubsub.BaseStoreFactory;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.hazelcast.core.HazelcastInstance;
public class HazelcastStoreFactory extends BaseStoreFactory {
private HazelcastInstance hazelcastClient;
private HazelcastInstance hazelcastPub;
private HazelcastInstance hazelcastSub;
public HazelcastStoreFactory(HazelcastInstance hazelcastClient, HazelcastInstance hazelcastPub, HazelcastInstance hazelcastSub) {
super();
this.hazelcastClient = hazelcastClient;
this.hazelcastPub = hazelcastPub;
this.hazelcastSub = hazelcastSub;
}
@Override
public Store create(UUID sessionId) {
return new HazelcastStore(sessionId, hazelcastClient);
}
@Override
public void shutdown() {
hazelcastClient.shutdown();
}
@Override
public PubSubStore getPubSubStore() {
return new PubSubHazelcastStore(hazelcastPub, hazelcastSub);
}
}

84
src/main/java/com/corundumstudio/socketio/store/PubSubHazelcastStore.java

@ -0,0 +1,84 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.store;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
public class PubSubHazelcastStore implements PubSubStore {
private final HazelcastInstance hazelcastPub;
private final HazelcastInstance hazelcastSub;
private final ConcurrentMap<String, Queue<String>> map =
new ConcurrentHashMap<String, Queue<String>>();
public PubSubHazelcastStore(HazelcastInstance hazelcastPub, HazelcastInstance hazelcastSub) {
this.hazelcastPub = hazelcastPub;
this.hazelcastSub = hazelcastSub;
}
@Override
public void publish(String name, PubSubMessage msg) {
hazelcastPub.getTopic(name).publish(msg);
}
@Override
public <T> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
ITopic<T> topic = hazelcastSub.getTopic(name);
String regId = topic.addMessageListener(new MessageListener<T>() {
@Override
public void onMessage(Message<T> message) {
listener.onMessage(message.getMessageObject());
}
});
Queue<String> list = map.get(name);
if (list == null) {
list = new ConcurrentLinkedQueue<String>();
Queue<String> oldList = map.putIfAbsent(name, list);
if (oldList != null) {
list = oldList;
}
}
list.add(regId);
}
@Override
public void unsubscribe(String name) {
Queue<String> regIds = map.remove(name);
ITopic<Object> topic = hazelcastSub.getTopic(name);
for (String id : regIds) {
topic.removeMessageListener(id);
}
}
@Override
public void shutdown() {
}
}

2
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubMessage.java

@ -15,7 +15,7 @@
*/
package com.corundumstudio.socketio.store.pubsub;
public class PubSubMessage {
public abstract class PubSubMessage {
private Long nodeId;

Loading…
Cancel
Save