001    /*
002     *  This file is part of the Jikes RVM project (http://jikesrvm.org).
003     *
004     *  This file is licensed to You under the Eclipse Public License (EPL);
005     *  You may not use this file except in compliance with the License. You
006     *  may obtain a copy of the License at
007     *
008     *      http://www.opensource.org/licenses/eclipse-1.0.php
009     *
010     *  See the COPYRIGHT.txt file distributed with this work for information
011     *  regarding copyright ownership.
012     */
013    package org.mmtk.utility.deque;
014    
015    import org.mmtk.utility.Constants;
016    
017    import org.mmtk.vm.VM;
018    
019    import org.vmmagic.pragma.*;
020    import org.vmmagic.unboxed.*;
021    
022    /**
023     * This class implements a local (<i>unsynchronized</i>) queue.
024     * A queue is strictly FIFO.<p>
025     *
026     * Each instance stores word-sized values into a local buffer.  When
027     * the buffer is full, or if the <code>flushLocal()</code> method is
028     * called, the buffer is enqueued at the tail of a
029     * <code>SharedDeque</code>.
030     *
031     * The implementation is intended to be as efficient as possible, in
032     * time and space, and is the basis for the <code>TraceBuffer</code> used by
033     * heap trace generation. Each instance adds a single field to those inherited
034     * from the SSB: a bump pointer.
035     *
036     * Preconditions: Buffers are always aligned on buffer-size address
037     * boundaries.<p>
038     *
039     * Invariants: Buffers are filled such that tuples (of the specified
040     * arity) are packed to the low end of the buffer.  Thus buffer
041     * underflows will always arise when the cursor is buffer-size aligned.
042     */
043    @Uninterruptible class LocalQueue extends LocalSSB implements Constants {
044    
045      /**
046       * Constructor
047       *
048       * @param queue The shared queue to which this local ssb will append
049       * its buffers (when full or flushed).
050       */
051      LocalQueue(SharedDeque queue) {
052        super(queue);
053      }
054    
055     /****************************************************************************
056       *
057       * Protected instance methods and fields
058       */
059    
060      /** the start of the buffer */
061      @Entrypoint
062      protected Address head;
063    
064      @Override
065      public void resetLocal() {
066        super.resetLocal();
067        head = Deque.HEAD_INITIAL_VALUE;
068      }
069    
070      /**
071       * Check whether there are values in the buffer for a pending dequeue.
072       * If there is not data, grab the first buffer on the shared queue
073       * (after freeing the buffer).
074       *
075       * @param arity The arity of the values stored in this queue: the
076       * buffer must contain enough space for this many words.
077       */
078      @Inline
079      protected final boolean checkDequeue(int arity) {
080        if (bufferOffset(head).isZero()) {
081          return dequeueUnderflow(arity);
082        } else {
083          if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(bufferOffset(head).sGE(Word.fromIntZeroExtend(arity).lsh(LOG_BYTES_IN_ADDRESS).toOffset()));
084          return true;
085        }
086      }
087    
088      /**
089       * Dequeue a value from the buffer.  This is <i>unchecked</i>.  The
090       * caller must first call <code>checkDequeue()</code> to ensure the
091       * buffer has and entry to be removed.
092       *
093       * @return The first entry on the queue.
094       */
095      @Inline
096      protected final Address uncheckedDequeue(){
097        if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(bufferOffset(head).sGE(Offset.fromIntZeroExtend(BYTES_IN_ADDRESS)));
098        head = head.minus(BYTES_IN_ADDRESS);
099        return head.loadAddress();
100      }
101    
102      /**
103       * The head is empty (or null), and the shared queue has no buffers
104       * available.  If the tail has sufficient entries, consume the tail.
105       * Otherwise try wait on the global queue until either all other
106       * clients of the queue reach exhaustion or a buffer becomes
107       * available.
108       *
109       * @param arity The arity of this buffer
110       * @return True if the consumer has eaten all the entries
111       */
112      protected final boolean headStarved(int arity) {
113        if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == queue.getArity());
114    
115        // If the tail has entries...
116        if (tail.NE(tailBufferEnd)) {
117          head = normalizeTail(arity).plus(BYTES_IN_ADDRESS);
118          tail = Deque.TAIL_INITIAL_VALUE;
119          tailBufferEnd = Deque.TAIL_INITIAL_VALUE;
120          // Return that we acquired more entries
121          return false;
122        }
123        // Wait for another entry to materialize...
124        head = queue.dequeueAndWait(arity);
125        // return true if a) there is a head buffer, and b) it is non-empty
126        return (head.EQ(Deque.HEAD_INITIAL_VALUE) || bufferOffset(head).isZero());
127      }
128    
129      /****************************************************************************
130       *
131       * Private instance methods
132       */
133    
134      /**
135       * There are not sufficient entries in the head buffer for a pending
136       * dequeue.  Acquire a new head buffer.  If the shared queue has no
137       * buffers available, consume the tail if necessary.  Return false
138       * if entries cannot be acquired.
139       *
140       * @param arity The arity of this buffer (used for sanity test only).
141       * @return True if there the head buffer has been successfully
142       * replenished.
143       */
144      @NoInline
145      private boolean dequeueUnderflow(int arity) {
146        if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == queue.getArity());
147        do {
148          if (head.NE(Deque.HEAD_INITIAL_VALUE))
149            queue.free(head);
150          head = queue.dequeue(arity);
151        } while (head.NE(Deque.HEAD_INITIAL_VALUE) && bufferOffset(head).isZero());
152    
153        if (head.EQ(Deque.HEAD_INITIAL_VALUE))
154          return !headStarved(arity);
155    
156        return true;
157      }
158    }