前面的可能还是不方便,再具体一点: package zzzhc;
/** * @author <a href="mailto:[email protected]">zzzhc</a> * */ public interface SocketHandler extends ConnectHandler, ReadWriteHandler { void onConnected(); void onConnectFailed(String msg); /** * 在数据从channel中读出后被调用. * */ void onRead(); /** * 在要写的数所已写入channnel后被调用. * */ void onWrite(); void onClosed(String msg);
}
//抽象实现 package zzzhc;
import java.io.IOException; import java.net.SocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel;
/** * @author <a href="mailto:[email protected]">zzzhc </a> * */ public abstract class AbstractSocketHandler implements SocketHandler { public final static int DEFAULT_BUFFER_SIZE = 2 * 1024;
protected final SelectorProcessor processor;
protected final SocketChannel sc;
protected final ByteBuffer readBuf;
protected final ByteBuffer writeBuf; protected SocketAddress localAddress;
protected SocketAddress remoteAddress;
protected boolean connected = false;
protected boolean closed = false;
public AbstractSocketHandler(SelectorProcessor processor, SocketAddress remoteAddress) throws IOException { this.processor = processor; this.remoteAddress = remoteAddress; this.sc = SocketChannel.open(); this.sc.configureBlocking(false); this.sc.connect(remoteAddress); int readSize = DEFAULT_BUFFER_SIZE; int writeSize = DEFAULT_BUFFER_SIZE; try { readSize = sc.socket().getReceiveBufferSize(); writeSize = sc.socket().getSendBufferSize(); } catch (SocketException e1) { } readBuf = ByteBuffer.allocate(readSize); writeBuf = ByteBuffer.allocate(writeSize); processor.register(sc, this, SelectionKey.OP_CONNECT); }
public AbstractSocketHandler(SelectorProcessor processor, SocketAddress remoteAddress, ByteBuffer readBuf, ByteBuffer writeBuf) throws IOException { this.processor = processor; this.remoteAddress = remoteAddress; this.sc = SocketChannel.open(); this.sc.configureBlocking(false); this.sc.connect(remoteAddress); readBuf.clear(); writeBuf.clear(); this.readBuf = readBuf; this.writeBuf = writeBuf; processor.register(sc, this, SelectionKey.OP_CONNECT); }
public AbstractSocketHandler(SelectorProcessor processor, SocketChannel sc) { this.processor = processor; this.sc = sc; this.connected = true; if (this.sc.isBlocking()) { try { this.sc.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); } } int readSize = DEFAULT_BUFFER_SIZE; int writeSize = DEFAULT_BUFFER_SIZE; try { readSize = sc.socket().getReceiveBufferSize(); writeSize = sc.socket().getSendBufferSize(); } catch (SocketException e1) { } readBuf = ByteBuffer.allocateDirect(readSize); writeBuf = ByteBuffer.allocateDirect(writeSize); processor.register(sc, this, SelectionKey.OP_READ); }
public AbstractSocketHandler(SelectorProcessor processor, SocketChannel sc, ByteBuffer readBuf, ByteBuffer writeBuf) { this.processor = processor; this.sc = sc; this.connected = true; if (this.sc.isBlocking()) { try { this.sc.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); } } readBuf.clear(); writeBuf.clear(); this.readBuf = readBuf; this.writeBuf = writeBuf; processor.register(sc, this, SelectionKey.OP_READ); }
public void onConnected() { System.out.println("connect to "+this.getRemoteAddress()+" ok"); }
public void onConnectFailed(String msg) { System.out.println("connect to "+this.getRemoteAddress()+" failed:"+msg); }
/** * 如果一次没读完,最后须调用readBuf.compact(). * 如果已读完,须调用readBuf.clear(). */ public void onRead() { readBuf.flip(); int len = readBuf.limit(); byte[] buf = new byte[len]; readBuf.get(buf); readBuf.clear(); System.out.print(new String(buf)); writeBuf.put(buf); writeBuf.flip(); enableWrite(); }
public void onWrite() { }
public void onClosed(String msg) { System.out.println("channel closed:"+msg); }
public void handleConnect() { try { if (sc.finishConnect()) { onConnected(); connected = true; } else { onConnectFailed(""); } processor.addInterestOps(sc, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); onConnectFailed(e.getMessage()); } }
public void handleRead() { try { //System.out.println("read"); int len = sc.read(readBuf); if (len == -1) {//closed dispose("channel been closed correct"); } else { //readBuf.flip(); onRead(); processor.addInterestOps(sc,SelectionKey.OP_READ); } } catch (IOException e) { e.printStackTrace(); dispose(e.getMessage()); } }
public void handleWrite() { if (write()==true) { onWrite(); } } /** * 将writeBuf中的数据写入socketchannel中,如写完清空writeBuf返回true,否则返加false. * 在使用该方法前应先对writeBuf调用flip()方法. * @return */ protected boolean write() { if (writeBuf.hasRemaining()) { try { sc.write(writeBuf); } catch (IOException e) { e.printStackTrace(); dispose(e.getMessage()); return false; } } if (writeBuf.hasRemaining()) { enableWrite(); return false; } else { writeBuf.clear(); return true; } }
public SocketAddress getLocalAddress() { if (localAddress == null) { localAddress = this.sc.socket().getLocalSocketAddress(); } return localAddress; }
public SocketAddress getRemoteAddress() { if (remoteAddress == null) { remoteAddress = this.sc.socket().getRemoteSocketAddress(); } return remoteAddress; }
public void dispose(String msg) { if (!closed) { closed = true; processor.closeChannel(sc); onClosed(msg); } }
public boolean isConnected() { return connected; }
public boolean isClosed() { return closed; } public void close() { processor.closeChannel(sc); }
public void enableRead() { processor.addInterestOps(sc, SelectionKey.OP_READ); }
public void enableWrite() { processor.addInterestOps(sc, SelectionKey.OP_WRITE); }
}
//使用方法,先在console运行nc -l -p 1234再运行EchoClient package zzzhc;
import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress;
/** * @author <a href="mailto:[email protected]">zzzhc</a> * */ public class EchoClient extends AbstractSocketHandler { public EchoClient(SelectorProcessor processor,SocketAddress remote) throws IOException { super(processor,remote); }
public static void main(String[] args) throws IOException{ EchoClient client = new EchoClient(SelectorProcessor.getDefaultInstance(),new InetSocketAddress("localhost",1234)); } }
//复杂一点的 package zzzhc;
import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URL; import java.nio.channels.FileChannel;
/** * @author <a href="mailto:[email protected]">zzzhc </a> * */ public class SimpleHttpClient extends AbstractSocketHandler {
public final static int START_CONNECT = 0;
public final static int CONNECT_OK = 1;
public final static int CONNECT_FAILED = 2;
public final static int TRY_SEND_REQUEST = 10;
public final static int REQUEST_SENT = 11;
//public final static int RECV_RESPONSE_HEADER = 20; public final static int RESPONSE_HEADER_END = 21;
public final static int START_DATA_TRANSFER = 30;
public final static int END_DATA_TRANSFER = 31;
protected int status = START_CONNECT;
protected URL url;
protected FileChannel fc;
protected FileOutputStream out; private StringBuffer header = new StringBuffer();
public SimpleHttpClient(SelectorProcessor processor, SocketAddress remote, URL url) throws IOException { super(processor, remote); this.url = url; }
public void onConnected() { super.onConnected(); try { Thread.sleep(10); }catch (Exception e){} status = CONNECT_OK; fillHeader(url); enableWrite(); status = TRY_SEND_REQUEST; }
protected void fillHeader(URL url) { writeBuf.clear(); writeBuf.put("GET ".getBytes()).put(url.getPath().getBytes()).put( " HTTP/1.1".getBytes()).put((byte) '\r').put((byte) '\n'); writeBuf.put("Host: ".getBytes()).put(url.getHost().getBytes()).put((byte) '\r') .put((byte) '\n'); writeBuf.put((byte) '\r').put((byte) '\n'); writeBuf.flip(); }
public void onConnectFailed(String msg) { super.onConnectFailed(msg); status = CONNECT_FAILED; System.exit(0); }
public void onRead() { readBuf.flip(); switch (status) { case REQUEST_SENT: int pos = readBuf.limit(); byte b; while (readBuf.hasRemaining()) { b = readBuf.get(); if (b == '\r') { if (pos - readBuf.position() >= 3) { b = readBuf.get();//\n b = readBuf.get(); if (b == '\r') { b = readBuf.get();//\n readBuf.compact(); status = RESPONSE_HEADER_END; return; } } } } readBuf.position(pos); readBuf.limit(readBuf.capacity()); break; case RESPONSE_HEADER_END: status = START_DATA_TRANSFER; case START_DATA_TRANSFER: System.out.println("start data transfer."); if (out == null) { String file = url.getFile().trim(); if ("".equals(file)) { file = "index.html"; } int idx = file.lastIndexOf('/'); if (idx != -1) { file = file.substring(idx + 1); } if ("".equals(file)) { file = "index.html"; } try { System.out.println("open "+file+" to write."); out = new FileOutputStream(file); fc = out.getChannel(); } catch (FileNotFoundException e) { //this should not happend. e.printStackTrace(); close(); } } try { while (readBuf.hasRemaining()) { fc.write(readBuf); } } catch (IOException e) { e.printStackTrace(); close(); } readBuf.clear(); break; default: System.out.println("status:"+status); } }
public void onClosed(String msg) { super.onClosed(msg); try { if (out != null) { out.close(); fc.close(); } } catch (IOException e) { e.printStackTrace(); } System.exit(0); }
public void onWrite() { status = REQUEST_SENT; System.out.println("request sent."); }
public static void main(String[] args) throws IOException{ URL url = new URL("http://www.sohu.com/"); new SimpleHttpClient(SelectorProcessor.getDefaultInstance(),new InetSocketAddress("www.sohu.com",80),url); } } 
|