1 package org.cometd.oort;
2
3 import java.util.Collection;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.Set;
7 import java.util.concurrent.ConcurrentHashMap;
8 import java.util.concurrent.ConcurrentMap;
9
10 import org.cometd.Channel;
11 import org.cometd.Client;
12 import org.cometd.Message;
13 import org.cometd.MessageListener;
14 import org.mortbay.cometd.ClientImpl;
15 import org.mortbay.component.AbstractLifeCycle;
16 import org.mortbay.util.LazyList;
17 import org.mortbay.util.MultiMap;
18 import org.mortbay.util.ajax.JSON;
19 import org.mortbay.util.ajax.JSON.Output;
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 public class Seti extends AbstractLifeCycle
44 {
45 public final static String SETI_ATTRIBUTE="org.cometd.oort.Seti";
46 public final static String SETI_SHARD="seti.shard";
47
48 final String _setiId;
49 final String _setiChannelId;
50 final String _shardId;
51 final Oort _oort;
52 final Client _client;
53 final ShardLocation _allShardLocation;
54 final Channel _setiIdChannel;
55 final Channel _setiAllChannel;
56 final Channel _setiShardChannel;
57
58 final ConcurrentMap<String, Location> _uid2Location = new ConcurrentHashMap<String, Location>();
59
60
61 public Seti(Oort oort, String shardId)
62 {
63 _oort=oort;
64 _client = _oort.getBayeux().newClient("seti");
65 _setiId=_oort.getURL().replace("://","_").replace("/","_").replace(":","_");
66 _shardId=shardId;
67
68 _setiChannelId="/seti/"+_setiId;
69 _setiIdChannel=_oort.getBayeux().getChannel(_setiChannelId,true);
70 _setiIdChannel.setPersistent(true);
71 _oort.observeChannel(_setiIdChannel.getId());
72 _setiIdChannel.subscribe(_client);
73
74 _setiAllChannel=_oort.getBayeux().getChannel("/seti/ALL",true);
75 _setiAllChannel.setPersistent(true);
76 _oort.observeChannel(_setiAllChannel.getId());
77 _setiAllChannel.subscribe(_client);
78
79 _setiShardChannel=_oort.getBayeux().getChannel("/seti/"+shardId,true);
80 _setiShardChannel.setPersistent(true);
81 _oort.observeChannel(_setiShardChannel.getId());
82 _setiShardChannel.subscribe(_client);
83
84 _allShardLocation = new ShardLocation(_setiAllChannel);
85
86 }
87
88
89 protected void doStart()
90 throws Exception
91 {
92 super.doStart();
93 _client.addListener(new MessageListener()
94 {
95 public void deliver(Client from, Client to, Message msg)
96 {
97 receive(from,to,msg);
98 }
99 });
100 }
101
102
103 protected void doStop()
104 throws Exception
105 {
106 _client.disconnect();
107 }
108
109
110 public void associate(final String userId,final Client client)
111 {
112 _uid2Location.put(userId,new LocalLocation(client));
113 userId2Shard(userId).associate(userId);
114 }
115
116
117 public void disassociate(final String userId)
118 {
119 _uid2Location.remove(userId);
120 userId2Shard(userId).disassociate(userId);
121 }
122
123
124 public void sendMessage(final String toUser,final String toChannel,final Object message)
125 {
126 Location location = _uid2Location.get(toUser);
127 if (location==null)
128 location = userId2Shard(toUser);
129
130 location.sendMessage(toUser,toChannel,message);
131 }
132
133
134 public void sendMessage(final Collection<String> toUsers,final String toChannel,final Object message)
135 {
136
137 MultiMap shard2users = new MultiMap();
138 for (String userId:toUsers)
139 {
140 ShardLocation shard = userId2Shard(userId);
141 shard2users.add(shard,userId);
142 }
143
144
145 for (Map.Entry<ShardLocation,Object> entry : (Set<Map.Entry<ShardLocation,Object>>)shard2users.entrySet())
146 {
147
148
149
150
151
152
153
154 ShardLocation shard = entry.getKey();
155 Object lazyUsers = entry.getValue();
156
157 if (LazyList.size(lazyUsers)==1)
158 shard.sendMessage((String)lazyUsers,toChannel,message);
159 else
160 shard.sendMessage((List<String>)lazyUsers,toChannel,message);
161 }
162 }
163
164
165 protected ShardLocation userId2Shard(final String userId)
166 {
167 return _allShardLocation;
168 }
169
170
171 protected void receive(final Client from, final Client to, final Message msg)
172 {
173
174
175 if (!(msg.getData() instanceof Map))
176 return;
177
178
179 Map<String,Object> data = (Map<String,Object>)msg.getData();
180 final String toUid=(String)data.get("to");
181 final String fromUid=(String)data.get("from");
182 final Object message = data.get("message");
183 final String on = (String)data.get("on");
184
185
186 if (fromUid!=null)
187 {
188 if (on!=null)
189 {
190
191 _uid2Location.put(fromUid,new SetiLocation("/seti/"+on));
192 }
193 else
194 {
195 final String off = (String)data.get("off");
196 if (off!=null)
197 {
198
199 _uid2Location.remove(fromUid,new SetiLocation("/seti/"+off));
200 }
201 }
202 }
203
204
205 if (message!=null && toUid!=null)
206 {
207 final String toChannel=(String)data.get("channel");
208 Location location=_uid2Location.get(toUid);
209
210 if (location==null && _setiChannelId.equals(msg.getChannel()))
211
212 location =userId2Shard(toUid);
213
214 if (location!=null)
215 location.receive(toUid,toChannel,message);
216 }
217
218 }
219
220
221
222
223 private interface Location
224 {
225 public void sendMessage(String toUser,String toChannel,Object message);
226 public void receive(String toUser,String toChannel,Object message);
227 }
228
229
230
231
232 class LocalLocation implements Location
233 {
234 Client _client;
235
236 LocalLocation(Client client)
237 {
238 _client=client;
239 }
240
241 public void sendMessage(String toUser, String toChannel, Object message)
242 {
243 _client.deliver(Seti.this._client,toChannel,message,null);
244 }
245
246 public void receive(String toUser, String toChannel, Object message)
247 {
248 _client.deliver(Seti.this._client,toChannel,message,null);
249 }
250 }
251
252
253
254 class SetiLocation implements Location
255 {
256 Channel _channel;
257
258 SetiLocation(String channelId)
259 {
260 _channel=_oort._bayeux.getChannel(channelId,true);
261 }
262
263 SetiLocation(Channel channel)
264 {
265 _channel=channel;
266 }
267
268 public void sendMessage(String toUser, String toChannel, Object message)
269 {
270 _channel.publish(Seti.this._client,new SetiMessage(toUser,toChannel,message),null);
271 }
272
273 public void receive(String toUser, String toChannel, Object message)
274 {
275
276 }
277
278 public boolean equals(Object o)
279 {
280 return o instanceof SetiLocation &&
281 ((SetiLocation)o)._channel.equals(_channel);
282 }
283
284 public int hashCode()
285 {
286 return _channel.hashCode();
287 }
288 }
289
290
291
292 class ShardLocation implements Location
293 {
294 Channel _channel;
295
296 ShardLocation(String shardId)
297 {
298 _channel=_oort._bayeux.getChannel("/seti/"+shardId,true);
299
300 }
301
302 ShardLocation(Channel channel)
303 {
304 _channel=channel;
305 }
306
307 public void sendMessage(final Collection<String> toUsers, final String toChannel, final Object message)
308 {
309 _channel.publish(Seti.this._client,new SetiMessage(toUsers,toChannel,message),null);
310 }
311
312 public void sendMessage(String toUser, String toChannel, Object message)
313 {
314 _channel.publish(Seti.this._client,new SetiMessage(toUser,toChannel,message),null);
315 }
316
317 public void receive(String toUser, String toChannel, Object message)
318 {
319
320 }
321
322 public void associate(final String user)
323 {
324 _channel.publish(Seti.this._client,new SetiPresence(user,true),null);
325 }
326
327 public void disassociate(final String user)
328 {
329 _channel.publish(Seti.this._client,new SetiPresence(user,false),null);
330 }
331 }
332
333
334
335 class SetiMessage implements JSON.Convertible
336 {
337 String _toUser;
338 Collection<String> _toUsers;
339 String _toChannel;
340 Object _message;
341
342 SetiMessage(String toUser,String toChannel, Object message)
343 {
344 _toUser=toUser;
345 _toChannel=toChannel;
346 _message=message;
347 }
348
349 SetiMessage(Collection<String> toUsers,String toChannel, Object message)
350 {
351 _toUsers=toUsers;
352 _toChannel=toChannel;
353 _message=message;
354 }
355
356 public void fromJSON(Map object)
357 {
358 throw new UnsupportedOperationException();
359 }
360
361 public void toJSON(Output out)
362 {
363 if (_toUser!=null)
364 out.add("to",_toUser);
365 else if (_toUsers!=null)
366 out.add("to",_toUsers);
367 out.add("channel",_toChannel);
368 out.add("from",_setiId);
369 out.add("message",_message);
370 }
371 }
372
373
374
375 class SetiPresence implements JSON.Convertible
376 {
377 String _user;
378 boolean _on;
379
380 SetiPresence(String user,boolean on)
381 {
382 _user=user;
383 _on=on;
384 }
385
386 public void fromJSON(Map object)
387 {
388 throw new UnsupportedOperationException();
389 }
390
391 public void toJSON(Output out)
392 {
393 out.add("from",_user);
394 out.add(_on?"on":"off",_setiId);
395 }
396 }
397
398 }