Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 568|回复: 0

开发交流:Android开发进阶之NIO非阻塞包(四)

[复制链接]

该用户从未签到

发表于 2011-10-24 10:50:48 | 显示全部楼层 |阅读模式
  在整个DDMS中体现Android NIO主要框架的要数MonitorThread.java这个文件了,有关PC和Android手机同步以及NIO非阻塞编程的精髓可以在下面的文件中充分体现出来。 final class MonitorThread extends Thread {

    private static final int CLIENT_READY = 2;

    private static final int CLIENT_DISCONNECTED = 3;

    private volatile boolean mQuit = false;

    private ArrayList<Client> mClientList; //用一个数组保存客户端信息

    private Selector mSelector;

    private HashMap<Integer, ChunkHandler> mHandlerMap; //这里提示大家,由于在多线程中concurrentHashMap效率比HashMap更安全高效,推荐使用并发库的这个替代版本。

    private ServerSocketChannel mDebugSelectedChan; //一个用于调试的服务器通道

    private int mNewDebugSelectedPort;

    private int mDebugSelectedPort = -1;

    private Client mSelectedClient = null;

    private static MonitorThread mInstance;

    private MonitorThread() {

        super("Monitor");

        mClientList = new ArrayList<Client>();

        mHandlerMap = new HashMap<Integer, ChunkHandler>();

        mNewDebugSelectedPort = DdmPreferences.getSelectedDebugPort();

    }

    static MonitorThread createInstance() {  //创建实例

        return mInstance = new MonitorThread();

    }

    static MonitorThread getInstance() { //获取实例

        return mInstance;

    }

    synchronized void setDebugSelectedPort(int port) throws IllegalStateException { //设置调试端口号

        if (mInstance == null) {

            return;

        }

        if (AndroidDebugBridge.getClientSupport() == false) {

            return;

        }

        if (mDebugSelectedChan != null) {

            Log.d("ddms", "Changing debug-selected port to " + port);

            mNewDebugSelectedPort = port;

            wakeup(); //这里用来唤醒所有的Selector

        } else {

            // we set mNewDebugSelectedPort instead of mDebugSelectedPort so that it's automatically

            mNewDebugSelectedPort = port;

        }

    }

    synchronized void setSelectedClient(Client selectedClient) {

        if (mInstance == null) {

            return;

        }

        if (mSelectedClient != selectedClient) {

            Client oldClient = mSelectedClient;

            mSelectedClient = selectedClient;

            if (oldClient != null) {

                oldClient.update(Client.CHANGE_PORT);

            }

            if (mSelectedClient != null) {

                mSelectedClient.update(Client.CHANGE_PORT);

            }

        }

    }

    Client getSelectedClient() {

        return mSelectedClient;

    }

    boolean getRetryOnBadHandshake() {

        return true; // TODO? make configurable

    }

    Client[] getClients() {

        synchronized (mClientList) {

            return mClientList.toArray(new Client[0]);

        }

    }

    synchronized void registerChunkHandler(int type, ChunkHandler handler) {

        if (mInstance == null) {

            return;

        }

        synchronized (mHandlerMap) {

            if (mHandlerMap.get(type) == null) {

                mHandlerMap.put(type, handler);

            }

        }

    }

    @Override

    public void run() { //本类的主要线程

        Log.d("ddms", "Monitor is up");

        try {

            mSelector = Selector.open();

        } catch (IOException ioe) {

            Log.logAndDisplay(LogLevel.ERROR, "ddms",

                    "Failed to initialize Monitor Thread: " + ioe.getMessage());

            return;

        }

        while (!mQuit) {

            try {

                synchronized (mClientList) {

                }

                try {

                    if (AndroidDebugBridge.getClientSupport()) {

                        if ((mDebugSelectedChan == null ||

                                mNewDebugSelectedPort != mDebugSelectedPort) &&

                                mNewDebugSelectedPort != -1) {

                            if (reopenDebugSelectedPort()) {

                                mDebugSelectedPort = mNewDebugSelectedPort;

                            }

                        }

                    }

                } catch (IOException ioe) {

                    Log.e("ddms",

                            "Failed to reopen debug port for Selected Client to: " + mNewDebugSelectedPort);

                    Log.e("ddms", ioe);

                    mNewDebugSelectedPort = mDebugSelectedPort; // no retry

                }

                int count;

                try {

                    count = mSelector.select();

                } catch (IOException ioe) {

                    ioe.printStackTrace();

                    continue;

                } catch (CancelledKeyException cke) {

                    continue;

                }

                if (count == 0) {

                    continue;

                } //这里代码写的不是很好,提示大家因为这个NIO是DDMS工作在PC端的还不明显,这样轮训的在一个while中,效率不是很高,CPU很容易占用率很高。

                Set<SelectionKey> keys = mSelector.selectedKeys();

                Iterator<SelectionKey> iter = keys.iterator(); //使用迭代器获取这个选择键

                while (iter.hasNext()) {

                    SelectionKey key = iter.next();

                    iter.remove();

                    try {

                        if (key.attachment() instanceof Client) { //判断收到的key的附件是否是Client的实例

                            processClientActivity(key);

                        }

                        else if (key.attachment() instanceof Debugger) { //如果是Debug实例

                            processDebuggerActivity(key);

                        }

                        else if (key.attachment() instanceof MonitorThread) {

                            processDebugSelectedActivity(key);

                        }

                        else {

                            Log.e("ddms", "unknown activity key");

                        }

                    } catch (Exception e) {

                        Log.e("ddms", "Exception during activity from Selector.");

                        Log.e("ddms", e);

                    }

                }

            } catch (Exception e) {

                Log.e("ddms", "Exception MonitorThread.run()");

                Log.e("ddms", e);

            }

        }

    }

    int getDebugSelectedPort() {

        return mDebugSelectedPort;

    }

    private void processClientActivity(SelectionKey key) {

        Client client = (Client)key.attachment();

        try {

            if (key.isReadable() == false || key.isValid() == false) {

                Log.d("ddms", "Invalid key from " + client + ". Dropping client.");

                dropClient(client, true /* notify */);

                return;

            }

            client.read();

            JdwpPacket packet = client.getJdwpPacket();

            while (packet != null) {

                if (packet.isDdmPacket()) {

                    // unsolicited DDM request - hand it off

                    assert !packet.isReply();

                    callHandler(client, packet, null);

                    packet.consume();

                } else if (packet.isReply()

                        && client.isResponseToUs(packet.getId()) != null) {

                    // reply to earlier DDM request

                    ChunkHandler handler = client

                            .isResponseToUs(packet.getId());

                    if (packet.isError())

                        client.packetFailed(packet);

                    else if (packet.isEmpty())

                        Log.d("ddms", "Got empty reply for 0x"

                                + Integer.toHexString(packet.getId())

                                + " from " + client);

                    else

                        callHandler(client, packet, handler);

                    packet.consume();

                    client.removeRequestId(packet.getId());

                } else {

                    Log.v("ddms", "Forwarding client "

                            + (packet.isReply() ? "reply" : "event") + " 0x"

                            + Integer.toHexString(packet.getId()) + " to "

                            + client.getDebugger());

                    client.forwardPacketToDebugger(packet);

                }

                packet = client.getJdwpPacket();

            }

        } catch (CancelledKeyException e) { //注意正确处理这个异常

            dropClient(client, true /* notify */);

        } catch (IOException ex) {

            dropClient(client, true /* notify */);

        } catch (Exception ex) {

            Log.e("ddms", ex);

            dropClient(client, true /* notify */);

            if (ex instanceof BufferOverflowException) { //可能存在缓冲区异常

                Log.w("ddms",

                        "Client data packet exceeded maximum buffer size "

                                + client);

            } else {

                // don't know what this is, display it

                Log.e("ddms", ex);

            }

        }

    }

    private void callHandler(Client client, JdwpPacket packet,

            ChunkHandler handler) {

        // on first DDM packet received, broadcast a "ready" message

        if (!client.ddmSeen())

            broadcast(CLIENT_READY, client);

        ByteBuffer buf = packet.getPayload();

        int type, length;

        boolean reply = true;

        type = buf.getInt();

        length = buf.getInt();

        if (handler == null) {

            // not a reply, figure out who wants it

            synchronized (mHandlerMap) {

                handler = mHandlerMap.get(type);

                reply = false;

            }

        }

        if (handler == null) {

            Log.w("ddms", "Received unsupported chunk type "

                    + ChunkHandler.name(type) + " (len=" + length + ")");

        } else {

            Log.d("ddms", "Calling handler for " + ChunkHandler.name(type)

                    + " [" + handler + "] (len=" + length + ")");

            ByteBuffer ibuf = buf.slice();

            ByteBuffer roBuf = ibuf.asReadOnlyBuffer(); // enforce R/O

            roBuf.order(ChunkHandler.CHUNK_ORDER);

            synchronized (mClientList) {

                handler.handleChunk(client, type, roBuf, reply, packet.getId());

            }

        }

    }

    synchronized void dropClient(Client client, boolean notify) {

        if (mInstance == null) {

            return;

        }

        synchronized (mClientList) {

            if (mClientList.remove(client) == false) {

                return;

            }

        }

        client.close(notify);

        broadcast(CLIENT_DISCONNECTED, client);

        /*

         * http://forum.java.sun.com/thread ... 715&start=0

         * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5073504

         */

        wakeup();

    }

    /*

     * Process activity from one of the debugger sockets. This could be a new

     * connection or a data packet.

     */

    private void processDebuggerActivity(SelectionKey key) {

        Debugger dbg = (Debugger)key.attachment();

        try {

            if (key.isAcceptable()) { //处理Server响应这个事件

                try {

                    acceptNewDebugger(dbg, null);

                } catch (IOException ioe) {

                    Log.w("ddms", "debugger accept() failed");

                    ioe.printStackTrace();

                }

            } else if (key.isReadable()) { //如果是收到的数据,则可读取

                processDebuggerData(key);

            } else {

                Log.d("ddm-debugger", "key in unknown state");

            }

        } catch (CancelledKeyException cke) { //记住,NIO处理这个异常,很多入门的开发者很容易忘记

            // key has been cancelled we can ignore that.

        }

    }

     private void acceptNewDebugger(Debugger dbg, ServerSocketChannel acceptChan) //这里用到了阻塞方式

            throws IOException {

        synchronized (mClientList) {

            SocketChannel chan;

            if (acceptChan == null)

                chan = dbg.accept();

            else

                chan = dbg.accept(acceptChan);

            if (chan != null) {

                chan.socket().setTcpNoDelay(true);

                wakeup();

                try {

                    chan.register(mSelector, SelectionKey.OP_READ, dbg);

                } catch (IOException ioe) {

                    // failed, drop the connection

                    dbg.closeData();

                    throw ioe;

                } catch (RuntimeException re) {

                    // failed, drop the connection

                    dbg.closeData();

                    throw re;

                }

            } else {

                Log.w("ddms", "ignoring duplicate debugger");

            }

        }

    }

    private void processDebuggerData(SelectionKey key) {

        Debugger dbg = (Debugger)key.attachment();

        try {

            dbg.read();

            JdwpPacket packet = dbg.getJdwpPacket();

            while (packet != null) {

                Log.v("ddms", "Forwarding dbg req 0x"

                        + Integer.toHexString(packet.getId()) + " to "

                        + dbg.getClient());

                dbg.forwardPacketToClient(packet);

                packet = dbg.getJdwpPacket();

            }

        } catch (IOException ioe) {

            Log.d("ddms", "Closing connection to debugger " + dbg);

            dbg.closeData();

            Client client = dbg.getClient();

            if (client.isDdmAware()) {

                   Log.d("ddms", " (recycling client connection as well)");

                    client.getDeviceImpl().getMonitor().addClientToDropAndReopen(client,

                        IDebugPortProvider.NO_STATIC_PORT);

            } else {

                Log.d("ddms", " (recycling client connection as well)");

                // we should drop the client, but also attempt to reopen it.

                // This is done by the DeviceMonitor.

                client.getDeviceImpl().getMonitor().addClientToDropAndReopen(client,

                        IDebugPortProvider.NO_STATIC_PORT);

            }

        }

    }

    private void wakeup() {

        mSelector.wakeup();

    }

    synchronized void quit() {

        mQuit = true;

        wakeup();

        Log.d("ddms", "Waiting for Monitor thread");

        try {

            this.join();

            // since we're quitting, lets drop all the client and disconnect

            // the DebugSelectedPort

            synchronized (mClientList) {

                for (Client c : mClientList) {

                    c.close(false /* notify */);

                    broadcast(CLIENT_DISCONNECTED, c);

                }

                mClientList.clear();

            }

            if (mDebugSelectedChan != null) {

                mDebugSelectedChan.close();

                mDebugSelectedChan.socket().close();

                mDebugSelectedChan = null;

            }

            mSelector.close();

        } catch (InterruptedException ie) {

            ie.printStackTrace();

        } catch (IOException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

        mInstance = null;

    }

    synchronized void addClient(Client client) {

        if (mInstance == null) {

            return;

        }

        Log.d("ddms", "Adding new client " + client);

        synchronized (mClientList) {

            mClientList.add(client);

            try {

                wakeup();

                client.register(mSelector);

                Debugger dbg = client.getDebugger();

                if (dbg != null) {

                    dbg.registerListener(mSelector);

                }

            } catch (IOException ioe) {

                // not really expecting this to happen

                ioe.printStackTrace();

            }

        }

    }

    /*

     * Broadcast an event to all message handlers.

     */

    private void broadcast(int event, Client client) {

        Log.d("ddms", "broadcast " + event + ": " + client);

        /*

         * The handler objects appear once in mHandlerMap for each message they

         * handle. We want to notify them once each, so we convert the HashMap

         * to a HashSet before we iterate.

         */

        HashSet<ChunkHandler> set;

        synchronized (mHandlerMap) {

            Collection<ChunkHandler> values = mHandlerMap.values();

            set = new HashSet<ChunkHandler>(values);

        }

        Iterator<ChunkHandler> iter = set.iterator();

        while (iter.hasNext()) {

            ChunkHandler handler = iter.next();

            switch (event) {

                case CLIENT_READY:

                    try {

                        handler.clientReady(client);

                    } catch (IOException ioe) {

                        // Something failed with the client. It should

                        // fall out of the list the next time we try to

                        // do something with it, so we discard the

                        // exception here and assume cleanup will happen

                        // later. May need to propagate farther. The

                        // trouble is that not all values for "event" may

                        // actually throw an exception.

                        Log.w("ddms",

                                "Got exception while broadcasting 'ready'");

                        return;

                    }

                    break;

                case CLIENT_DISCONNECTED:

                    handler.clientDisconnected(client);

                    break;

                default:

                    throw new UnsupportedOperationException();

            }

        }

    }

    /**

     * Opens (or reopens) the "debug selected" port and listen for connections.

     * @return true if the port was opened successfully.

     * @throws IOException

     */

    private boolean reopenDebugSelectedPort() throws IOException {

        Log.d("ddms", "reopen debug-selected port: " + mNewDebugSelectedPort);

        if (mDebugSelectedChan != null) {

            mDebugSelectedChan.close();

        }

        mDebugSelectedChan = ServerSocketChannel.open();

        mDebugSelectedChan.configureBlocking(false); // required for Selector

        InetSocketAddress addr = new InetSocketAddress(

                InetAddress.getByName("localhost"), //$NON-NLS-1$

                mNewDebugSelectedPort);

        mDebugSelectedChan.socket().setReuseAddress(true); // enable SO_REUSEADDR

        try {

            mDebugSelectedChan.socket().bind(addr);

            if (mSelectedClient != null) {

                mSelectedClient.update(Client.CHANGE_PORT);

            }

            mDebugSelectedChan.register(mSelector, SelectionKey.OP_ACCEPT, this);

            return true;

        } catch (java.net.BindException e) {

            displayDebugSelectedBindError(mNewDebugSelectedPort);

            // do not attempt to reopen it.

            mDebugSelectedChan = null;

            mNewDebugSelectedPort = -1;

            return false;

        }

    }

    /*

     * We have some activity on the "debug selected" port. Handle it.

     */

    private void processDebugSelectedActivity(SelectionKey key) {

        assert key.isAcceptable();

        ServerSocketChannel acceptChan = (ServerSocketChannel)key.channel();

        /*

         * Find the debugger associated with the currently-selected client.

         */

        if (mSelectedClient != null) {

            Debugger dbg = mSelectedClient.getDebugger();

            if (dbg != null) {

                Log.d("ddms", "Accepting connection on 'debug selected' port");

                try {

                    acceptNewDebugger(dbg, acceptChan);

                } catch (IOException ioe) {

                    // client should be gone, keep going

                }

                return;

            }

        }

        Log.w("ddms",

                "Connection on 'debug selected' port, but none selected");

        try {

            SocketChannel chan = acceptChan.accept();

            chan.close();

        } catch (IOException ioe) {

            // not expected; client should be gone, keep going

        } catch (NotYetBoundException e) {

            displayDebugSelectedBindError(mDebugSelectedPort);

        }

    }

    private void displayDebugSelectedBindError(int port) {

        String message = String.format(

                "Could not open Selected VM debug port (%1$d). Make sure you do not have another instance of DDMS or of the eclipse plugin running. If it's being used by something else, choose a new port number in the preferences.",

                port);

        Log.logAndDisplay(LogLevel.ERROR, "ddms", message);

    }

}
复制代码


Android开发进阶之NIO非阻塞包(一)
Android开发进阶之NIO非阻塞包(二)
Android开发进阶之NIO非阻塞包(三)
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

GMT+8, 2025-1-11 07:49 , Processed in 0.407182 second(s), 46 queries .

Powered by Discuz! X3.4

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表