Browse Source

WebSocket transport support added, heartbeat refactoring

master
Nikita 13 years ago
parent
commit
38dafd30cd
  1. 12
      README.md
  2. 18
      src/main/java/com/corundumstudio/socketio/Configuration.java
  3. 29
      src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java
  4. 11
      src/main/java/com/corundumstudio/socketio/PacketListener.java
  5. 63
      src/main/java/com/corundumstudio/socketio/SocketIORouter.java
  6. 1
      src/main/java/com/corundumstudio/socketio/parser/Decoder.java
  7. 4
      src/main/java/com/corundumstudio/socketio/transport/SocketIOTransport.java
  8. 111
      src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java
  9. 163
      src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java
  10. 25
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java
  11. 19
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

12
README.md

@ -7,8 +7,9 @@ Licensed under the Apache License 2.0.
### Features
* Supports 0.8+ version of [Socket.IO-client](https://github.com/LearnBoost/socket.io-client) up to latest - 0.9.5
* Supports 0.7+ version of [Socket.IO-client](https://github.com/LearnBoost/socket.io-client) up to latest - 0.9.6
* Supports xhr-polling transport
* Supports websocket transport (Hixie-75/76/Hybi-00, Hybi-10..Hybi-13)
#Usage example
@ -40,10 +41,10 @@ Licensed under the Apache License 2.0.
}
};
Configuration config = new Configuration();
config.setHostname("localhost");
config.setPort(81);
config.setListener(handler);
Configuration config = new Configuration();
config.setHostname("localhost");
config.setPort(81);
config.setListener(handler);
SocketIOServer server = new SocketIOServer(config);
server.start();
@ -57,7 +58,6 @@ Licensed under the Apache License 2.0.
<script type="text/javascript">
var socket = io.connect('http://localhost:81', {
'transports' : [ 'xhr-polling' ],
'reconnection delay' : 2000,
'force new connection' : true
});

18
src/main/java/com/corundumstudio/socketio/Configuration.java

