|
|
@ -24,6 +24,7 @@ import java.util.UUID; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
|
|
|
import org.jboss.netty.buffer.ChannelBuffer; |
|
|
|
import org.jboss.netty.channel.Channel; |
|
|
|
import org.jboss.netty.channel.ChannelFuture; |
|
|
|
import org.jboss.netty.channel.ChannelFutureListener; |
|
|
@ -88,30 +89,35 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements |
|
|
|
|
|
|
|
Channel channel = ctx.getChannel(); |
|
|
|
if (queryDecoder.getPath().startsWith(path)) { |
|
|
|
String[] parts = queryDecoder.getPath().split("/"); |
|
|
|
if (parts.length > 3) { |
|
|
|
UUID sessionId = UUID.fromString(parts[4]); |
|
|
|
|
|
|
|
if (HttpMethod.POST.equals(req.getMethod())) { |
|
|
|
onPost(sessionId, channel, req); |
|
|
|
} else if (HttpMethod.GET.equals(req.getMethod())) { |
|
|
|
onGet(sessionId, channel, req); |
|
|
|
} |
|
|
|
if (queryDecoder.getParameters().containsKey("disconnect")) { |
|
|
|
BaseClient client = sessionId2Client.get(sessionId); |
|
|
|
disconnectable.onDisconnect(client); |
|
|
|
} |
|
|
|
} else { |
|
|
|
log.warn("Wrong {} method request path: {}, from ip: {}. Channel closed!", |
|
|
|
new Object[] {req.getMethod(), path, channel.getRemoteAddress()}); |
|
|
|
channel.close(); |
|
|
|
} |
|
|
|
handleMessage(req, queryDecoder, channel); |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
ctx.sendUpstream(e); |
|
|
|
} |
|
|
|
|
|
|
|
private void handleMessage(HttpRequest req, QueryStringDecoder queryDecoder, Channel channel) throws IOException { |
|
|
|
String[] parts = queryDecoder.getPath().split("/"); |
|
|
|
if (parts.length > 3) { |
|
|
|
UUID sessionId = UUID.fromString(parts[4]); |
|
|
|
|
|
|
|
String origin = req.getHeader(HttpHeaders.Names.ORIGIN); |
|
|
|
if (HttpMethod.POST.equals(req.getMethod())) { |
|
|
|
onPost(sessionId, channel, origin, req.getContent()); |
|
|
|
} else if (HttpMethod.GET.equals(req.getMethod())) { |
|
|
|
onGet(sessionId, channel, origin); |
|
|
|
} |
|
|
|
if (queryDecoder.getParameters().containsKey("disconnect")) { |
|
|
|
BaseClient client = sessionId2Client.get(sessionId); |
|
|
|
disconnectable.onDisconnect(client); |
|
|
|
} |
|
|
|
} else { |
|
|
|
log.warn("Wrong {} method request path: {}, from ip: {}. Channel closed!", |
|
|
|
new Object[] {req.getMethod(), path, channel.getRemoteAddress()}); |
|
|
|
channel.close(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void scheduleNoop(Channel channel, final UUID sessionId) { |
|
|
|
SchedulerKey key = new SchedulerKey(Type.POLLING, sessionId); |
|
|
|
scheduler.cancel(key); |
|
|
@ -147,7 +153,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
private void onPost(UUID sessionId, Channel channel, HttpRequest req) throws IOException { |
|
|
|
private void onPost(UUID sessionId, Channel channel, String origin, ChannelBuffer content) throws IOException { |
|
|
|
XHRPollingClient client = sessionId2Client.get(sessionId); |
|
|
|
if (client == null) { |
|
|
|
log.debug("Client with sessionId: {} was already disconnected. Channel closed!", sessionId); |
|
|
@ -155,18 +161,16 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
String origin = req.getHeader(HttpHeaders.Names.ORIGIN); |
|
|
|
channel.write(new XHRPostMessage(origin)); |
|
|
|
Channels.fireMessageReceived(channel, new PacketsMessage(client, req.getContent())); |
|
|
|
Channels.fireMessageReceived(channel, new PacketsMessage(client, content)); |
|
|
|
} |
|
|
|
|
|
|
|
private void onGet(UUID sessionId, Channel channel, HttpRequest req) { |
|
|
|
private void onGet(UUID sessionId, Channel channel, String origin) { |
|
|
|
if (!authorizeHandler.isSessionAuthorized(sessionId)) { |
|
|
|
sendError(channel, req, sessionId); |
|
|
|
sendError(channel, origin, sessionId); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
String origin = req.getHeader(HttpHeaders.Names.ORIGIN); |
|
|
|
XHRPollingClient client = (XHRPollingClient)sessionId2Client.get(sessionId); |
|
|
|
if (client == null) { |
|
|
|
client = createClient(origin, channel, sessionId); |
|
|
@ -189,12 +193,12 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements |
|
|
|
return client; |
|
|
|
} |
|
|
|
|
|
|
|
private void sendError(Channel channel, HttpRequest req, UUID sessionId) { |
|
|
|
private void sendError(Channel channel, String origin, UUID sessionId) { |
|
|
|
log.debug("Client with sessionId: {} was not found! Reconnect error response sended", sessionId); |
|
|
|
Packet packet = new Packet(PacketType.ERROR); |
|
|
|
packet.setReason(ErrorReason.CLIENT_NOT_HANDSHAKEN); |
|
|
|
packet.setAdvice(ErrorAdvice.RECONNECT); |
|
|
|
channel.write(new XHRErrorMessage(packet, req.getHeader(HttpHeaders.Names.ORIGIN))); |
|
|
|
channel.write(new XHRErrorMessage(packet, origin)); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|