|  
 质数查询客户端 
这个客户端的目的是尽可能的将计算量分布到其它PEER上.考虑用户要获得1到10000之间的质数, 一个peer接到这个消息的时候, 它就需要决定要分给几个peer来做这件事情。.因此,它需要不断的去发现那些通告自己有质数服务的peer,并为它们的通告提供一个缓冲来保存. 例如一个peer有另外10个peer和它一起工作, 那么它可能在第一个消息中以1作为LOW_INT, 1000作HIGH_INT, 在另外一个消息中,以1001作为LOW_INT, 2000作为HIGH_INT,以此类推. 最后, 客户端将打开一个管道通往这10个peer,然后分别向他们传输消息。 
客户端的skeleton看起和服务端的差不多, 同样是先初始化Net Peer Group,然后获得GROUP的发现服务和管道服务。. 
  
Listing 16.10 PrimeClient 
package primecruncher; 
import net.jxta.peergroup.PeerGroup; 
import net.jxta.peergroup.PeerGroupFactory; 
import net.jxta.discovery.DiscoveryService; 
import net.jxta.discovery.DiscoveryListener; 
import net.jxta.discovery.DiscoveryEvent; 
import net.jxta.pipe.PipeService; 
import net.jxta.pipe.OutputPipe; 
import net.jxta.pipe.PipeID; 
import net.jxta.exception.PeerGroupException; 
import net.jxta.protocol.DiscoveryResponseMsg; 
import net.jxta.protocol.ModuleSpecAdvertisement; 
import net.jxta.protocol.PipeAdvertisement; 
import net.jxta.document.StructuredTextDocument; 
import net.jxta.document.MimeMediaType; 
import net.jxta.document.TextElement; 
import net.jxta.document.AdvertisementFactory; 
import net.jxta.id.IDFactory; 
import net.jxta.endpoint.Message; 
import java.util.Enumeration; 
import java.io.StringWriter; 
import java.io.IOException; 
import java.net.URL; 
import java.net.MalformedURLException; 
import java.net.UnknownServiceException; 
import java.util.HashSet; 
import java.util.Set; 
public class PrimeClient implements DiscoveryListener { 
        private static PeerGroup group; 
        private static DiscoveryService discoSvc; 
        private static PipeService pipeSvc; 
        private OutputPipe outputPipe; 
        private Set adverts = new HashSet(); 
        public PrimeClient() { 
        } 
        public static void main(String[] argv) { 
               Client cl = new Client(); 
               cl.startJxta(); 
               cl.doDiscovery(); 
        } 
        public int[] processPrimes(int low, int high) { 
        } 
        private void startJxta() { 
               try { 
                       group = PeerGroupFactory.newNetPeerGroup(); 
                       discoSvc = group.getDiscoveryService(); 
                       pipeSvc = group.getPipeService(); 
               } catch (PeerGroupException e) { 
                       System.out.println("Can't create net peer group: " + 
                               e.getMessage()); 
                       System.exit(-1); 
               } 
        } 
        private void doDiscovery() { 
  
        } 
} 
虽然PrimePeer的关键责任是通告它自己的服务和处理收到的消息, PrimeClient 同时必须参与到服务发现过程中) 
 doDiscovery()方法初始化了服务发现。 首先,peer调查自己缓冲,在质数计算module的说明中去发现与名字属性想匹配的通告 
Listing 16.11 Performing Local Discovery 
        System.out.println("Starting service discovery..."); 
        System.out.println("Searching local cache for " + 
               ServiceConstants.SPEC_NAME + " advertisements"); 
        Enumeration res = null; 
        try { 
                res = discoSvc.getLocalAdvertisements(DiscoveryService.ADV, 
                      "Name", ServiceConstants.SPEC_NAME); 
        } catch (IOException e) { 
               System.out.println("IO Exception."); 
        } 
        if (res != null) { 
               while (res.hasMoreElements()) { 
                       processAdv((ModuleSpecAdvertisement)res.nextElement()); 
               } 
} 
然后,peer初始化远程通告发现, 远程发现意味着发现请求将在JXTA网络中传播,当合适的通告被发现后, 就发出回应。 因此, 远程发现是一个异步的过程,我们传递DiscoveryListener 作为一个值(argument)到DiscoveryService 的 getRemoteAdvertisements()方法中。 另外,我们还必须说明我们需要从每个peer中得到的通告数目。 
一旦远程发现被初始化了, 被发现的通告就被存储在本地的通告cache中, 这样,下一次这个peer开始工作的时候, 它将从这个缓冲中发现通告。 
Listing 16.12 Initiating Remote Service Discovery 
        System.out.println("Starting remote discovery..."); 
        discoSvc.getRemoteAdvertisements(null, DiscoveryService.ADV, 
                "Name", ServiceConstants.SPEC_NAME, 1, this); 
} 
DiscoveryListener 说明了discoveryEvent()方法在每次被调用的时候将发现一个匹配标准的通告,一个DiscoveryEvent 包括了一个DiscoveryReponseMsg, 它包含一个在远程发现中发现的实际的通告。 我们获得了这些通告的枚举 然后对每个进行操作。 
Listing 16.13 Implementing a DiscoveryListener 
        public void discoveryEvent(DiscoveryEvent event) { 
               System.out.println("DiscoveryEvent called"); 
               DiscoveryResponseMsg  mes = event.getResponse(); 
               //these contain the responses found 
               Enumeration res = mes.getResponses(); 
               if (res != null) { 
                      while (res.hasMoreElements()) { 
                              processAdv((ModuleSpecAdvertisement) res.nextElement()); 
                      } 
               } 
} 
processAdv()方法非常简单,它插入每个ModuleSpecAdvertisement到一个集合中,这个集合保证没有存储重复的通告, 这个集合作为module的说明通告的缓冲被使用。 
        private void processAdv(ModuleSpecAdvertisement ad) {  
               adverts.add(ad); 
}  
 
  |