即日起在codingBlog上分享您的技术经验即可获得积分,积分可兑换现金哦。

NIO编程 TimeServer && TimeClient

编程语言 qq_27061805 21℃ 0评论

本程序是《Netty权威指南》NIO 编程一章中的一个小程序,分为 Server 和 Client 两部分。

1,TimeServer的启动程序


/**
 * Created by ju on 2017-06-06.
 *
 * Entry point of the NIO time server: resolves the listening port and runs a
 * {@code MultiplexerTimeServer} on its own thread.
 */
public class TimeServer {

    /**
     * Starts the server.
     *
     * @param args optional; {@code args[0]} is the listening port. When it is
     *             absent, or is not a valid integer (the parse error is
     *             printed), port 9999 is used.
     */
    public static void main(String[] args) {
        int listenPort = 9999;
        boolean portSupplied = args != null && args.length > 0;
        if (portSupplied) {
            try {
                listenPort = Integer.valueOf(args[0]);
            } catch (NumberFormatException badPort) {
                // Keep the default port; just report the unusable argument.
                badPort.printStackTrace();
            }
        }

        MultiplexerTimeServer server = new MultiplexerTimeServer(listenPort);
        Thread serverThread = new Thread(server, "NIO-MultiplexerTimeServer-001");
        serverThread.start();
    }
}

2,时间服务NIO 的具体实现MultiplexerTimeServer

/**
 * Created by ju on 2017-06-06.
 *
 * Selector-based time server. A single thread accepts connections and answers
 * each received message: the text {@code QUERY TIME ORDER} (case-insensitive)
 * is answered with the current date string, anything else with
 * {@code BAD ORDER}. Requests and replies are UTF-8 text.
 */
public class MultiplexerTimeServer implements Runnable {

    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;

    /**
     * Opens the selector, binds a non-blocking server channel to {@code port}
     * and registers it for accept events. If any of these steps fails the
     * error is printed and the JVM exits with status 1.
     *
     * @param port TCP port to listen on
     */
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port));
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Time Server is start in port " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Asks the event loop to finish. The loop checks the flag after each
     * {@code select(1000)} call returns.
     */
    public void stop() {
        this.stop = true;
    }

    /**
     * Event loop: waits up to one second for ready keys, dispatches each one
     * to {@link #handleInput(SelectionKey)}, and on exit closes every
     * registered channel and the selector.
     */
    @Override
    public void run() {
        while (!stop) {
            try {
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // A handled key must be removed from the selected set by hand.
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        // Report the failure instead of dropping it, then
                        // release the channel that caused it.
                        e.printStackTrace();
                        closeKey(key);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            // Closing a selector only deregisters its channels, so close the
            // server channel and any client channels explicitly first.
            for (SelectionKey key : selector.keys()) {
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                System.out.println("selector not null");
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Cancels the key and closes its channel, printing any close failure. */
    private void closeKey(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key: accepts a new connection, or reads a request and
     * writes the reply.
     *
     * @param key the ready key taken from the selected-key set
     * @throws IOException if accepting, reading or writing fails
     */
    private void handleInput(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            // In non-blocking mode accept() returns null when no connection is pending.
            SocketChannel accepted = ssc.accept();
            if (accepted != null) {
                try {
                    accepted.configureBlocking(false);
                    accepted.register(selector, SelectionKey.OP_READ);
                } catch (IOException e) {
                    // Drop only the new connection; letting this propagate
                    // would make the caller close the listening channel.
                    e.printStackTrace();
                    accepted.close();
                }
            }
        }
        if (key.isReadable()) {
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int readBytes = sc.read(readBuffer);
            if (readBytes > 0) {
                // Switch the buffer from filling to draining.
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String body = new String(bytes, "UTF-8");
                System.out.println("The time server receive order : " + body);
                String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                        ? new Date(System.currentTimeMillis()).toString()
                        : "BAD ORDER";
                doWrite(sc, currentTime);
            } else if (readBytes < 0) {
                // The peer closed the connection.
                key.cancel();
                sc.close();
            }
            // readBytes == 0: nothing was read, ignore.
        }
    }

    /**
     * Sends {@code response} to the client as UTF-8, the same charset used to
     * decode requests. Null or blank responses are not sent.
     *
     * @param channel  client channel to write to
     * @param response reply text
     * @throws IOException if the write fails
     */
    private void doWrite(SocketChannel channel, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            ByteBuffer writeBuffer = ByteBuffer.wrap(response.getBytes("UTF-8"));
            // NOTE(review): a single non-blocking write may send only part of
            // the buffer; a partial write is not retried here.
            channel.write(writeBuffer);
        }
    }
}

3,TimeClient 客户端启动程序

/**
 * Created by ju on 2017-06-07.
 *
 * Entry point of the NIO time client: resolves the server port and runs a
 * {@code TimeClientHandler} against 127.0.0.1 on its own thread.
 */
public class TimeClient {

    /**
     * Starts the client.
     *
     * @param args optional; {@code args[0]} is the server port. When it is
     *             absent, or is not a valid integer (the parse error is
     *             printed), port 9999 is used.
     */
    public static void main(String[] args) {
        int port = 9999;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Fall back to the default port, as the server launcher does,
                // instead of dying with an uncaught exception.
                e.printStackTrace();
            }
        }
        new Thread(new TimeClientHandler("127.0.0.1", port)).start();
    }
}

