001    /*
002     *  This file is part of the Jikes RVM project (http://jikesrvm.org).
003     *
004     *  This file is licensed to You under the Eclipse Public License (EPL);
005     *  You may not use this file except in compliance with the License. You
006     *  may obtain a copy of the License at
007     *
008     *      http://www.opensource.org/licenses/eclipse-1.0.php
009     *
010     *  See the COPYRIGHT.txt file distributed with this work for information
011     *  regarding copyright ownership.
012     */
013    package org.jikesrvm.adaptive.util;
014    
015    import org.jikesrvm.VM;
016    import org.jikesrvm.util.PriorityQueueRVM;
017    
018    /**
019     * This class extends PriorityQueueRVM to safely
020     * support multiple producers/consumers where
021     * the consumers are blocked if no objects are available
022     * to consume.
023     */
024    public class BlockingPriorityQueue extends PriorityQueueRVM {
025    
026      /**
027       * Used to notify consumers when about to wait and when notified
028       * Default implementation does nothing, but can be overriden as needed by client.
029       */
030      public static class CallBack {
031        public void aboutToWait() {}
032    
033        public void doneWaiting() {}
034      }
035    
036      CallBack callback;
037    
038      /**
039       * @param _cb the callback object
040       */
041      public BlockingPriorityQueue(CallBack _cb) {
042        super();
043        callback = _cb;
044      }
045    
046      public BlockingPriorityQueue() {
047        this(new CallBack());
048      }
049    
050      /**
051       * Insert the object passed with the priority value passed.<p>
052       *
053       * Notify any sleeping consumer threads that an object
054       * is available for consumption.
055       *
056       * @param _priority  the priority to
057       * @param _data the object to insert
058       */
059      @Override
060      public final synchronized void insert(double _priority, Object _data) {
061        super.insert(_priority, _data);
062        try {
063          notifyAll();
064        } catch (Exception e) {
065          // TODO: should we exit or something more dramatic?
066          VM.sysWrite("Exception occurred while notifying that element was inserted!\n");
067        }
068      }
069    
070      /**
071       * Remove and return the front (minimum) object.  If the queue is currently
072       * empty, then block until an object is available to be dequeued.
073       *
074       * @return the front (minimum) object.
075       */
076      @Override
077      public final synchronized Object deleteMin() {
078        // While the queue is empty, sleep until notified that an object has been enqueued.
079        while (isEmpty()) {
080          try {
081            callback.aboutToWait();
082            wait();
083            callback.doneWaiting();
084          } catch (InterruptedException e) {
085            // TODO: should we exit or something more dramatic?
086            VM.sysWrite("Interrupted Exception occurred!\n");
087          }
088        }
089    
090        // When we get to here, we know the queue is non-empty, so dequeue an object and return it.
091        return super.deleteMin();
092      }
093    }