使用nio最尴尬的莫过于一不小心cpu利用率就维持在100%,一般的原因可能是
- 注册了OP_WRITE事件
- 对某个注册事件一直没处理(或没处理完)
使用nio应该注意:
- 只在一个线程中操作selector(在多个线程中操作同一个selector就是一场噩梦)
- 只注册当前感兴趣的事件
- 要发送数据时直接写,一次写不完再注册OP_WRITE事件,在下一次可写时发送
实践代码: SelectorProcessor类,分发事件(最好使用线程池) package zzzhc;
import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator;
/** * @author <a href="mailto:[email protected]">zzzhc </a> * */ public class SelectorProcessor implements Runnable { private final Selector selector;
private final Queue waitCloseQueue;
private final Queue waitRegisterQueue;
private final Queue waitAddInterestQueue;
private final Thread processorThread;
private boolean shutdown = false; private final static SelectorProcessor instance; static { try { instance = new SelectorProcessor(); } catch (IOException e) { throw new RuntimeException(e); } } public static SelectorProcessor getDefaultInstance() { return instance; }
public SelectorProcessor() throws IOException { selector = Selector.open(); waitCloseQueue = new Queue(); waitRegisterQueue = new Queue(); waitAddInterestQueue = new Queue(); processorThread = new Thread(this); //processorThread.setDaemon(true);//is this needed? processorThread.start(); }
public void register(SelectableChannel sc, Handler handler, int ops) { if (Thread.currentThread() == processorThread) { doRegister(sc, handler, ops); } else { ChannelAssociater r = new ChannelAssociater(sc, handler, ops); synchronized (waitRegisterQueue) { waitRegisterQueue.push(r); selector.wakeup(); } } }
public void addInterestOps(SelectableChannel sc, int addOps) { if (Thread.currentThread() == processorThread) { addOps(sc, addOps); } else { ChannelAssociater r = new ChannelAssociater(sc, null, addOps); synchronized (waitAddInterestQueue) { waitAddInterestQueue.push(r); selector.wakeup(); } } }
public void closeChannel(SelectableChannel sc) { if (Thread.currentThread() == processorThread) { doClose(sc); } else { synchronized (waitCloseQueue) { waitCloseQueue.push(sc); selector.wakeup(); } } }
protected void doRegister(SelectableChannel sc, Handler handler, int ops) { if (Thread.currentThread() == processorThread) { try { sc.register(selector, ops, handler); } catch (ClosedChannelException e) { e.printStackTrace(); } } }
protected void addOps(SelectableChannel sc, int addOps) { if (Thread.currentThread() == processorThread) { SelectionKey key = sc.keyFor(selector); key.interestOps(key.interestOps() | addOps); } }
protected void doClose(SelectableChannel sc) { if (Thread.currentThread() == processorThread) { try { sc.close(); } catch (IOException e) { e.printStackTrace(); } } }
protected void dealRegister() { if (Thread.currentThread() == processorThread) { synchronized (waitRegisterQueue) { while (!waitRegisterQueue.isEmpty()) { ChannelAssociater ca = (ChannelAssociater) waitRegisterQueue .shift(); doRegister(ca.sc, ca.handler, ca.ops); } } } }
protected void dealAddInterest() { if (Thread.currentThread() == processorThread) { synchronized (waitAddInterestQueue) { while (!waitAddInterestQueue.isEmpty()) { ChannelAssociater ca = (ChannelAssociater) waitAddInterestQueue .shift(); addOps(ca.sc, ca.ops); } } } }
protected void dealClose() { if (Thread.currentThread() == processorThread) { synchronized (waitCloseQueue) { while (!waitCloseQueue.isEmpty()) { SelectableChannel sc = (SelectableChannel) waitCloseQueue .shift(); doClose(sc); } } } }
protected void dealShutdown() { Iterator iterator = selector.keys().iterator(); while (iterator.hasNext()) { try { SelectionKey key = (SelectionKey) iterator.next(); key.channel().close(); } catch (IOException e) { e.printStackTrace(); } } try { selector.close(); } catch (IOException e) { e.printStackTrace(); } }
public void shutdown() { this.shutdown = true; selector.wakeup(); }
public void run() { int keyCount = 0; while (!shutdown) { dealRegister(); dealAddInterest(); dealClose(); try { keyCount = selector.select(); if (keyCount == 0) { continue; } Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = (SelectionKey) iterator.next(); iterator.remove(); key.interestOps(key.interestOps() & ~key.readyOps()); Handler handler = (Handler) key.attachment(); if (key.isAcceptable()) { ((AcceptHandler) handler).handleAccept(); } else if (key.isConnectable()) { ((ConnectHandler) handler).handleConnect(); } else { ReadWriteHandler rwh = ((ReadWriteHandler) handler); if (key.isValid() && key.isReadable()) { rwh.handleRead(); } else if (key.isValid() && key.isWritable()) { rwh.handleWrite(); } } } } catch (ClosedSelectorException cse) { System.err.println("selector closed:" + cse.getMessage() + "\nquit"); return; } catch (IOException e) { e.printStackTrace(); } } dealShutdown(); }
class ChannelAssociater { SelectableChannel sc;
Handler handler;
int ops;
ChannelAssociater(SelectableChannel sc, Handler handler, int ops) { this.sc = sc; this.handler = handler; this.ops = ops; } }
}
//Queue类,对LinkedArrayList的简单包装,提供push,pop,shift,unshift操作,用起来顺手 package zzzhc;
import java.util.*;
/** * @author <a href="mailto:[email protected]">zzzhc</a> * */ public class Queue { private LinkedList content = new LinkedList(); public void unshift(Object o) { content.addFirst(o); } public Object shift() { return content.removeFirst(); } public void push(Object o) { content.addLast(o); } public Object pop() { return content.removeLast(); } public boolean isEmpty() { return content.isEmpty(); } public int size() { return content.size(); }
}
//Handler,ConnectHandler ,AcceptHandler ,ReadWriteHandler 接口,处理事件 package zzzhc;
/** * @author <a href="mailto:[email protected]">zzzhc</a> * */ public interface Handler {
}
package zzzhc;
/** * @author <a href="mailto:[email protected]">zzzhc</a> * */ public interface ConnectHandler extends Handler { void handleConnect();
}
package zzzhc;
/** * @author <a href="mailto:[email protected]">zzzhc</a> * */ public interface AcceptHandler extends Handler { void handleAccept();
}
package zzzhc;
/** * @author <a href="mailto:[email protected]">zzzhc</a> * */ public interface ReadWriteHandler extends Handler { void handleRead(); void handleWrite();
}

|