Browse Source

BroadcastAckCallback introduced.

Some checkstyle errors fixed.
master
Nikita 13 years ago
parent
commit
7dc89b56d7
  1. 4
      src/main/java/com/corundumstudio/socketio/AckCallback.java
  2. 8
      src/main/java/com/corundumstudio/socketio/AckManager.java
  3. 76
      src/main/java/com/corundumstudio/socketio/BroadcastAckCallback.java
  4. 83
      src/main/java/com/corundumstudio/socketio/BroadcastOperations.java
  5. 33
      src/main/java/com/corundumstudio/socketio/ClientOperations.java
  6. 35
      src/main/java/com/corundumstudio/socketio/SocketIOClient.java
  7. 2
      src/main/java/com/corundumstudio/socketio/SocketIONamespace.java
  8. 2
      src/main/java/com/corundumstudio/socketio/SocketIOServer.java
  9. 4
      src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
  10. 1
      src/main/java/com/corundumstudio/socketio/parser/Decoder.java
  11. 5
      src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java
  12. 8
      src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java
  13. 11
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

4
src/main/java/com/corundumstudio/socketio/AckCallback.java

@ -25,8 +25,8 @@ package com.corundumstudio.socketio;
*/
public abstract class AckCallback<T> {
private Class<T> resultClass;
private int timeout = -1;
protected Class<T> resultClass;
protected int timeout = -1;
public AckCallback(Class<T> resultClass) {
this.resultClass = resultClass;

8
src/main/java/com/corundumstudio/socketio/AckManager.java

@ -34,20 +34,20 @@ public class AckManager implements Disconnectable {
class AckEntry {
final Map<Long, AckCallback> ackCallbacks = new ConcurrentHashMap<Long, AckCallback>();
final Map<Long, AckCallback<?>> ackCallbacks = new ConcurrentHashMap<Long, AckCallback<?>>();
final AtomicLong ackIndex = new AtomicLong(-1);
public long addAckCallback(AckCallback callback) {
public long addAckCallback(AckCallback<?> callback) {
long index = ackIndex.incrementAndGet();
ackCallbacks.put(index, callback);
return index;
}
public AckCallback getAckCallback(long index) {
public AckCallback<?> getAckCallback(long index) {
return ackCallbacks.get(index);
}
public AckCallback removeCallback(long index) {
public AckCallback<?> removeCallback(long index) {
return ackCallbacks.remove(index);
}

76
src/main/java/com/corundumstudio/socketio/BroadcastAckCallback.java

@ -0,0 +1,76 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class BroadcastAckCallback<T> {
final AtomicBoolean loopFinished = new AtomicBoolean();
final AtomicInteger counter = new AtomicInteger();
final AtomicBoolean successExecuted = new AtomicBoolean();
final Class<T> resultClass;
public BroadcastAckCallback(Class<T> resultClass) {
this.resultClass = resultClass;
}
final AckCallback<T> createClientCallback(final SocketIOClient client) {
counter.getAndIncrement();
return new AckCallback<T>(resultClass) {
@Override
public void onSuccess(T result) {
counter.getAndDecrement();
onClientSuccess(client, result);
executeSuccess();
}
@Override
public void onTimeout() {
onClientTimeout(client);
}
};
}
protected void onClientTimeout(SocketIOClient client) {
}
protected void onClientSuccess(SocketIOClient client, T result) {
}
protected void onAllSuccess() {
}
private void executeSuccess() {
if (loopFinished.get()
&& counter.get() == 0
&& successExecuted.compareAndSet(false, true)) {
onAllSuccess();
}
}
void loopFinished() {
loopFinished.set(true);
executeSuccess();
}
}

83
src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

@ -15,57 +15,10 @@
*/
package com.corundumstudio.socketio;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.corundumstudio.socketio.parser.Packet;
public class BroadcastOperations implements ClientOperations {
private class BroadcastAckCallback extends AckCallback {
final AtomicBoolean loopFinished = new AtomicBoolean();
final AtomicInteger counter = new AtomicInteger();
final AtomicBoolean timeoutExecuted = new AtomicBoolean();
final AtomicBoolean successExecuted = new AtomicBoolean();
final AckCallback ackCallback;
public BroadcastAckCallback(AckCallback ackCallback) {
super(Void.class);
this.ackCallback = ackCallback;
}
@Override
public void onSuccess(Object result) {
counter.getAndDecrement();
executeSuccess();
}
private void executeSuccess() {
if (loopFinished.get() && counter.get() == 0 && successExecuted.compareAndSet(false, true)) {
ackCallback.onSuccess(null);
}
}
@Override
public void onTimeout() {
// execute onTimeout once
if (timeoutExecuted.compareAndSet(false, true)) {
ackCallback.onTimeout();
}
}
void incrementCounter() {
counter.getAndIncrement();
}
void loopFinished() {
loopFinished.set(true);
executeSuccess();
}
}
private final Iterable<SocketIOClient> clients;
public BroadcastOperations(Iterable<SocketIOClient> clients) {
@ -80,14 +33,11 @@ public class BroadcastOperations implements ClientOperations {
}
}
@Override
public void sendMessage(String message, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
public <T> void sendMessage(String message, BroadcastAckCallback<T> ackCallback) {
for (SocketIOClient client : clients) {
clientCallback.incrementCounter();
client.sendMessage(message, clientCallback);
client.sendMessage(message, ackCallback.createClientCallback(client));
}
clientCallback.loopFinished();
ackCallback.loopFinished();
}
@Override
@ -97,14 +47,11 @@ public class BroadcastOperations implements ClientOperations {
}
}
@Override
public void sendJsonObject(Object object, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
public <T> void sendJsonObject(Object object, BroadcastAckCallback<T> ackCallback) {
for (SocketIOClient client : clients) {
clientCallback.incrementCounter();
client.sendJsonObject(object, clientCallback);
client.sendJsonObject(object, ackCallback.createClientCallback(client));
}
clientCallback.loopFinished();
ackCallback.loopFinished();
}
@Override
@ -114,14 +61,11 @@ public class BroadcastOperations implements ClientOperations {
}
}
@Override
public void send(Packet packet, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
public <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback) {
for (SocketIOClient client : clients) {
clientCallback.incrementCounter();
client.send(packet, ackCallback);
client.send(packet, ackCallback.createClientCallback(client));
}
clientCallback.loopFinished();
ackCallback.loopFinished();
}
@Override
@ -138,14 +82,11 @@ public class BroadcastOperations implements ClientOperations {
}
}
@Override
public void sendEvent(String name, Object data, AckCallback ackCallback) {
BroadcastAckCallback clientCallback = new BroadcastAckCallback(ackCallback);
public <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback) {
for (SocketIOClient client : clients) {
clientCallback.incrementCounter();
client.sendEvent(name, data, ackCallback);
client.sendEvent(name, data, ackCallback.createClientCallback(client));
}
clientCallback.loopFinished();
ackCallback.loopFinished();
}
}

33
src/main/java/com/corundumstudio/socketio/ClientOperations.java

@ -30,14 +30,6 @@ public interface ClientOperations {
*/
void sendMessage(String message);
/**
* Send message with ack callback
*
* @param message - message to send
* @param ackCallback - ack callback
*/
void sendMessage(String message, AckCallback ackCallback);
/**
* Send object. Object will be encoded to json-format.
*
@ -45,14 +37,6 @@ public interface ClientOperations {
*/
void sendJsonObject(Object object);
/**
* Send object with ack callback
*
* @param object - object to send
* @param ackCallback - ack callback
*/
void sendJsonObject(Object object, AckCallback ackCallback);
/**
* Send packet
*
@ -60,14 +44,6 @@ public interface ClientOperations {
*/
void send(Packet packet);
/**
* Send packet with ack callback
*
* @param packet - packet to send
* @param ackCallback - ack callback
*/
void send(Packet packet, AckCallback ackCallback);
/**
* Disconnect client
*
@ -82,13 +58,4 @@ public interface ClientOperations {
*/
void sendEvent(String name, Object data);
/**
* Send event with ack callback
*
* @param name - event name
* @param data - event data
* @param ackCallback - ack callback
*/
void sendEvent(String name, Object data, AckCallback ackCallback);
}

35
src/main/java/com/corundumstudio/socketio/SocketIOClient.java

@ -18,9 +18,44 @@ package com.corundumstudio.socketio;
import java.net.SocketAddress;
import java.util.UUID;
import com.corundumstudio.socketio.parser.Packet;
public interface SocketIOClient extends ClientOperations {
/**
* Send event with ack callback
*
* @param name - event name
* @param data - event data
* @param ackCallback - ack callback
*/
void sendEvent(String name, Object data, AckCallback<?> ackCallback);
/**
* Send packet with ack callback
*
* @param packet - packet to send
* @param ackCallback - ack callback
*/
void send(Packet packet, AckCallback<?> ackCallback);
/**
* Send object with ack callback
*
* @param object - object to send
* @param ackCallback - ack callback
*/
void sendJsonObject(Object object, AckCallback<?> ackCallback);
/**
* Send message with ack callback
*
* @param message - message to send
* @param ackCallback - ack callback
*/
void sendMessage(String message, AckCallback<?> ackCallback);
/**
* Client namespace
*

2
src/main/java/com/corundumstudio/socketio/SocketIONamespace.java

@ -19,6 +19,6 @@ import com.corundumstudio.socketio.listener.ClientListeners;
public interface SocketIONamespace extends ClientListeners {
ClientOperations getBroadcastOperations();
BroadcastOperations getBroadcastOperations();
}

2
src/main/java/com/corundumstudio/socketio/SocketIOServer.java

@ -65,7 +65,7 @@ public class SocketIOServer implements ClientListeners {
return pipelineFactory.getAllClients();
}
public ClientOperations getBroadcastOperations() {
public BroadcastOperations getBroadcastOperations() {
return new BroadcastOperations(pipelineFactory.getAllClients());
}

4
src/main/java/com/corundumstudio/socketio/namespace/Namespace.java

@ -24,14 +24,12 @@ import java.util.concurrent.ConcurrentMap;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.BroadcastOperations;
import com.corundumstudio.socketio.ClientOperations;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIONamespace;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import com.corundumstudio.socketio.listener.DisconnectListener;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.transport.NamespaceClient;
public class Namespace implements SocketIONamespace {
@ -153,7 +151,7 @@ public class Namespace implements SocketIONamespace {
}
@Override
public ClientOperations getBroadcastOperations() {
public BroadcastOperations getBroadcastOperations() {
return new BroadcastOperations(clients);
}

1
src/main/java/com/corundumstudio/socketio/parser/Decoder.java

@ -16,7 +16,6 @@
package com.corundumstudio.socketio.parser;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.jboss.netty.buffer.ChannelBuffer;

5
src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java

@ -19,14 +19,12 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.util.CharsetUtil;
import com.corundumstudio.socketio.SocketIOPipelineFactory;
@Sharable
public class FlashPolicyHandler extends SimpleChannelUpstreamHandler {
@ -47,7 +45,6 @@ public class FlashPolicyHandler extends SimpleChannelUpstreamHandler {
if (data.equals(requestBuffer)) {
ChannelFuture f = e.getChannel().write(responseBuffer);
f.addListener(ChannelFutureListener.CLOSE);
ctx.getPipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER);
return;
}
super.messageReceived(ctx, e);

8
src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java

@ -53,7 +53,7 @@ public class NamespaceClient implements SocketIOClient {
}
@Override
public void sendEvent(String name, Object data, AckCallback ackCallback) {
public void sendEvent(String name, Object data, AckCallback<?> ackCallback) {
Packet packet = new Packet(PacketType.EVENT);
packet.setName(name);
packet.setArgs(Collections.singletonList(data));
@ -61,7 +61,7 @@ public class NamespaceClient implements SocketIOClient {
}
@Override
public void sendMessage(String message, AckCallback ackCallback) {
public void sendMessage(String message, AckCallback<?> ackCallback) {
Packet packet = new Packet(PacketType.MESSAGE);
packet.setData(message);
send(packet, ackCallback);
@ -82,7 +82,7 @@ public class NamespaceClient implements SocketIOClient {
}
@Override
public void send(Packet packet, AckCallback ackCallback) {
public void send(Packet packet, AckCallback<?> ackCallback) {
long index = baseClient.getAckManager().registerAck(getSessionId(), ackCallback);
packet.setId(index);
if (!ackCallback.getResultClass().equals(Void.class)) {
@ -98,7 +98,7 @@ public class NamespaceClient implements SocketIOClient {
}
@Override
public void sendJsonObject(Object object, AckCallback ackCallback) {
public void sendJsonObject(Object object, AckCallback<?> ackCallback) {
Packet packet = new Packet(PacketType.JSON);
packet.setData(object);
send(packet, ackCallback);

11
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -100,7 +100,8 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
ctx.sendUpstream(e);
}
private void handleMessage(HttpRequest req, QueryStringDecoder queryDecoder, Channel channel) throws IOException {
private void handleMessage(HttpRequest req, QueryStringDecoder queryDecoder, Channel channel)
throws IOException {
channel.getPipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER);
String[] parts = queryDecoder.getPath().split("/");
@ -159,7 +160,8 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
});
}
private void onPost(UUID sessionId, Channel channel, String origin, ChannelBuffer content) throws IOException {
private void onPost(UUID sessionId, Channel channel, String origin, ChannelBuffer content)
throws IOException {
XHRPollingClient client = sessionId2Client.get(sessionId);
if (client == null) {
log.debug("Client with sessionId: {} was already disconnected. Channel closed!", sessionId);
@ -177,7 +179,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
return;
}
XHRPollingClient client = (XHRPollingClient)sessionId2Client.get(sessionId);
XHRPollingClient client = (XHRPollingClient) sessionId2Client.get(sessionId);
if (client == null) {
client = createClient(origin, channel, sessionId);
}
@ -210,8 +212,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
@Override
public void onDisconnect(BaseClient client) {
if (client instanceof XHRPollingClient) {
XHRPollingClient xhrClient = (XHRPollingClient) client;
UUID sessionId = xhrClient.getSessionId();
UUID sessionId = client.getSessionId();
sessionId2Client.remove(sessionId);
SchedulerKey noopKey = new SchedulerKey(Type.POLLING, sessionId);

Loading…
Cancel
Save