4,客户端NIO的具体实现程序


/**
 * Created by ju on 2017-06-07.
 *
 * Selector-based time client. It connects to the server, sends
 * {@code QUERY TIME ORDER} as UTF-8, prints the first reply it receives and
 * then stops.
 */
public class TimeClientHandler implements Runnable {
    private int port;
    private String host;
    private Selector selector;
    private SocketChannel channel;
    private volatile boolean stop;

    /**
     * Opens the selector and a non-blocking socket channel. If that fails the
     * error is printed and the JVM exits with status 1, because the handler
     * cannot run without them.
     *
     * @param host server host; {@code null} means {@code 127.0.0.1}
     * @param port server port
     */
    public TimeClientHandler(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            selector = Selector.open();
            channel = SocketChannel.open();
            channel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Connects, then polls the selector once per second until a reply has
     * been printed or the connection has failed or been closed. The channel
     * and the selector are closed before returning. A failure to start the
     * connection exits the JVM with status 1.
     */
    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e1) {
            e1.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                // Wake up at least once per second.
                selector.select(1000);
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        e.printStackTrace();
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException closeError) {
                            closeError.printStackTrace();
                        }
                        // The only channel is gone; stop instead of polling forever.
                        stop = true;
                    }
                }
            } catch (IOException e) {
                // The selector itself failed; nothing more can be received.
                e.printStackTrace();
                stop = true;
            }
        }
        // Closing the selector does not close the channel, so close both.
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            selector = null;
        }
    }

    /**
     * Handles one ready key: completes a pending connect and sends the
     * request, or reads and prints the server's reply.
     *
     * @param key the ready key taken from the selected-key set
     * @throws IOException if connecting, reading or writing fails
     */
    public void handleInput(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        SocketChannel sc = (SocketChannel) key.channel();
        if (key.isConnectable()) {
            // finishConnect() throws on failure; false only means the connect
            // is still in progress, so keep waiting for the next event.
            if (sc.finishConnect()) {
                sc.register(selector, SelectionKey.OP_READ);
                doWrite(sc);
            }
        }
        if (key.isReadable()) {
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int readBytes = sc.read(readBuffer);
            if (readBytes > 0) {
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String body = new String(bytes, "UTF-8");
                System.out.println("Now is : " + body);
                this.stop = true;
            } else if (readBytes < 0) {
                // The peer closed the connection; no reply will ever arrive.
                key.cancel();
                sc.close();
                this.stop = true;
            }
            // readBytes == 0: nothing was read, ignore.
        }
    }

    /**
     * Starts the non-blocking connect. If it completes immediately the channel
     * is registered for reads and the request is sent; otherwise the channel
     * is registered for the connect-finished event.
     *
     * @throws IOException if the connect attempt or the registration fails
     */
    private void doConnect() throws IOException {
        if (channel.connect(new InetSocketAddress(host, port))) {
            channel.register(selector, SelectionKey.OP_READ);
            doWrite(channel);
        } else {
            channel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * Sends the {@code QUERY TIME ORDER} request as UTF-8 and prints
     * {@code SEND SUCCESS!} when the whole request was written in one call.
     *
     * @param schannel connected channel to write to
     * @throws IOException if the write fails
     */
    private void doWrite(SocketChannel schannel) throws IOException {
        ByteBuffer buff = ByteBuffer.wrap("QUERY TIME ORDER".getBytes("UTF-8"));
        schannel.write(buff);
        // NOTE(review): a partial write is detected here but not retried.
        if (!buff.hasRemaining()) {
            System.out.println("SEND SUCCESS!");
        }
    }
}



转载请注明:CodingBlog » NIO编程 TimeServer && TimeClient

喜欢 (0)or分享 (0)
发表我的评论
取消评论

*

表情