1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Queue;
20
21 import org.cometd.Bayeux;
22 import org.cometd.Client;
23 import org.cometd.ClientListener;
24 import org.cometd.DeliverListener;
25 import org.cometd.Extension;
26 import org.cometd.Message;
27 import org.cometd.MessageListener;
28 import org.cometd.QueueListener;
29 import org.cometd.RemoveListener;
30 import org.mortbay.util.ArrayQueue;
31 import org.mortbay.util.LazyList;
32 import org.mortbay.util.ajax.JSON;
33
34
35
36
37
38
39
40
41 public class ClientImpl implements Client
42 {
43 private String _id;
44 private String _type;
45 private int _responsesPending;
46 private ChannelImpl[] _subscriptions=new ChannelImpl[0];
47 private RemoveListener[] _rListeners;
48 private MessageListener[] _syncMListeners;
49 private MessageListener[] _asyncMListeners;
50 private QueueListener[] _qListeners;
51 private DeliverListener[] _dListeners;
52 protected AbstractBayeux _bayeux;
53 private String _browserId;
54 private JSON.Literal _advice;
55 private int _batch;
56 private int _maxQueue;
57 private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this);
58 private long _timeout;
59 private int _lag;
60 private Extension[] _extensions;
61
62 private boolean _deliverViaMetaConnectOnly;
63
64
65 int _adviseVersion;
66
67
68 protected ClientImpl(AbstractBayeux bayeux)
69 {
70 _bayeux=bayeux;
71 _maxQueue=bayeux.getMaxClientQueue();
72 _bayeux.addClient(this,null);
73 if (_bayeux.isLogInfo())
74 _bayeux.logInfo("newClient: "+this);
75 }
76
77
78 protected ClientImpl(AbstractBayeux bayeux, String idPrefix)
79 {
80 _bayeux=bayeux;
81 _maxQueue=0;
82
83 _bayeux.addClient(this,idPrefix);
84
85 if (_bayeux.isLogInfo())
86 _bayeux.logInfo("newClient: "+this);
87 }
88
89
90 public void addExtension(Extension ext)
91 {
92 _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
93 }
94
95
96 Extension[] getExtensions()
97 {
98 return _extensions;
99 }
100
101
102 public void deliver(Client from, String toChannel, Object data, String id)
103 {
104 MessageImpl message=_bayeux.newMessage();
105 message.put(Bayeux.CHANNEL_FIELD,toChannel);
106 message.put(Bayeux.DATA_FIELD,data);
107 if (id!=null)
108 message.put(Bayeux.ID_FIELD,id);
109
110 Message m=_bayeux.extendSendBayeux(from,message);
111 if (m!=null)
112 doDelivery(from,m);
113 if (m instanceof MessageImpl)
114 ((MessageImpl)m).decRef();
115 }
116
117
118 public void deliverLazy(Client from, String toChannel, Object data, String id)
119 {
120 MessageImpl message=_bayeux.newMessage();
121 message.put(Bayeux.CHANNEL_FIELD,toChannel);
122 message.put(Bayeux.DATA_FIELD,data);
123 if (id!=null)
124 message.put(Bayeux.ID_FIELD,id);
125 message.setLazy(true);
126 Message m=_bayeux.extendSendBayeux(from,message);
127 if (m!=null)
128 doDelivery(from,m);
129 if (m instanceof MessageImpl)
130 ((MessageImpl)m).decRef();
131 }
132
133
134 protected void doDelivery(Client from, final Message msg)
135 {
136 final Message message=_bayeux.extendSendClient(from,this,msg);
137 if (message==null)
138 return;
139
140 MessageListener[] alisteners=null;
141 synchronized(this)
142 {
143 if (_maxQueue<0)
144 {
145 ((MessageImpl)message).incRef();
146 _queue.addUnsafe(message);
147 }
148 else
149 {
150 boolean add;
151 if (_queue.size()>=_maxQueue)
152 {
153 if (_qListeners!=null && _qListeners.length>0)
154 {
155 add=true;
156 for (QueueListener l : _qListeners)
157 add &= l.queueMaxed(from,this,message);
158 }
159 else
160 add=false;
161 }
162 else
163 add=true;
164
165 if (add)
166 {
167 ((MessageImpl)message).incRef();
168 _queue.addUnsafe(message);
169 }
170 }
171
172
173 if (_syncMListeners!=null)
174 for (MessageListener l:_syncMListeners)
175 l.deliver(from,this,message);
176 alisteners=_asyncMListeners;
177
178 if (_batch==0 && _responsesPending<1 && _queue.size()>0 && !((MessageImpl)message).isLazy())
179 resume();
180 }
181
182
183 if (alisteners!=null)
184 for (MessageListener l:alisteners)
185 l.deliver(from,this,message);
186 }
187
188
189 public void doDeliverListeners()
190 {
191 synchronized (this)
192 {
193 if (_dListeners!=null)
194 for (DeliverListener l:_dListeners)
195 l.deliver(this,_queue);
196 }
197 }
198
199
200 public void setMetaConnectDeliveryOnly(boolean deliverViaMetaConnectOnly)
201 {
202 _deliverViaMetaConnectOnly = deliverViaMetaConnectOnly;
203 }
204
205
206 public boolean isMetaConnectDeliveryOnly()
207 {
208 return _deliverViaMetaConnectOnly;
209 }
210
211
212 public void startBatch()
213 {
214 synchronized(this)
215 {
216 _batch++;
217 }
218 }
219
220
221 public void endBatch()
222 {
223 synchronized(this)
224 {
225 if (--_batch==0 && _responsesPending<1)
226 {
227 switch(_queue.size())
228 {
229 case 0:
230 break;
231 case 1:
232 if (!((MessageImpl)_queue.get(0)).isLazy())
233 resume();
234 break;
235 default:
236
237 resume();
238 }
239 }
240 }
241 }
242
243
244 public String getConnectionType()
245 {
246 return _type;
247 }
248
249
250
251
252
253 public String getId()
254 {
255 return _id;
256 }
257
258
259 public boolean hasMessages()
260 {
261 return _queue.size()>0;
262 }
263
264
265 public boolean hasNonLazyMessages()
266 {
267 synchronized (this)
268 {
269 for (int i=_queue.size();i-->0;)
270 {
271 if (!((MessageImpl)_queue.getUnsafe(i)).isLazy())
272 return true;
273 }
274 }
275 return false;
276 }
277
278
279 public boolean isLocal()
280 {
281 return true;
282 }
283
284
285
286
287
288 public void disconnect()
289 {
290 synchronized(this)
291 {
292 if (_bayeux.hasClient(_id))
293 remove(false);
294 }
295 }
296
297
298
299
300
301 public void remove(boolean timeout)
302 {
303 Client client=_bayeux.removeClient(_id);
304
305 if (client!=null && _bayeux.isLogInfo())
306 _bayeux.logInfo("Remove client "+client+" timeout="+timeout);
307
308 final String browser_id;
309 final RemoveListener[] listeners;
310 synchronized(this)
311 {
312 browser_id=_browserId;
313 _browserId=null;
314 listeners=_rListeners;
315 }
316
317 if (browser_id!=null)
318 _bayeux.clientOffBrowser(browser_id,_id);
319 if (listeners!=null)
320 for (RemoveListener l:listeners)
321 l.removed(_id, timeout);
322
323 resume();
324 }
325
326
327 public int responded()
328 {
329 synchronized(this)
330 {
331 return _responsesPending--;
332 }
333 }
334
335
336 public int responsePending()
337 {
338 synchronized(this)
339 {
340 return ++_responsesPending;
341 }
342 }
343
344
345
346
347 public void resume()
348 {
349 }
350
351
352
353
354
355 public int getMessages()
356 {
357 return _queue.size();
358 }
359
360
361 public List<Message> takeMessages()
362 {
363 synchronized(this)
364 {
365 ArrayList<Message> list = new ArrayList<Message>(_queue);
366 _queue.clear();
367 return list;
368 }
369 }
370
371
372
373 public void returnMessages(List<Message> messages)
374 {
375 synchronized(this)
376 {
377 _queue.addAll(0,messages);
378 }
379 }
380
381
382 @Override
383 public String toString()
384 {
385 return _id;
386 }
387
388
389 protected void addSubscription(ChannelImpl channel)
390 {
391 synchronized (this)
392 {
393 _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null);
394 }
395 }
396
397
398 protected void removeSubscription(ChannelImpl channel)
399 {
400 synchronized (this)
401 {
402 _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel);
403 }
404 }
405
406
407 protected void setConnectionType(String type)
408 {
409 synchronized (this)
410 {
411 _type=type;
412 }
413 }
414
415
416 protected void setId(String id)
417 {
418 synchronized (this)
419 {
420 _id=id;
421 }
422 }
423
424
425 protected void unsubscribeAll()
426 {
427 ChannelImpl[] subscriptions;
428 synchronized(this)
429 {
430 _queue.clear();
431 subscriptions=_subscriptions;
432 _subscriptions=new ChannelImpl[0];
433 }
434 for (ChannelImpl channel : subscriptions)
435 channel.unsubscribe(this);
436
437 }
438
439
440 public void setBrowserId(String id)
441 {
442 if (_browserId!=null && !_browserId.equals(id))
443 _bayeux.clientOffBrowser(_browserId,_id);
444 _browserId=id;
445 if (_browserId!=null)
446 _bayeux.clientOnBrowser(_browserId,_id);
447 }
448
449
450 public String getBrowserId()
451 {
452 return _browserId;
453 }
454
455
456 @Override
457 public boolean equals(Object o)
458 {
459 if (!(o instanceof Client))
460 return false;
461 return getId().equals(((Client)o).getId());
462 }
463
464
465
466
467
468
469 public JSON.Literal getAdvice()
470 {
471 return _advice;
472 }
473
474
475
476
477
478 public void setAdvice(JSON.Literal advice)
479 {
480 _advice=advice;
481 }
482
483
484
485 public void addListener(ClientListener listener)
486 {
487 synchronized(this)
488 {
489 if (listener instanceof MessageListener)
490 {
491 if (listener instanceof MessageListener.Synchronous)
492 _syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class);
493 else
494 _asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class);
495 }
496
497 if (listener instanceof RemoveListener)
498 _rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class);
499
500 if (listener instanceof QueueListener)
501 _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class);
502
503 if (listener instanceof DeliverListener)
504 _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class);
505 }
506 }
507
508
509 public void removeListener(ClientListener listener)
510 {
511 synchronized(this)
512 {
513 if (listener instanceof MessageListener)
514 {
515 _syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener);
516 _asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener);
517 }
518
519 if (listener instanceof RemoveListener)
520 _rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener);
521
522 if (listener instanceof QueueListener)
523 _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener);
524 }
525 }
526
527
528 public long getTimeout()
529 {
530 return _timeout;
531 }
532
533
534 public void setTimeout(long timeoutMS)
535 {
536 _timeout=timeoutMS;
537 }
538
539
540 public void setMaxQueue(int maxQueue)
541 {
542 _maxQueue=maxQueue;
543 }
544
545
546 public int getMaxQueue()
547 {
548 return _maxQueue;
549 }
550
551
552 public Queue<Message> getQueue()
553 {
554 return _queue;
555 }
556
557
558
559
560
561
562 public int getLag()
563 {
564 return _lag;
565 }
566
567
568
569
570
571
572 public void setLag(int lag)
573 {
574 _lag = lag;
575 }
576
577
578 }