001 /* 002 * This file is part of the Jikes RVM project (http://jikesrvm.org). 003 * 004 * This file is licensed to You under the Eclipse Public License (EPL); 005 * You may not use this file except in compliance with the License. You 006 * may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/eclipse-1.0.php 009 * 010 * See the COPYRIGHT.txt file distributed with this work for information 011 * regarding copyright ownership. 012 */ 013 package org.mmtk.utility.deque; 014 015 import org.mmtk.utility.Constants; 016 017 import org.mmtk.vm.VM; 018 019 import org.vmmagic.pragma.*; 020 import org.vmmagic.unboxed.*; 021 022 /** 023 * This class implements a local (<i>unsynchronized</i>) queue. 024 * A queue is strictly FIFO.<p> 025 * 026 * Each instance stores word-sized values into a local buffer. When 027 * the buffer is full, or if the <code>flushLocal()</code> method is 028 * called, the buffer enqueued at the tail of a 029 * <code>SharedDeque</code>. 030 * 031 * The implementation is intended to be as efficient as possible, in 032 * time and space, and is the basis for the <code>TraceBuffer</code> used by 033 * heap trace generation. Each instance adds a single field to those inherited 034 * from the SSB: a bump pointer. 035 * 036 * Preconditions: Buffers are always aligned on buffer-size address 037 * boundaries.<p> 038 * 039 * Invariants: Buffers are filled such that tuples (of the specified 040 * arity) are packed to the low end of the buffer. Thus buffer 041 * underflows will always arise when then cursor is buffer-size aligned. 042 */ 043 @Uninterruptible class LocalQueue extends LocalSSB implements Constants { 044 045 /** 046 * Constructor 047 * 048 * @param queue The shared queue to which this local ssb will append 049 * its buffers (when full or flushed). 050 */ 051 LocalQueue(SharedDeque queue) { 052 super(queue); 053 } 054 055 /**************************************************************************** 056 * 057 * Protected instance methods and fields 058 */ 059 060 /** the start of the buffer */ 061 @Entrypoint 062 protected Address head; 063 064 @Override 065 public void resetLocal() { 066 super.resetLocal(); 067 head = Deque.HEAD_INITIAL_VALUE; 068 } 069 070 /** 071 * Check whether there are values in the buffer for a pending dequeue. 072 * If there is not data, grab the first buffer on the shared queue 073 * (after freeing the buffer). 074 * 075 * @param arity The arity of the values stored in this queue: the 076 * buffer must contain enough space for this many words. 077 */ 078 @Inline 079 protected final boolean checkDequeue(int arity) { 080 if (bufferOffset(head).isZero()) { 081 return dequeueUnderflow(arity); 082 } else { 083 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(bufferOffset(head).sGE(Word.fromIntZeroExtend(arity).lsh(LOG_BYTES_IN_ADDRESS).toOffset())); 084 return true; 085 } 086 } 087 088 /** 089 * Dequeue a value from the buffer. This is <i>unchecked</i>. The 090 * caller must first call <code>checkDequeue()</code> to ensure the 091 * buffer has and entry to be removed. 092 * 093 * @return The first entry on the queue. 094 */ 095 @Inline 096 protected final Address uncheckedDequeue(){ 097 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(bufferOffset(head).sGE(Offset.fromIntZeroExtend(BYTES_IN_ADDRESS))); 098 head = head.minus(BYTES_IN_ADDRESS); 099 return head.loadAddress(); 100 } 101 102 /** 103 * The head is empty (or null), and the shared queue has no buffers 104 * available. If the tail has sufficient entries, consume the tail. 105 * Otherwise try wait on the global queue until either all other 106 * clients of the queue reach exhaustion or a buffer becomes 107 * available. 108 * 109 * @param arity The arity of this buffer 110 * @return True if the consumer has eaten all the entries 111 */ 112 protected final boolean headStarved(int arity) { 113 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == queue.getArity()); 114 115 // If the tail has entries... 116 if (tail.NE(tailBufferEnd)) { 117 head = normalizeTail(arity).plus(BYTES_IN_ADDRESS); 118 tail = Deque.TAIL_INITIAL_VALUE; 119 tailBufferEnd = Deque.TAIL_INITIAL_VALUE; 120 // Return that we acquired more entries 121 return false; 122 } 123 // Wait for another entry to materialize... 124 head = queue.dequeueAndWait(arity); 125 // return true if a) there is a head buffer, and b) it is non-empty 126 return (head.EQ(Deque.HEAD_INITIAL_VALUE) || bufferOffset(head).isZero()); 127 } 128 129 /**************************************************************************** 130 * 131 * Private instance methods 132 */ 133 134 /** 135 * There are not sufficient entries in the head buffer for a pending 136 * dequeue. Acquire a new head buffer. If the shared queue has no 137 * buffers available, consume the tail if necessary. Return false 138 * if entries cannot be acquired. 139 * 140 * @param arity The arity of this buffer (used for sanity test only). 141 * @return True if there the head buffer has been successfully 142 * replenished. 143 */ 144 @NoInline 145 private boolean dequeueUnderflow(int arity) { 146 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == queue.getArity()); 147 do { 148 if (head.NE(Deque.HEAD_INITIAL_VALUE)) 149 queue.free(head); 150 head = queue.dequeue(arity); 151 } while (head.NE(Deque.HEAD_INITIAL_VALUE) && bufferOffset(head).isZero()); 152 153 if (head.EQ(Deque.HEAD_INITIAL_VALUE)) 154 return !headStarved(arity); 155 156 return true; 157 } 158 }