diff --git a/pom.xml b/pom.xml index 183041e..4885944 100644 --- a/pom.xml +++ b/pom.xml @@ -159,6 +159,12 @@ 2.2.1 provided + + com.hazelcast + hazelcast-client + 3.1.3 + provided + diff --git a/src/main/java/com/corundumstudio/socketio/store/HazelcastStore.java b/src/main/java/com/corundumstudio/socketio/store/HazelcastStore.java new file mode 100644 index 0000000..04e2e09 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/store/HazelcastStore.java @@ -0,0 +1,52 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.store; + +import java.util.UUID; + +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IMap; + + +public class HazelcastStore implements Store { + + private final IMap map; + + public HazelcastStore(UUID sessionId, HazelcastInstance hazelcastInstance) { + map = hazelcastInstance.getMap(sessionId.toString()); + } + + @Override + public void set(String key, String val) { + map.put(key, val); + } + + @Override + public String get(String key) { + return map.get(key); + } + + @Override + public boolean has(String key) { + return map.containsKey(key); + } + + @Override + public void del(String key) { + map.delete(key); + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/store/HazelcastStoreFactory.java b/src/main/java/com/corundumstudio/socketio/store/HazelcastStoreFactory.java new file mode 100644 index 0000000..5aef1d8 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/store/HazelcastStoreFactory.java @@ -0,0 +1,52 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.store; + +import java.util.UUID; + +import com.corundumstudio.socketio.store.pubsub.BaseStoreFactory; +import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.hazelcast.core.HazelcastInstance; + +public class HazelcastStoreFactory extends BaseStoreFactory { + + private HazelcastInstance hazelcastClient; + private HazelcastInstance hazelcastPub; + private HazelcastInstance hazelcastSub; + + public HazelcastStoreFactory(HazelcastInstance hazelcastClient, HazelcastInstance hazelcastPub, HazelcastInstance hazelcastSub) { + super(); + this.hazelcastClient = hazelcastClient; + this.hazelcastPub = hazelcastPub; + this.hazelcastSub = hazelcastSub; + } + + @Override + public Store create(UUID sessionId) { + return new HazelcastStore(sessionId, hazelcastClient); + } + + @Override + public void shutdown() { + hazelcastClient.shutdown(); + } + + @Override + public PubSubStore getPubSubStore() { + return new PubSubHazelcastStore(hazelcastPub, hazelcastSub); + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/store/PubSubHazelcastStore.java b/src/main/java/com/corundumstudio/socketio/store/PubSubHazelcastStore.java new file mode 100644 index 0000000..15ab7a6 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/store/PubSubHazelcastStore.java @@ -0,0 +1,84 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.store; + +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; + +import com.corundumstudio.socketio.store.pubsub.PubSubListener; +import com.corundumstudio.socketio.store.pubsub.PubSubMessage; +import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.ITopic; +import com.hazelcast.core.Message; +import com.hazelcast.core.MessageListener; + + +public class PubSubHazelcastStore implements PubSubStore { + + private final HazelcastInstance hazelcastPub; + private final HazelcastInstance hazelcastSub; + + private final ConcurrentMap> map = + new ConcurrentHashMap>(); + + public PubSubHazelcastStore(HazelcastInstance hazelcastPub, HazelcastInstance hazelcastSub) { + this.hazelcastPub = hazelcastPub; + this.hazelcastSub = hazelcastSub; + } + + @Override + public void publish(String name, PubSubMessage msg) { + hazelcastPub.getTopic(name).publish(msg); + } + + @Override + public void subscribe(String name, final PubSubListener listener, Class clazz) { + ITopic topic = hazelcastSub.getTopic(name); + String regId = topic.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + listener.onMessage(message.getMessageObject()); + } + }); + + Queue list = map.get(name); + if (list == null) { + list = new ConcurrentLinkedQueue(); + Queue oldList = map.putIfAbsent(name, list); + if (oldList != null) { + list = oldList; + } + } + list.add(regId); + } + + @Override + public void unsubscribe(String name) { + Queue regIds = map.remove(name); + ITopic topic = hazelcastSub.getTopic(name); + for (String id : regIds) { + topic.removeMessageListener(id); + } + } + + @Override + public void shutdown() { + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubMessage.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubMessage.java index 73ddf07..dbba38a 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubMessage.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubMessage.java @@ -15,7 +15,7 @@ */ package com.corundumstudio.socketio.store.pubsub; -public class PubSubMessage { +public abstract class PubSubMessage { private Long nodeId;