From 7dc89b56d798c57acc6d9f82382dbf3272ffde39 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 25 Sep 2012 20:44:49 +0400 Subject: [PATCH] BroadcastAckCallback introduced. Some checkstyle errors fixed. --- .../corundumstudio/socketio/AckCallback.java | 4 +- .../corundumstudio/socketio/AckManager.java | 8 +- .../socketio/BroadcastAckCallback.java | 76 +++++++++++++++++ .../socketio/BroadcastOperations.java | 83 +++---------------- .../socketio/ClientOperations.java | 33 -------- .../socketio/SocketIOClient.java | 35 ++++++++ .../socketio/SocketIONamespace.java | 2 +- .../socketio/SocketIOServer.java | 2 +- .../socketio/namespace/Namespace.java | 4 +- .../socketio/parser/Decoder.java | 1 - .../transport/FlashPolicyHandler.java | 5 +- .../socketio/transport/NamespaceClient.java | 8 +- .../transport/XHRPollingTransport.java | 11 +-- 13 files changed, 143 insertions(+), 129 deletions(-) create mode 100644 src/main/java/com/corundumstudio/socketio/BroadcastAckCallback.java diff --git a/src/main/java/com/corundumstudio/socketio/AckCallback.java b/src/main/java/com/corundumstudio/socketio/AckCallback.java index 86a3ebd..12677cc 100644 --- a/src/main/java/com/corundumstudio/socketio/AckCallback.java +++ b/src/main/java/com/corundumstudio/socketio/AckCallback.java @@ -25,8 +25,8 @@ package com.corundumstudio.socketio; */ public abstract class AckCallback { - private Class resultClass; - private int timeout = -1; + protected Class resultClass; + protected int timeout = -1; public AckCallback(Class resultClass) { this.resultClass = resultClass; diff --git a/src/main/java/com/corundumstudio/socketio/AckManager.java b/src/main/java/com/corundumstudio/socketio/AckManager.java index 688bcd3..9c3a4e7 100644 --- a/src/main/java/com/corundumstudio/socketio/AckManager.java +++ b/src/main/java/com/corundumstudio/socketio/AckManager.java @@ -34,20 +34,20 @@ public class AckManager implements Disconnectable { class AckEntry { - final Map ackCallbacks = new ConcurrentHashMap(); + final Map> ackCallbacks = new ConcurrentHashMap>(); final AtomicLong ackIndex = new AtomicLong(-1); - public long addAckCallback(AckCallback callback) { + public long addAckCallback(AckCallback callback) { long index = ackIndex.incrementAndGet(); ackCallbacks.put(index, callback); return index; } - public AckCallback getAckCallback(long index) { + public AckCallback getAckCallback(long index) { return ackCallbacks.get(index); } - public AckCallback removeCallback(long index) { + public AckCallback removeCallback(long index) { return ackCallbacks.remove(index); } diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastAckCallback.java b/src/main/java/com/corundumstudio/socketio/BroadcastAckCallback.java new file mode 100644 index 0000000..e75438a --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/BroadcastAckCallback.java @@ -0,0 +1,76 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class BroadcastAckCallback { + + final AtomicBoolean loopFinished = new AtomicBoolean(); + final AtomicInteger counter = new AtomicInteger(); + final AtomicBoolean successExecuted = new AtomicBoolean(); + final Class resultClass; + + public BroadcastAckCallback(Class resultClass) { + this.resultClass = resultClass; + } + + final AckCallback createClientCallback(final SocketIOClient client) { + counter.getAndIncrement(); + return new AckCallback(resultClass) { + @Override + public void onSuccess(T result) { + counter.getAndDecrement(); + onClientSuccess(client, result); + executeSuccess(); + } + + @Override + public void onTimeout() { + onClientTimeout(client); + } + + }; + } + + protected void onClientTimeout(SocketIOClient client) { + + } + + protected void onClientSuccess(SocketIOClient client, T result) { + + } + + protected void onAllSuccess() { + + } + + private void executeSuccess() { + if (loopFinished.get() + && counter.get() == 0 + && successExecuted.compareAndSet(false, true)) { + onAllSuccess(); + } + } + + void loopFinished() { + loopFinished.set(true); + executeSuccess(); + } + +} + diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java index 8e676f5..7f0d3d7 100644 --- a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -15,57 +15,10 @@ */ package com.corundumstudio.socketio; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - import com.corundumstudio.socketio.parser.Packet; public class BroadcastOperations implements ClientOperations { - private class BroadcastAckCallback extends AckCallback { - - final AtomicBoolean loopFinished = new AtomicBoolean(); - final AtomicInteger counter = new AtomicInteger(); - final AtomicBoolean timeoutExecuted = new AtomicBoolean(); - final AtomicBoolean successExecuted = new AtomicBoolean(); - final AckCallback ackCallback; - - public BroadcastAckCallback(AckCallback ackCallback) { - super(Void.class); - this.ackCallback = ackCallback; - } - - @Override - public void onSuccess(Object result) { - counter.getAndDecrement(); - executeSuccess(); - } - - private void executeSuccess() { - if (loopFinished.get() && counter.get() == 0 && successExecuted.compareAndSet(false, true)) { - ackCallback.onSuccess(null); - } - } - - @Override - public void onTimeout() { - // execute onTimeout once - if (timeoutExecuted.compareAndSet(false, true)) { - ackCallback.onTimeout(); - } - } - - void incrementCounter() { - counter.getAndIncrement(); - } - - void loopFinished() { - loopFinished.set(true); - executeSuccess(); - } - - } - private final Iterable clients; public BroadcastOperations(Iterable clients) { @@ -80,14 +33,11 @@ public class BroadcastOperations implements ClientOperations { } } - @Override - public void sendMessage(String message, AckCallback ackCallback) { - BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); + public void sendMessage(String message, BroadcastAckCallback ackCallback) { for (SocketIOClient client : clients) { - clientCallback.incrementCounter(); - client.sendMessage(message, clientCallback); + client.sendMessage(message, ackCallback.createClientCallback(client)); } - clientCallback.loopFinished(); + ackCallback.loopFinished(); } @Override @@ -97,14 +47,11 @@ public class BroadcastOperations implements ClientOperations { } } - @Override - public void sendJsonObject(Object object, AckCallback ackCallback) { - BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); + public void sendJsonObject(Object object, BroadcastAckCallback ackCallback) { for (SocketIOClient client : clients) { - clientCallback.incrementCounter(); - client.sendJsonObject(object, clientCallback); + client.sendJsonObject(object, ackCallback.createClientCallback(client)); } - clientCallback.loopFinished(); + ackCallback.loopFinished(); } @Override @@ -114,14 +61,11 @@ public class BroadcastOperations implements ClientOperations { } } - @Override - public void send(Packet packet, AckCallback ackCallback) { - BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); + public void send(Packet packet, BroadcastAckCallback ackCallback) { for (SocketIOClient client : clients) { - clientCallback.incrementCounter(); - client.send(packet, ackCallback); + client.send(packet, ackCallback.createClientCallback(client)); } - clientCallback.loopFinished(); + ackCallback.loopFinished(); } @Override @@ -138,14 +82,11 @@ public class BroadcastOperations implements ClientOperations { } } - @Override - public void sendEvent(String name, Object data, AckCallback ackCallback) { - BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback); + public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { for (SocketIOClient client : clients) { - clientCallback.incrementCounter(); - client.sendEvent(name, data, ackCallback); + client.sendEvent(name, data, ackCallback.createClientCallback(client)); } - clientCallback.loopFinished(); + ackCallback.loopFinished(); } } diff --git a/src/main/java/com/corundumstudio/socketio/ClientOperations.java b/src/main/java/com/corundumstudio/socketio/ClientOperations.java index e3143d9..0732956 100644 --- a/src/main/java/com/corundumstudio/socketio/ClientOperations.java +++ b/src/main/java/com/corundumstudio/socketio/ClientOperations.java @@ -30,14 +30,6 @@ public interface ClientOperations { */ void sendMessage(String message); - /** - * Send message with ack callback - * - * @param message - message to send - * @param ackCallback - ack callback - */ - void sendMessage(String message, AckCallback ackCallback); - /** * Send object. Object will be encoded to json-format. * @@ -45,14 +37,6 @@ public interface ClientOperations { */ void sendJsonObject(Object object); - /** - * Send object with ack callback - * - * @param object - object to send - * @param ackCallback - ack callback - */ - void sendJsonObject(Object object, AckCallback ackCallback); - /** * Send packet * @@ -60,14 +44,6 @@ public interface ClientOperations { */ void send(Packet packet); - /** - * Send packet with ack callback - * - * @param packet - packet to send - * @param ackCallback - ack callback - */ - void send(Packet packet, AckCallback ackCallback); - /** * Disconnect client * @@ -82,13 +58,4 @@ public interface ClientOperations { */ void sendEvent(String name, Object data); - /** - * Send event with ack callback - * - * @param name - event name - * @param data - event data - * @param ackCallback - ack callback - */ - void sendEvent(String name, Object data, AckCallback ackCallback); - } diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java index 0e112ab..d11b9b6 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java @@ -18,9 +18,44 @@ package com.corundumstudio.socketio; import java.net.SocketAddress; import java.util.UUID; +import com.corundumstudio.socketio.parser.Packet; + public interface SocketIOClient extends ClientOperations { + /** + * Send event with ack callback + * + * @param name - event name + * @param data - event data + * @param ackCallback - ack callback + */ + void sendEvent(String name, Object data, AckCallback ackCallback); + + /** + * Send packet with ack callback + * + * @param packet - packet to send + * @param ackCallback - ack callback + */ + void send(Packet packet, AckCallback ackCallback); + + /** + * Send object with ack callback + * + * @param object - object to send + * @param ackCallback - ack callback + */ + void sendJsonObject(Object object, AckCallback ackCallback); + + /** + * Send message with ack callback + * + * @param message - message to send + * @param ackCallback - ack callback + */ + void sendMessage(String message, AckCallback ackCallback); + /** * Client namespace * diff --git a/src/main/java/com/corundumstudio/socketio/SocketIONamespace.java b/src/main/java/com/corundumstudio/socketio/SocketIONamespace.java index a27cc08..6d229aa 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIONamespace.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIONamespace.java @@ -19,6 +19,6 @@ import com.corundumstudio.socketio.listener.ClientListeners; public interface SocketIONamespace extends ClientListeners { - ClientOperations getBroadcastOperations(); + BroadcastOperations getBroadcastOperations(); } diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java index d8564a0..bd6ee50 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java @@ -65,7 +65,7 @@ public class SocketIOServer implements ClientListeners { return pipelineFactory.getAllClients(); } - public ClientOperations getBroadcastOperations() { + public BroadcastOperations getBroadcastOperations() { return new BroadcastOperations(pipelineFactory.getAllClients()); } diff --git a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java index d0cc4a2..b540949 100644 --- a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java +++ b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java @@ -24,14 +24,12 @@ import java.util.concurrent.ConcurrentMap; import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.BroadcastOperations; -import com.corundumstudio.socketio.ClientOperations; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIONamespace; import com.corundumstudio.socketio.listener.ConnectListener; import com.corundumstudio.socketio.listener.DataListener; import com.corundumstudio.socketio.listener.DisconnectListener; import com.corundumstudio.socketio.parser.JsonSupport; -import com.corundumstudio.socketio.transport.NamespaceClient; public class Namespace implements SocketIONamespace { @@ -153,7 +151,7 @@ public class Namespace implements SocketIONamespace { } @Override - public ClientOperations getBroadcastOperations() { + public BroadcastOperations getBroadcastOperations() { return new BroadcastOperations(clients); } diff --git a/src/main/java/com/corundumstudio/socketio/parser/Decoder.java b/src/main/java/com/corundumstudio/socketio/parser/Decoder.java index 5c27f48..b73f781 100644 --- a/src/main/java/com/corundumstudio/socketio/parser/Decoder.java +++ b/src/main/java/com/corundumstudio/socketio/parser/Decoder.java @@ -16,7 +16,6 @@ package com.corundumstudio.socketio.parser; import java.io.IOException; -import java.util.List; import java.util.UUID; import org.jboss.netty.buffer.ChannelBuffer; diff --git a/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java b/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java index 941e7c0..8a2653f 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java +++ b/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java @@ -19,14 +19,12 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.util.CharsetUtil; -import com.corundumstudio.socketio.SocketIOPipelineFactory; - @Sharable public class FlashPolicyHandler extends SimpleChannelUpstreamHandler { @@ -47,7 +45,6 @@ public class FlashPolicyHandler extends SimpleChannelUpstreamHandler { if (data.equals(requestBuffer)) { ChannelFuture f = e.getChannel().write(responseBuffer); f.addListener(ChannelFutureListener.CLOSE); - ctx.getPipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER); return; } super.messageReceived(ctx, e); diff --git a/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java b/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java index 7ec94d4..26f677b 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java @@ -53,7 +53,7 @@ public class NamespaceClient implements SocketIOClient { } @Override - public void sendEvent(String name, Object data, AckCallback ackCallback) { + public void sendEvent(String name, Object data, AckCallback ackCallback) { Packet packet = new Packet(PacketType.EVENT); packet.setName(name); packet.setArgs(Collections.singletonList(data)); @@ -61,7 +61,7 @@ public class NamespaceClient implements SocketIOClient { } @Override - public void sendMessage(String message, AckCallback ackCallback) { + public void sendMessage(String message, AckCallback ackCallback) { Packet packet = new Packet(PacketType.MESSAGE); packet.setData(message); send(packet, ackCallback); @@ -82,7 +82,7 @@ public class NamespaceClient implements SocketIOClient { } @Override - public void send(Packet packet, AckCallback ackCallback) { + public void send(Packet packet, AckCallback ackCallback) { long index = baseClient.getAckManager().registerAck(getSessionId(), ackCallback); packet.setId(index); if (!ackCallback.getResultClass().equals(Void.class)) { @@ -98,7 +98,7 @@ public class NamespaceClient implements SocketIOClient { } @Override - public void sendJsonObject(Object object, AckCallback ackCallback) { + public void sendJsonObject(Object object, AckCallback ackCallback) { Packet packet = new Packet(PacketType.JSON); packet.setData(object); send(packet, ackCallback); diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java index a0f3248..1ed9902 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java @@ -100,7 +100,8 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements ctx.sendUpstream(e); } - private void handleMessage(HttpRequest req, QueryStringDecoder queryDecoder, Channel channel) throws IOException { + private void handleMessage(HttpRequest req, QueryStringDecoder queryDecoder, Channel channel) + throws IOException { channel.getPipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER); String[] parts = queryDecoder.getPath().split("/"); @@ -159,7 +160,8 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements }); } - private void onPost(UUID sessionId, Channel channel, String origin, ChannelBuffer content) throws IOException { + private void onPost(UUID sessionId, Channel channel, String origin, ChannelBuffer content) + throws IOException { XHRPollingClient client = sessionId2Client.get(sessionId); if (client == null) { log.debug("Client with sessionId: {} was already disconnected. Channel closed!", sessionId); @@ -177,7 +179,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements return; } - XHRPollingClient client = (XHRPollingClient)sessionId2Client.get(sessionId); + XHRPollingClient client = (XHRPollingClient) sessionId2Client.get(sessionId); if (client == null) { client = createClient(origin, channel, sessionId); } @@ -210,8 +212,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements @Override public void onDisconnect(BaseClient client) { if (client instanceof XHRPollingClient) { - XHRPollingClient xhrClient = (XHRPollingClient) client; - UUID sessionId = xhrClient.getSessionId(); + UUID sessionId = client.getSessionId(); sessionId2Client.remove(sessionId); SchedulerKey noopKey = new SchedulerKey(Type.POLLING, sessionId);