@ -1,3 +1,18 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
import java.util.concurrent.Executor;
@ -109,6 +124,9 @@ public class Configuration {
public int getHeartbeatTimeout() {
return heartbeatTimeout;
}
public boolean isHeartbeatsEnabled() {
return heartbeatTimeout != 0;
}
/**
* Heartbeat thread pool size

29
src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java

@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@ -44,10 +43,6 @@ public class HeartbeatHandler {
}
public void onHeartbeat(final SocketIOClient client) {
if (configuration.getHeartbeatTimeout() == 0) {
return;
}
cancelClientHeartbeatCheck(client);
executorService.schedule(new Runnable() {
@ -57,16 +52,20 @@ public class HeartbeatHandler {
}, configuration.getHeartbeatInterval(), TimeUnit.SECONDS);
}
public void cancelClientHeartbeatCheck(SocketIOClient client) {
public void sendHeartbeat(SocketIOClient client) {
client.send(new Packet(PacketType.HEARTBEAT));
scheduleClientHeartbeatCheck(client);
}
private void cancelClientHeartbeatCheck(SocketIOClient client) {
Future<?> future = scheduledHeartbeatFutures.remove(client.getSessionId());
if (future != null) {
future.cancel(false);
}
}
public void sendHeartbeat(final SocketIOClient client) {
client.send(new Packet(PacketType.HEARTBEAT));
scheduleClientHeartbeatCheck(client.getSessionId(), new Runnable() {
private void scheduleClientHeartbeatCheck(final SocketIOClient client) {
Future<?> future = executorService.schedule(new Runnable() {
public void run() {
try {
client.disconnect();
@ -76,16 +75,8 @@ public class HeartbeatHandler {
log.debug("Client with sessionId: {} disconnected due to heartbeat timeout", sessionId);
}
}
});
}
public void scheduleClientHeartbeatCheck(UUID sessionId, Runnable runnable) {
if (configuration.getHeartbeatTimeout() == 0) {
return;
}
Future<?> future = executorService.schedule(runnable, configuration.getHeartbeatTimeout(), TimeUnit.SECONDS);
scheduledHeartbeatFutures.put(sessionId, future);
}, configuration.getHeartbeatTimeout(), TimeUnit.SECONDS);
scheduledHeartbeatFutures.put(client.getSessionId(), future);
}
public void shutdown() {

11
src/main/java/com/corundumstudio/socketio/PacketListener.java

@ -15,16 +15,10 @@
*/
package com.corundumstudio.socketio;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.transport.XHRPollingClient;
public class PacketListener {
private final Logger log = LoggerFactory.getLogger(getClass());
private final SocketIORouter socketIORouter;
private final HeartbeatHandler heartbeatHandler;
private final SocketIOListener socketIOHandler;
@ -36,7 +30,7 @@ public class PacketListener {
this.heartbeatHandler = heartbeatHandler;
}
public void onPacket(Packet packet, XHRPollingClient client) {
public void onPacket(Packet packet, SocketIOClient client) {
switch (packet.getType()) {
case HEARTBEAT:
heartbeatHandler.onHeartbeat(client);
@ -51,8 +45,7 @@ public class PacketListener {
break;
case DISCONNECT:
log.debug("Client with sessionId: {} disconnected by client request", client.getSessionId());
socketIORouter.disconnect(client.getSessionId());
socketIORouter.onDisconnect(client);
break;
}
}

63
src/main/java/com/corundumstudio/socketio/SocketIORouter.java

@ -17,8 +17,10 @@ package com.corundumstudio.socketio;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -37,6 +39,9 @@ import com.corundumstudio.socketio.parser.Decoder;
import com.corundumstudio.socketio.parser.Encoder;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.parser.PacketType;
import com.corundumstudio.socketio.transport.SocketIOTransport;
import com.corundumstudio.socketio.transport.WebSocketClient;
import com.corundumstudio.socketio.transport.WebSocketTransport;
import com.corundumstudio.socketio.transport.XHRPollingClient;
import com.corundumstudio.socketio.transport.XHRPollingTransport;
@ -44,18 +49,25 @@ public class SocketIORouter {
private final Logger log = LoggerFactory.getLogger(getClass());
// 'UUID' to 'timestamp' mapping
// this map will be always smaller than 'connectedSessionIds'
private final Map<UUID, Long> authorizedSessionIds = new ConcurrentHashMap<UUID, Long>();
private final Set<UUID> connectedSessionIds = Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>());
private final int protocol = 1;
private final String connectPath = "/socket.io/" + protocol + "/";
private final ObjectMapper objectMapper;
private final Decoder decoder;
private final Encoder encoder;
private final Set<UUID> authorizedSessionIds = Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>());
private final Configuration configuration;
private final SocketIOListener socketIOHandler;
private HeartbeatHandler heartbeatHandler;
private XHRPollingTransport xhrPollingTransport;
private SocketIOTransport xhrPollingTransport;
private SocketIOTransport webSocketTransport;
public SocketIORouter(Configuration configuration) {
this.configuration = configuration;
@ -69,6 +81,7 @@ public class SocketIORouter {
heartbeatHandler = new HeartbeatHandler(configuration);
PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler);
xhrPollingTransport = new XHRPollingTransport(connectPath, decoder, encoder, this, packetListener);
webSocketTransport = new WebSocketTransport(connectPath, decoder, encoder, this, packetListener);
}
public void stop() {
@ -87,29 +100,35 @@ public class SocketIORouter {
}
}
xhrPollingTransport.messageReceived(ctx, e);
webSocketTransport.messageReceived(ctx, e);
}
public boolean isSessionAuthorized(UUID sessionId) {
return authorizedSessionIds.contains(sessionId);
return connectedSessionIds.contains(sessionId)
|| authorizedSessionIds.containsKey(sessionId);
}
public void connect(SocketIOClient client) {
// cancel heartbeat check scheduled after 'authorize' method
heartbeatHandler.cancelClientHeartbeatCheck(client);
authorizedSessionIds.remove(client.getSessionId());
connectedSessionIds.add(client.getSessionId());
client.send(new Packet(PacketType.CONNECT));
heartbeatHandler.sendHeartbeat(client);
if (configuration.isHeartbeatsEnabled()
&& !(client instanceof WebSocketClient)) {
heartbeatHandler.sendHeartbeat(client);
}
socketIOHandler.onConnect(client);
}
private void authorize(Channel channel, HttpRequest msg, Map<String, List<String>> params)
throws IOException {
removeStaleAuthorizedIds();
// TODO use common client
final UUID sessionId = UUID.randomUUID();
XHRPollingClient client = new XHRPollingClient(encoder, this, null);
authorizedSessionIds.add(sessionId);
authorizedSessionIds.put(sessionId, System.currentTimeMillis());
String transports = "xhr-polling";
String transports = "xhr-polling,websocket";
String heartbeatTimeoutVal = String.valueOf(configuration.getHeartbeatTimeout());
if (configuration.getHeartbeatTimeout() == 0) {
heartbeatTimeoutVal = "";
@ -127,12 +146,27 @@ public class SocketIORouter {
}
client.doReconnect(channel, msg);
log.debug("New sessionId: {} authorized", sessionId);
heartbeatHandler.scheduleClientHeartbeatCheck(sessionId, new Runnable() {
public void run() {
authorizedSessionIds.remove(sessionId);
log.debug("Authorized sessionId: {} cleared due to connect timeout", sessionId);
}
/**
* Remove stale authorized client ids which
* has not connected during some timeout
*/
private void removeStaleAuthorizedIds() {
for (Iterator<Entry<UUID, Long>> iterator = authorizedSessionIds.entrySet().iterator(); iterator.hasNext();) {
Entry<UUID, Long> entry = iterator.next();
if (System.currentTimeMillis() - entry.getValue() > 60*1000) {
iterator.remove();
log.debug("Authorized sessionId: {} cleared due to connection timeout", entry.getKey());
}
});
}
}
public void onDisconnect(SocketIOClient client) {
log.debug("Client with sessionId: {} disconnected by client request", client.getSessionId());
xhrPollingTransport.onDisconnect(client);
webSocketTransport.onDisconnect(client);
disconnect(client);
}
public void disconnect(SocketIOClient client) {
@ -140,8 +174,9 @@ public class SocketIORouter {
}
public void disconnect(UUID sessionId) {
authorizedSessionIds.remove(sessionId);
connectedSessionIds.remove(sessionId);
xhrPollingTransport.disconnect(sessionId);
webSocketTransport.disconnect(sessionId);
}
}

1
src/main/java/com/corundumstudio/socketio/parser/Decoder.java

@ -132,6 +132,7 @@ public class Decoder {
packet.setAckId(ackMatcher.group(1));
String ackArgsJSON = extract(ackMatcher, 3);
if (ackArgsJSON != null && ackArgsJSON.trim().length() > 0) {
@SuppressWarnings("unchecked")
List<Object> args = objectMapper.readValue(ackArgsJSON, List.class);
packet.setArgs(args);
}

4
src/main/java/com/corundumstudio/socketio/transport/SocketIOTransport.java

@ -20,10 +20,14 @@ import java.util.UUID;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import com.corundumstudio.socketio.SocketIOClient;
public interface SocketIOTransport {
void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception;
void onDisconnect(SocketIOClient client);
void disconnect(UUID sessionId);
}

111
src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java

@ -0,0 +1,111 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.transport;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.NullChannelFuture;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIORouter;
import com.corundumstudio.socketio.parser.Encoder;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.parser.PacketType;
public class WebSocketClient implements SocketIOClient {
private final Logger log = LoggerFactory.getLogger(getClass());
private final List<String> messages = new LinkedList<String>();
private final UUID sessionId;
private Channel channel;
private final SocketIORouter socketIORouter;
private final Encoder encoder;
public WebSocketClient(Channel channel, Encoder encoder, SocketIORouter socketIORouter, UUID sessionId) {
this.channel = channel;
this.encoder = encoder;
this.socketIORouter = socketIORouter;
this.sessionId = sessionId;
}
public Channel getChannel() {
return channel;
}
public UUID getSessionId() {
return sessionId;
}
private ChannelFuture sendPayload() {
CharSequence data = encoder.encodePayload(messages);
messages.clear();
return write(data);
}
private ChannelFuture write(CharSequence message) {
WebSocketFrame res = new TextWebSocketFrame(message.toString());
if (channel.isConnected()) {
log.trace("Out message: {} sessionId: {}", new Object[] {message, sessionId});
ChannelFuture f = channel.write(res);
return f;
}
return NullChannelFuture.INSTANCE;
}
public ChannelFuture sendJsonObject(Object object) {
Packet packet = new Packet(PacketType.JSON);
packet.setData(object);
return send(packet);
}
public ChannelFuture send(Packet packet) {
try {
String message = encoder.encodePacket(packet);
return sendUnencoded(message);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public synchronized ChannelFuture sendUnencoded(String message) {
messages.add(message);
return sendPayload();
}
public void disconnect() {
socketIORouter.disconnect(sessionId);
}
public SocketAddress getRemoteAddress() {
return channel.getRemoteAddress();
}
}

163
src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java

@ -0,0 +1,163 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio.transport;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.SEC_WEBSOCKET_KEY;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import org.jboss.netty.handler.codec.http.HttpHeaders.Values;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker00;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker13;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.PacketListener;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIORouter;
import com.corundumstudio.socketio.parser.Decoder;
import com.corundumstudio.socketio.parser.Encoder;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.parser.PacketType;
public class WebSocketTransport implements SocketIOTransport {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<UUID, WebSocketClient> sessionId2Client = new ConcurrentHashMap<UUID, WebSocketClient>();
private final Map<Integer, WebSocketClient> channelId2Client = new ConcurrentHashMap<Integer, WebSocketClient>();
private final SocketIORouter socketIORouter;
private final PacketListener packetListener;
private final Decoder decoder;
private final Encoder encoder;
private final String path;
public WebSocketTransport(String connectPath, Decoder decoder, Encoder encoder,
SocketIORouter socketIORouter, PacketListener packetListener) {
this.path = connectPath + "websocket";
this.decoder = decoder;
this.encoder = encoder;
this.socketIORouter = socketIORouter;
this.packetListener = packetListener;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object msg = e.getMessage();
if (msg instanceof CloseWebSocketFrame) {
ctx.getChannel().close();
} else if (msg instanceof TextWebSocketFrame) {
TextWebSocketFrame frame = (TextWebSocketFrame) msg;
WebSocketClient client = channelId2Client.get(ctx.getChannel().getId());
String content = frame.getText();
log.trace("In message: {} sessionId: {}", new Object[] {content, client.getSessionId()});
List<Packet> packets = decoder.decodePayload(content);
for (Packet packet : packets) {
packetListener.onPacket(packet, client);
}
} else if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
if (req.containsHeader(CONNECTION) && req.getHeader(CONNECTION).contains(Values.UPGRADE)
&& req.containsHeader(WEBSOCKET) && WEBSOCKET.equalsIgnoreCase(req.getHeader(Names.UPGRADE))) {
WebSocketServerHandshaker handshaker = null;
if (req.containsHeader(SEC_WEBSOCKET_KEY)) {
handshaker = new WebSocketServerHandshaker13(getWebSocketLocation(req), null, false);
} else {
handshaker = new WebSocketServerHandshaker00(getWebSocketLocation(req), null);
}
handshaker.handshake(ctx.getChannel(), req);
QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri());
connectClient(ctx.getChannel(), queryDecoder);
}
}
}
private void connectClient(Channel channel, QueryStringDecoder queryDecoder) {
String path = queryDecoder.getPath();
if (!path.startsWith(this.path)) {
return;
}
String[] parts = path.split("/");
if (parts.length > 3) {
UUID sessionId = UUID.fromString(parts[4]);
if (!socketIORouter.isSessionAuthorized(sessionId)) {
log.warn("Unauthorized client with sessionId: {}, from ip: {}. Channel closed!",
new Object[] {sessionId, channel.getRemoteAddress()});
channel.close();
return;
}
WebSocketClient client = new WebSocketClient(channel, encoder, socketIORouter, sessionId);
channelId2Client.put(channel.getId(), client);
sessionId2Client.put(sessionId, client);
socketIORouter.connect(client);
} else {
log.warn("Wrong GET request path: {}, from ip: {}. Channel closed!",
new Object[] {path, channel.getRemoteAddress()});
channel.close();
}
}
@Override
public void disconnect(UUID sessionId) {
WebSocketClient client = sessionId2Client.remove(sessionId);
if (client != null) {
ChannelFuture future = client.send(new Packet(PacketType.DISCONNECT));
future.addListener(ChannelFutureListener.CLOSE);
socketIORouter.disconnect(client);
channelId2Client.remove(client.getChannel().getId());
}
}
private String getWebSocketLocation(HttpRequest req) {
return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + path;
}
@Override
public void onDisconnect(SocketIOClient client) {
if (client instanceof WebSocketClient) {
WebSocketClient webClient = (WebSocketClient) client;
sessionId2Client.remove(webClient.getSessionId());
channelId2Client.remove(webClient.getChannel().getId());
}
}
}

25
src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java

@ -15,8 +15,9 @@
*/
package com.corundumstudio.socketio.transport;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.*;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.KEEP_ALIVE;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import java.io.IOException;
@ -25,19 +26,15 @@ import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -93,18 +90,18 @@ public class XHRPollingClient implements SocketIOClient {
}
private ChannelFuture write(CharSequence message) {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
addHeaders(res);
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
addHeaders(res);
res.setContent(ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
HttpHeaders.setContentLength(res, res.getContent().readableBytes());
res.setContent(ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
HttpHeaders.setContentLength(res, res.getContent().readableBytes());
connected = false;
jsonp = false;
origin = null;
if (channel.isConnected()) {
log.trace("Sending message: {} to client with sessionId: {}", new Object[] {message, sessionId});
log.trace("Out message: {} sessionId: {}", new Object[] {message, sessionId});
ChannelFuture f = channel.write(res);
if (!isKeepAlive) {
f.addListener(ChannelFutureListener.CLOSE);
@ -114,8 +111,8 @@ public class XHRPollingClient implements SocketIOClient {
return NullChannelFuture.INSTANCE;
}
private void addHeaders(HttpResponse res) {
res.addHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
private void addHeaders(HttpResponse res) {
res.addHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
res.addHeader(CONNECTION, KEEP_ALIVE);
if (origin != null) {
res.addHeader("Access-Control-Allow-Origin", origin);
@ -124,7 +121,7 @@ public class XHRPollingClient implements SocketIOClient {
if (jsonp) {
res.addHeader(CONTENT_TYPE, "application/javascript");
}
}
}
public ChannelFuture sendJsonObject(Object object) {
Packet packet = new Packet(PacketType.JSON);

19
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.PacketListener;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIORouter;
import com.corundumstudio.socketio.parser.Decoder;
import com.corundumstudio.socketio.parser.Encoder;
@ -58,11 +59,11 @@ public class XHRPollingTransport implements SocketIOTransport {
private final PacketListener packetListener;
private final Decoder decoder;
private final Encoder encoder;
private final String pollingPath;
private final String path;
public XHRPollingTransport(String connectPath, Decoder decoder, Encoder encoder,
SocketIORouter socketIORouter, PacketListener packetListener) {
this.pollingPath = connectPath + "xhr-polling/";
this.path = connectPath + "xhr-polling/";
this.decoder = decoder;
this.encoder = encoder;
this.socketIORouter = socketIORouter;
@ -86,7 +87,7 @@ public class XHRPollingTransport implements SocketIOTransport {
private void onPost(QueryStringDecoder queryDecoder, Channel channel, HttpRequest req) throws IOException {
String path = queryDecoder.getPath();
if (!path.startsWith(pollingPath)) {
if (!path.startsWith(path)) {
log.warn("Wrong POST request path: {}, from ip: {}. Channel closed!",
new Object[] {path, channel.getRemoteAddress()});
channel.close();
@ -104,7 +105,7 @@ public class XHRPollingTransport implements SocketIOTransport {
}
String content = req.getContent().toString(CharsetUtil.UTF_8);
log.trace("Request content: {}", content);
log.trace("In message: {} sessionId: {}", new Object[] {content, sessionId});
List<Packet> packets = decoder.decodePayload(content);
for (Packet packet : packets) {
packetListener.onPacket(packet, client);
@ -121,10 +122,7 @@ public class XHRPollingTransport implements SocketIOTransport {
private void onGet(QueryStringDecoder queryDecoder, Channel channel, HttpRequest req) throws IOException {
String path = queryDecoder.getPath();
if (!path.startsWith(pollingPath)) {
log.warn("Wrong GET request path: {}, from ip: {}. Channel closed!",
new Object[] {path, channel.getRemoteAddress()});
channel.close();
if (!path.startsWith(this.path)) {
return;
}
@ -201,4 +199,9 @@ public class XHRPollingTransport implements SocketIOTransport {
}
}
@Override
public void onDisconnect(SocketIOClient client) {
sessionId2Client.remove(client.getSessionId());
}
}
Loading…
Cancel
Save