1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd.continuation;
16
17 import java.io.IOException;
18 import java.nio.ByteBuffer;
19
20 import javax.servlet.ServletException;
21 import javax.servlet.http.HttpServletRequest;
22 import javax.servlet.http.HttpServletResponse;
23
24 import org.cometd.Bayeux;
25 import org.cometd.Client;
26 import org.cometd.Extension;
27 import org.cometd.Message;
28 import org.mortbay.cometd.AbstractBayeux;
29 import org.mortbay.cometd.AbstractCometdServlet;
30 import org.mortbay.cometd.ClientImpl;
31 import org.mortbay.cometd.JSONTransport;
32 import org.mortbay.cometd.MessageImpl;
33 import org.mortbay.cometd.Transport;
34 import org.mortbay.util.ArrayQueue;
35 import org.mortbay.util.StringUtil;
36 import org.mortbay.util.ajax.Continuation;
37 import org.mortbay.util.ajax.ContinuationSupport;
38
39
40 public class ContinuationCometdServlet extends AbstractCometdServlet
41 {
42
43 @Override
44 protected AbstractBayeux newBayeux()
45 {
46 return new ContinuationBayeux();
47 }
48
49
50 @Override
51 protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
52 {
53
54 Object clientObj=request.getAttribute(CLIENT_ATTR);
55 Transport transport=null;
56 int received=-1;
57 boolean metaConnectDeliveryOnly=false;
58 boolean pendingResponse=false;
59 boolean metaConnect=false;
60
61
62 ContinuationClient client=(clientObj instanceof ClientImpl)?(ContinuationClient)clientObj:null;
63 if (client!=null)
64 {
65
66 transport=(Transport)request.getAttribute(TRANSPORT_ATTR);
67 transport.setResponse(response);
68 metaConnectDeliveryOnly=client.isMetaConnectDeliveryOnly() || transport.isMetaConnectDeliveryOnly();
69 metaConnect=true;
70 }
71 else
72 {
73 Message[] messages = getMessages(request);
74 received=messages.length;
75
76
77 String jsonpParam=request.getParameter("jsonp");
78
79
80 try
81 {
82 for (Message message : messages)
83 {
84 if (jsonpParam!=null)
85 message.put("jsonp",jsonpParam);
86
87 if (client==null)
88 {
89 client=(ContinuationClient)_bayeux.getClient((String)message.get(AbstractBayeux.CLIENT_FIELD));
90
91
92 if (client==null)
93 {
94
95 String browser_id=findBrowserId(request);
96 if (browser_id==null)
97 browser_id=setBrowserId(request,response);
98
99 if (transport==null)
100 {
101 transport=_bayeux.newTransport(client,message);
102 transport.setResponse(response);
103 metaConnectDeliveryOnly=transport.isMetaConnectDeliveryOnly();
104 }
105 _bayeux.handle(null,transport,message);
106 message=null;
107 continue;
108 }
109 }
110
111 String browser_id=findBrowserId(request);
112 if (browser_id!=null && (client.getBrowserId()==null || !client.getBrowserId().equals(browser_id)))
113 client.setBrowserId(browser_id);
114
115
116 if (transport==null)
117 {
118 transport=_bayeux.newTransport(client,message);
119 transport.setResponse(response);
120 metaConnectDeliveryOnly=client.isMetaConnectDeliveryOnly() || transport.isMetaConnectDeliveryOnly();
121 }
122
123
124 if (!metaConnectDeliveryOnly && !pendingResponse)
125 {
126 pendingResponse=true;
127 client.responsePending();
128 }
129
130 if (Bayeux.META_CONNECT.equals(message.getChannel()))
131 metaConnect=true;
132
133 _bayeux.handle(client,transport,message);
134 }
135 }
136 finally
137 {
138 for (Message message : messages)
139 ((MessageImpl)message).decRef();
140 if (pendingResponse)
141 {
142 client.responded();
143 }
144 }
145 }
146
147 Message metaConnectReply=null;
148
149
150 if (transport!=null)
151 {
152 metaConnectReply=transport.getMetaConnectReply();
153 if (metaConnectReply!=null)
154 {
155 long timeout=client.getTimeout();
156 if (timeout==0)
157 timeout=_bayeux.getTimeout();
158
159 Continuation continuation=ContinuationSupport.getContinuation(request,client);
160
161
162 synchronized (client)
163 {
164 if (!client.hasNonLazyMessages() && !continuation.isPending()&& received<=1)
165 {
166
167 ((ContinuationClient)client).setContinuation(continuation);
168 request.setAttribute(CLIENT_ATTR,client);
169 request.setAttribute(TRANSPORT_ATTR,transport);
170 continuation.suspend(timeout);
171 }
172
173 if (!continuation.isPending())
174 client.access();
175
176 continuation.reset();
177 }
178
179 ((ContinuationClient)client).setContinuation(null);
180 transport.setMetaConnnectReply(null);
181
182 }
183 else if (client!=null)
184 {
185 client.access();
186 }
187 }
188
189
190 if (client!=null && (!metaConnectDeliveryOnly || metaConnect))
191 {
192 synchronized(client)
193 {
194 client.doDeliverListeners();
195
196 final ArrayQueue<Message> messages= (ArrayQueue)client.getQueue();
197 final int size=messages.size();
198
199 try
200 {
201 for (int i=0;i<size;i++)
202 {
203 final Message message=messages.getUnsafe(i);
204 final MessageImpl mesgImpl=(message instanceof MessageImpl)?(MessageImpl)message:null;
205
206
207 if (i==0 && size==1 && mesgImpl!=null && _refsThreshold>0 && metaConnectReply!=null && transport instanceof JSONTransport)
208 {
209
210 ByteBuffer buffer = mesgImpl.getBuffer();
211 if (buffer!=null)
212 {
213
214 request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
215 if (metaConnectReply instanceof MessageImpl)
216 ((MessageImpl)metaConnectReply).decRef();
217 metaConnectReply=null;
218 transport=null;
219 mesgImpl.decRef();
220 continue;
221 }
222 else if (mesgImpl.getRefs()>=_refsThreshold)
223 {
224
225 byte[] contentBytes = ("["+mesgImpl.getJSON()+",{\""+Bayeux.SUCCESSFUL_FIELD+"\":true,\""+
226 Bayeux.CHANNEL_FIELD+"\":\""+Bayeux.META_CONNECT+"\"}]")
227 .getBytes(StringUtil.__UTF8);
228 int contentLength = contentBytes.length;
229
230 String headerString = "HTTP/1.1 200 OK\r\n"+
231 "Content-Type: text/json; charset=utf-8\r\n" +
232 "Content-Length: " + contentLength + "\r\n" +
233 "\r\n";
234
235 byte[] headerBytes = headerString.getBytes(StringUtil.__UTF8);
236
237 buffer = ByteBuffer.allocateDirect(headerBytes.length+contentLength);
238 buffer.put(headerBytes);
239 buffer.put(contentBytes);
240 buffer.flip();
241
242 mesgImpl.setBuffer(buffer);
243 request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
244 metaConnectReply=null;
245 if (metaConnectReply instanceof MessageImpl)
246 ((MessageImpl)metaConnectReply).decRef();
247 transport=null;
248 mesgImpl.decRef();
249 continue;
250 }
251 }
252
253 if (message!=null)
254 transport.send(message);
255 if (mesgImpl!=null)
256 mesgImpl.decRef();
257 }
258 }
259 finally
260 {
261 messages.clear();
262 }
263 }
264
265 if (metaConnectReply!=null)
266 {
267 metaConnectReply=_bayeux.extendSendMeta(client,metaConnectReply);
268 transport.send(metaConnectReply);
269 if (metaConnectReply instanceof MessageImpl)
270 ((MessageImpl)metaConnectReply).decRef();
271 }
272 }
273
274 if (transport!=null)
275 transport.complete();
276 }
277 }