View Javadoc

1   package org.cometd.oort;
2   
3   import java.util.Collection;
4   import java.util.List;
5   import java.util.Map;
6   import java.util.Set;
7   import java.util.concurrent.ConcurrentHashMap;
8   import java.util.concurrent.ConcurrentMap;
9   
10  import org.cometd.Channel;
11  import org.cometd.Client;
12  import org.cometd.Message;
13  import org.cometd.MessageListener;
14  import org.mortbay.cometd.ClientImpl;
15  import org.mortbay.component.AbstractLifeCycle;
16  import org.mortbay.util.LazyList;
17  import org.mortbay.util.MultiMap;
18  import org.mortbay.util.ajax.JSON;
19  import org.mortbay.util.ajax.JSON.Output;
20  
21  /* ------------------------------------------------------------ */
22  /** The Search for Extra Terrestrial Intelligence.
23   * 
24   * Well in this case, just the search for a user logged onto an
25   * Cometd node in an Oort cluster.
26   * <p>
27   * Seti allows an application to maintain a mapping from userId to 
28   * comet client ID using the {@link #associate(String, Client)} and
29   * {@link #disassociate(String)} methods. Each cometd node keeps its
30   * own associate mapping for clients connected to it.
31   * <p>
32   * The {@link #sendMessage(Collection, String, Object)} and 
33   * {@link #sendMessage(String, String, Object)} methods may be
34   * used to send a message to user(s) anywhere in the Oort cluster
35   * and Seti organizes the search of the distributed associate
36   * maps in order to locate the user(s)
37   * <p>
38   * If users can be directed to shards of cometd servers, then
39   * each Seti instance must be told its shard ID and the {@link #userId2Shard(String)}
40   * method must be extended to map users to shards.
41   * 
42   */
43  public class Seti extends AbstractLifeCycle
44  {
45      public final static String SETI_ATTRIBUTE="org.cometd.oort.Seti";
46      public final static String SETI_SHARD="seti.shard";
47      
48      final String _setiId;
49      final String _setiChannelId;
50      final String _shardId;
51      final Oort _oort;
52      final Client _client;
53      final ShardLocation _allShardLocation;
54      final Channel _setiIdChannel;
55      final Channel _setiAllChannel;
56      final Channel _setiShardChannel;
57      
58      final ConcurrentMap<String, Location> _uid2Location = new ConcurrentHashMap<String, Location>();
59      
60      /* ------------------------------------------------------------ */
61      public Seti(Oort oort, String shardId)
62      {
63          _oort=oort;
64          _client = _oort.getBayeux().newClient("seti");
65          _setiId=_oort.getURL().replace("://","_").replace("/","_").replace(":","_");
66          _shardId=shardId;
67  
68          _setiChannelId="/seti/"+_setiId;
69          _setiIdChannel=_oort.getBayeux().getChannel(_setiChannelId,true);
70          _setiIdChannel.setPersistent(true);
71          _oort.observeChannel(_setiIdChannel.getId());
72          _setiIdChannel.subscribe(_client);
73          
74          _setiAllChannel=_oort.getBayeux().getChannel("/seti/ALL",true);
75          _setiAllChannel.setPersistent(true);
76          _oort.observeChannel(_setiAllChannel.getId());
77          _setiAllChannel.subscribe(_client);
78          
79          _setiShardChannel=_oort.getBayeux().getChannel("/seti/"+shardId,true);
80          _setiShardChannel.setPersistent(true);
81          _oort.observeChannel(_setiShardChannel.getId());
82          _setiShardChannel.subscribe(_client);
83          
84          _allShardLocation = new ShardLocation(_setiAllChannel);
85           
86      }
87  
88      /* ------------------------------------------------------------ */
89      protected void doStart()
90          throws Exception
91      {
92          super.doStart();
93          _client.addListener(new MessageListener()
94          {
95              public void deliver(Client from, Client to, Message msg)
96              {
97                  receive(from,to,msg);
98              }
99          });
100     }
101     
102     /* ------------------------------------------------------------ */
103     protected void doStop()
104         throws Exception
105     {
106         _client.disconnect();
107     }
108     
109     /* ------------------------------------------------------------ */
110     public void associate(final String userId,final Client client)
111     {
112         _uid2Location.put(userId,new LocalLocation(client));
113         userId2Shard(userId).associate(userId);
114     }
115 
116     /* ------------------------------------------------------------ */
117     public void disassociate(final String userId)
118     {
119         _uid2Location.remove(userId);
120         userId2Shard(userId).disassociate(userId);
121     }
122 
123     /* ------------------------------------------------------------ */
124     public void sendMessage(final String toUser,final String toChannel,final Object message)
125     {
126         Location location = _uid2Location.get(toUser);
127         if (location==null)
128             location = userId2Shard(toUser);
129         
130         location.sendMessage(toUser,toChannel,message);
131     }
132 
133     /* ------------------------------------------------------------ */
134     public void sendMessage(final Collection<String> toUsers,final String toChannel,final Object message)
135     {
136         // break toUsers in to shards
137         MultiMap shard2users = new MultiMap();
138         for (String userId:toUsers)
139         {       
140             ShardLocation shard = userId2Shard(userId);
141             shard2users.add(shard,userId);
142         }
143         
144         // for each shard
145         for (Map.Entry<ShardLocation,Object> entry : (Set<Map.Entry<ShardLocation,Object>>)shard2users.entrySet())
146         {
147             // TODO, we could look at all users in shard to see if we
148             // know a setiId for each, and if so, break the user list
149             // up into a message for each seti-id. BUT it is probably 
150             // more efficient just to send to the entire shard (unless
151             // the number of nodes in the shard is greater than the
152             // number of users).
153             
154             ShardLocation shard = entry.getKey();
155             Object lazyUsers = entry.getValue();
156             
157             if (LazyList.size(lazyUsers)==1)
158                 shard.sendMessage((String)lazyUsers,toChannel,message);
159             else
160                 shard.sendMessage((List<String>)lazyUsers,toChannel,message);
161         }
162     }
163     
164     /* ------------------------------------------------------------ */
165     protected ShardLocation userId2Shard(final String userId)
166     {
167         return _allShardLocation;
168     }
169 
170     /* ------------------------------------------------------------ */
171     protected void receive(final Client from, final Client to, final Message msg)
172     {
173         //System.err.println("SETI "+_oort+":: "+msg);
174 
175         if (!(msg.getData() instanceof Map))
176             return;
177         
178         // extract the message details
179         Map<String,Object> data = (Map<String,Object>)msg.getData();
180         final String toUid=(String)data.get("to");
181         final String fromUid=(String)data.get("from");
182         final Object message = data.get("message");
183         final String on = (String)data.get("on");
184         
185         // Handle any client locations contained in the message
186         if (fromUid!=null)
187         {
188             if (on!=null)
189             {
190                 //System.err.println(_oort+":: "+fromUid+" on "+on);
191                 _uid2Location.put(fromUid,new SetiLocation("/seti/"+on));
192             }
193             else 
194             {
195                 final String off = (String)data.get("off");
196                 if (off!=null)
197                 {
198                     //System.err.println(_oort+":: "+fromUid+" off ");
199                     _uid2Location.remove(fromUid,new SetiLocation("/seti/"+off));
200                 }
201             }
202         }
203         
204         // deliver message
205         if (message!=null && toUid!=null)
206         {
207             final String toChannel=(String)data.get("channel");
208             Location location=_uid2Location.get(toUid);
209             
210             if (location==null && _setiChannelId.equals(msg.getChannel()))
211                 // was sent to this node, so escalate to the shard.
212                 location =userId2Shard(toUid);
213             
214             if (location!=null)
215                 location.receive(toUid,toChannel,message);
216         }
217         
218     }
219 
220 
221     /* ------------------------------------------------------------ */
222     /* ------------------------------------------------------------ */
223     private interface Location
224     {
225         public void sendMessage(String toUser,String toChannel,Object message);
226         public void receive(String toUser,String toChannel,Object message);
227     }
228     
229 
230     /* ------------------------------------------------------------ */
231     /* ------------------------------------------------------------ */
232     class LocalLocation implements Location
233     {
234         Client _client;
235         
236         LocalLocation(Client client)
237         {
238             _client=client;
239         }
240 
241         public void sendMessage(String toUser, String toChannel, Object message)
242         {
243             _client.deliver(Seti.this._client,toChannel,message,null);
244         }
245 
246         public void receive(String toUser, String toChannel, Object message)
247         {
248             _client.deliver(Seti.this._client,toChannel,message,null);
249         }
250     }
251 
252     /* ------------------------------------------------------------ */
253     /* ------------------------------------------------------------ */
254     class SetiLocation implements Location
255     {
256         Channel _channel;
257 
258         SetiLocation(String channelId)
259         {
260             _channel=_oort._bayeux.getChannel(channelId,true);
261         }
262         
263         SetiLocation(Channel channel)
264         {
265             _channel=channel;
266         }
267         
268         public void sendMessage(String toUser, String toChannel, Object message)
269         {
270             _channel.publish(Seti.this._client,new SetiMessage(toUser,toChannel,message),null);
271         }
272 
273         public void receive(String toUser, String toChannel, Object message)
274         {
275             
276         }
277 
278         public boolean equals(Object o)
279         {
280             return o instanceof SetiLocation &&
281             ((SetiLocation)o)._channel.equals(_channel);
282         }
283         
284         public int hashCode()
285         {
286             return _channel.hashCode();
287         }
288     }
289 
290     /* ------------------------------------------------------------ */
291     /* ------------------------------------------------------------ */
292     class ShardLocation implements Location
293     {
294         Channel _channel;
295         
296         ShardLocation(String shardId)
297         {
298             _channel=_oort._bayeux.getChannel("/seti/"+shardId,true);
299             
300         }
301         
302         ShardLocation(Channel channel)
303         {
304             _channel=channel;
305         }
306         
307         public void sendMessage(final Collection<String> toUsers, final String toChannel, final Object message)
308         {
309             _channel.publish(Seti.this._client,new SetiMessage(toUsers,toChannel,message),null);
310         }
311 
312         public void sendMessage(String toUser, String toChannel, Object message)
313         {
314             _channel.publish(Seti.this._client,new SetiMessage(toUser,toChannel,message),null);
315         }
316         
317         public void receive(String toUser, String toChannel, Object message)
318         {
319             
320         }
321         
322         public void associate(final String user)
323         {
324             _channel.publish(Seti.this._client,new SetiPresence(user,true),null);
325         }
326         
327         public void disassociate(final String user)
328         {
329             _channel.publish(Seti.this._client,new SetiPresence(user,false),null);
330         }
331     }
332 
333     /* ------------------------------------------------------------ */
334     /* ------------------------------------------------------------ */
335     class SetiMessage implements JSON.Convertible
336     {
337         String _toUser;
338         Collection<String> _toUsers;
339         String _toChannel;
340         Object _message;
341 
342         SetiMessage(String toUser,String toChannel, Object message)
343         {
344             _toUser=toUser;
345             _toChannel=toChannel;
346             _message=message;
347         }
348         
349         SetiMessage(Collection<String> toUsers,String toChannel, Object message)
350         {
351             _toUsers=toUsers;
352             _toChannel=toChannel;
353             _message=message;
354         }
355         
356         public void fromJSON(Map object)
357         {
358             throw new UnsupportedOperationException();
359         }
360 
361         public void toJSON(Output out)
362         {
363             if (_toUser!=null)
364                 out.add("to",_toUser);
365             else if (_toUsers!=null)
366                 out.add("to",_toUsers);
367             out.add("channel",_toChannel);
368             out.add("from",_setiId);
369             out.add("message",_message);
370         }   
371     }
372     
373     /* ------------------------------------------------------------ */
374     /* ------------------------------------------------------------ */
375     class SetiPresence implements JSON.Convertible
376     {
377         String _user;
378         boolean _on;
379 
380         SetiPresence(String user,boolean on)
381         {
382             _user=user;
383             _on=on;
384         }
385         
386         public void fromJSON(Map object)
387         {
388             throw new UnsupportedOperationException();
389         }
390 
391         public void toJSON(Output out)
392         {
393             out.add("from",_user);
394             out.add(_on?"on":"off",_setiId);
395         }
396     }
397     
398 }