/*
 * This file is part of the Jikes RVM project (http://jikesrvm.org).
 *
 * This file is licensed to You under the Eclipse Public License (EPL);
 * You may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 *      http://www.opensource.org/licenses/eclipse-1.0.php
 *
 * See the COPYRIGHT.txt file distributed with this work for information
 * regarding copyright ownership.
 */
package org.mmtk.utility.deque;

import org.mmtk.policy.RawPageSpace;
import org.mmtk.policy.Space;
import org.mmtk.utility.Constants;
import org.mmtk.utility.Log;
import org.mmtk.vm.Lock;
import org.mmtk.vm.VM;
import org.vmmagic.pragma.Entrypoint;
import org.vmmagic.pragma.Inline;
import org.vmmagic.pragma.Uninterruptible;
import org.vmmagic.unboxed.Address;
import org.vmmagic.unboxed.Offset;

/**
 * This supports <i>unsynchronized</i> enqueuing and dequeuing of buffers
 * for shared use.  The data can be added to and removed from either end
 * of the deque.
 * <p>
 * Buffers are raw pages linked into a doubly-linked list via two address
 * words stored at the start of each buffer (see {@link #NEXT_OFFSET} and
 * {@link #PREV_OFFSET}).  All mutation of the shared list is guarded by a
 * single low-level {@link Lock}; the "unsynchronized" access referred to
 * above is performed by the per-thread local deques layered on top of this
 * class, not by this class itself.
 */
@Uninterruptible
public class SharedDeque extends Deque implements Constants {
  /**
   * When true, {@link #prepare()} falls back to the non-blocking
   * (single-consumer) protocol, so consumers never wait on each other.
   * See the comment in {@link #prepare()}.
   */
  private static final boolean DISABLE_WAITING = true;

  /** Offset (within a buffer) of the "next" link word of the chain. */
  private static final Offset NEXT_OFFSET = Offset.zero();
  /** Offset (within a buffer) of the "prev" link word of the chain. */
  private static final Offset PREV_OFFSET = Offset.fromIntSignExtend(BYTES_IN_ADDRESS);

  /* Compile-time trace switches; all dead code when false. */
  private static final boolean TRACE = false;
  private static final boolean TRACE_DETAIL = false;
  private static final boolean TRACE_BLOCKERS = false;

  /****************************************************************************
   *
   * Public instance methods
   */

  /**
   * Constructor
   *
   * @param name The name of this shared deque (used in diagnostics only)
   * @param rps The raw page space from which buffers are allocated
   * @param arity The number of words per deque entry
   */
  public SharedDeque(String name, RawPageSpace rps, int arity) {
    this.rps = rps;
    this.arity = arity;
    this.name = name;
    lock = VM.newLock("SharedDeque");
    clearCompletionFlag();
    head = HEAD_INITIAL_VALUE;
    tail = TAIL_INITIAL_VALUE;
  }

  /** Get the arity (words per entry) of this queue */
  @Inline
  final int getArity() { return arity; }

  /**
   * Enqueue a block on the head or tail of the shared queue
   *
   * @param buf The buffer to enqueue
   * @param arity The arity of the caller's queue (asserted to match ours)
   * @param toTail If true, add at the tail; otherwise add at the head
   */
  final void enqueue(Address buf, int arity, boolean toTail) {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
    lock();
    if (toTail) {
      // Add to the tail of the queue
      setNext(buf, Address.zero());
      if (tail.EQ(TAIL_INITIAL_VALUE))
        head = buf;
      else
        setNext(tail, buf);
      setPrev(buf, tail);
      tail = buf;
    } else {
      // Add to the head of the queue
      setPrev(buf, Address.zero());
      if (head.EQ(HEAD_INITIAL_VALUE))
        tail = buf;
      else
        setPrev(head, buf);
      setNext(buf, head);
      head = buf;
    }
    bufsenqueued++;
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(checkDequeLength(bufsenqueued));
    unlock();
  }

  /**
   * Drain the deque, returning every remaining buffer to the raw page
   * space, then set the completion flag to release any waiting consumers.
   *
   * @param arity The arity of the caller's queue (asserted to match ours)
   */
  public final void clearDeque(int arity) {
    Address buf = dequeue(arity);
    while (!buf.isZero()) {
      free(bufferStart(buf));
      buf = dequeue(arity);
    }
    setCompletionFlag();
  }

  /**
   * Dequeue a buffer from the head of the queue without waiting.
   *
   * @param arity The arity of the caller's queue (asserted to match ours)
   * @return The dequeued buffer, or zero if the queue is empty
   */
  @Inline
  final Address dequeue(int arity) {
    return dequeue(arity, false);
  }

  /**
   * Dequeue a buffer from either end of the queue without waiting.
   *
   * @param arity The arity of the caller's queue (asserted to match ours)
   * @param fromTail If true, take from the tail; otherwise from the head
   * @return The dequeued buffer, or zero if the queue is empty
   */
  final Address dequeue(int arity, boolean fromTail) {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
    return dequeue(false, fromTail);
  }

  /**
   * Dequeue a buffer from the head, waiting (if necessary) until either a
   * buffer arrives or all consumers have arrived at the barrier.
   *
   * @param arity The arity of the caller's queue (asserted to match ours)
   * @return The dequeued buffer, or zero if processing is complete
   */
  @Inline
  final Address dequeueAndWait(int arity) {
    return dequeueAndWait(arity, false);
  }

  /**
   * Dequeue a buffer from either end, waiting (if necessary) until either a
   * buffer arrives or all consumers have arrived at the barrier.
   *
   * @param arity The arity of the caller's queue (asserted to match ours)
   * @param fromTail If true, take from the tail; otherwise from the head
   * @return The dequeued buffer, or zero if processing is complete
   */
  final Address dequeueAndWait(int arity, boolean fromTail) {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
    Address buf = dequeue(false, fromTail);
    if (buf.isZero() && (!complete())) {
      buf = dequeue(true, fromTail);  // Wait inside dequeue
    }
    return buf;
  }

  /**
   * Prepare for parallel processing. All active GC threads will
   * participate, and pop operations will block until all work
   * is complete.
   */
  public final void prepare() {
    if (DISABLE_WAITING) {
      prepareNonBlocking();
    } else {
      /* This should be the normal mode of operation once performance is fixed */
      prepare(VM.activePlan.collector().parallelWorkerCount());
    }
  }

  /**
   * Prepare for processing where pop operations on the deques
   * will never block.
   */
  public final void prepareNonBlocking() {
    // With a single nominal consumer, numConsumersWaiting == numConsumers
    // as soon as one consumer waits, so the completion flag is set at once.
    prepare(1);
  }

  /**
   * Prepare for parallel processing where a specific number
   * of threads take part.
   *
   * @param consumers # threads taking part.
   */
  private void prepare(int consumers) {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0);
    setNumConsumers(consumers);
    clearCompletionFlag();
  }

  /**
   * Reset the deque after a round of processing: clear the completion
   * flag and the waiting-consumer count, and assert the deque is empty.
   */
  public final void reset() {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0);
    clearCompletionFlag();
    setNumConsumersWaiting(0);
    assertExhausted();
  }

  /** Assert (under VERIFY_ASSERTIONS) that the deque contains no buffers. */
  public final void assertExhausted() {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero() && tail.isZero());
  }

  /**
   * Allocate a fresh buffer from the raw page space.  Fails (does not
   * return) if the metadata space is exhausted.
   *
   * @return The address of the newly acquired buffer
   */
  @Inline
  final Address alloc() {
    Address rtn = rps.acquire(PAGES_PER_BUFFER);
    if (rtn.isZero()) {
      Space.printUsageMB();
      VM.assertions.fail("Failed to allocate space for queue.  Is metadata virtual memory exhausted?");
    }
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(rtn.EQ(bufferStart(rtn)));
    return rtn;
  }

  /**
   * Return a buffer to the raw page space.
   *
   * @param buf The buffer to release; must be a non-zero buffer start address
   */
  @Inline
  final void free(Address buf) {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(buf.EQ(bufferStart(buf)) && !buf.isZero());
    rps.release(buf);
  }

  /**
   * @return The number of pages currently consumed by enqueued buffers
   */
  @Inline
  public final int enqueuedPages() {
    return bufsenqueued * PAGES_PER_BUFFER;
  }

  /****************************************************************************
   *
   * Private instance methods and fields
   */

  /** The name of this shared deque - for diagnostics */
  private final String name;

  /** Raw page space from which to allocate */
  private RawPageSpace rps;

  /** Number of words per entry */
  private final int arity;

  /** Completion flag - set when all consumers have arrived at the barrier */
  @Entrypoint
  private volatile int completionFlag;

  /** # active threads - processing is complete when # waiting == this */
  @Entrypoint
  private volatile int numConsumers;

  /** # threads waiting */
  @Entrypoint
  private volatile int numConsumersWaiting;

  /** Head of the shared deque */
  @Entrypoint
  protected volatile Address head;

  /** Tail of the shared deque */
  @Entrypoint
  protected volatile Address tail;

  /** Count of buffers currently enqueued (guarded by {@link #lock}). */
  @Entrypoint
  private volatile int bufsenqueued;

  /** Single low-level lock guarding all mutation of the shared list. */
  private Lock lock;

  /** Interval (ns) between "still waiting" warnings while spinning. */
  private static final long WARN_PERIOD = (long)(2*1E9);
  /** Total wait (ns) after which a spinning consumer fails the VM. */
  private static final long TIMEOUT_PERIOD = 10 * WARN_PERIOD;

  /**
   * Dequeue a block from the shared pool.  If 'waiting' is true, and the
   * queue is empty, wait for either a new block to show up or all the
   * other consumers to join us.
   *
   * @param waiting If true, block (spin-wait) on an empty queue rather
   *   than returning zero immediately
   * @param fromTail If true, take from the tail; otherwise from the head
   * @return the Address of the block, or zero if the queue is empty
   *   (non-waiting) or processing is complete (waiting)
   */
  private Address dequeue(boolean waiting, boolean fromTail) {
    lock();
    Address rtn = ((fromTail) ? tail : head);
    if (rtn.isZero()) {
      if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero() && head.isZero());
      // no buffers available
      if (waiting) {
        // NOTE(review): this looks inverted -- ordinal is only printed when
        // TRACE is true, yet that is exactly the branch that yields 0.
        // Confirm against upstream whether this should be `TRACE ? ...getId() : 0`.
        int ordinal = TRACE ? 0 : VM.activePlan.collector().getId();
        setNumConsumersWaiting(numConsumersWaiting + 1);
        while (rtn.isZero()) {
          // Last consumer to arrive releases everyone by setting the flag.
          if (numConsumersWaiting == numConsumers)
            setCompletionFlag();
          if (TRACE) {
            Log.write("-- ("); Log.write(ordinal);
            Log.write(") joining wait queue of SharedDeque(");
            Log.write(name); Log.write(") ");
            Log.write(numConsumersWaiting); Log.write("/");
            Log.write(numConsumers);
            Log.write(" consumers waiting");
            if (complete()) Log.write(" WAIT COMPLETE");
            Log.writeln();
            if (TRACE_BLOCKERS)
              VM.assertions.dumpStack();
          }
          unlock();
          // Spin and wait
          spinWait(fromTail);

          if (complete()) {
            if (TRACE) {
              Log.write("-- ("); Log.write(ordinal); Log.writeln(") EXITING");
            }
            lock();
            setNumConsumersWaiting(numConsumersWaiting - 1);
            unlock();
            return Address.zero();
          }
          lock();
          // Re-get the list head/tail while holding the lock
          rtn = ((fromTail) ? tail : head);
        }
        setNumConsumersWaiting(numConsumersWaiting - 1);
        if (TRACE) {
          Log.write("-- ("); Log.write(ordinal); Log.write(") resuming work ");
          Log.write(" n="); Log.writeln(numConsumersWaiting);
        }
      } else {
        unlock();
        return Address.zero();
      }
    }
    if (fromTail) {
      // dequeue the tail buffer
      setTail(getPrev(tail));
      if (head.EQ(rtn)) {
        setHead(Address.zero());
        if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero());
      } else {
        setNext(tail, Address.zero());
      }
    } else {
      // dequeue the head buffer
      setHead(getNext(head));
      if (tail.EQ(rtn)) {
        setTail(Address.zero());
        if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero());
      } else {
        setPrev(head, Address.zero());
      }
    }
    bufsenqueued--;
    unlock();
    return rtn;
  }

  /**
   * Spinwait for GC work to arrive
   *
   * @param fromTail Check the head or the tail ?
   */
  private void spinWait(boolean fromTail) {
    long startNano = 0;
    long lastElapsedNano = 0;
    while (true) {
      long startCycles = VM.statistics.cycles();
      long endCycles = startCycles + ((long) 1e9); // a few hundred milliseconds more or less.
      long nowCycles;
      do {
        VM.memory.isync();
        Address rtn = ((fromTail) ? tail : head);
        if (!rtn.isZero() || complete()) return;
        nowCycles = VM.statistics.cycles();
      } while (startCycles < nowCycles && nowCycles < endCycles); /* check against both ends to guard against CPU migration */

      /*
       * According to the cycle counter, we've been spinning for a while.
       * Time to check nanoTime and see if we should print a warning and/or fail.
       * We lock the deque while doing this to avoid interleaved messages from multiple threads.
       */
      lock();
      if (startNano == 0) {
        startNano = VM.statistics.nanoTime();
      } else {
        long nowNano = VM.statistics.nanoTime();
        long elapsedNano = nowNano - startNano;
        if (elapsedNano - lastElapsedNano > WARN_PERIOD) {
          Log.write("GC Warning: SharedDeque("); Log.write(name);
          Log.write(") wait has reached "); Log.write(VM.statistics.nanosToSecs(elapsedNano));
          Log.write(", "); Log.write(numConsumersWaiting); Log.write("/");
          Log.write(numConsumers); Log.writeln(" threads waiting");
          lastElapsedNano = elapsedNano;
        }
        if (elapsedNano > TIMEOUT_PERIOD) {
          unlock();  // To allow other GC threads to die in turn
          VM.assertions.fail("GC Error: SharedDeque Timeout");
        }
      }
      unlock();
    }
  }

  /**
   * Set the "next" pointer in a buffer forming the linked buffer chain.
   *
   * @param buf The buffer whose next field is to be set.
   * @param next The reference to which next should point.
   */
  private static void setNext(Address buf, Address next) {
    buf.store(next, NEXT_OFFSET);
  }

  /**
   * Get the "next" pointer in a buffer forming the linked buffer chain.
   *
   * @param buf The buffer whose next field is to be returned.
   * @return The next field for this buffer.
   */
  protected final Address getNext(Address buf) {
    return buf.loadAddress(NEXT_OFFSET);
  }

  /**
   * Set the "prev" pointer in a buffer forming the linked buffer chain.
   *
   * @param buf The buffer whose prev field is to be set.
   * @param prev The reference to which prev should point.
   */
  private void setPrev(Address buf, Address prev) {
    buf.store(prev, PREV_OFFSET);
  }

  /**
   * Get the "prev" pointer in a buffer forming the linked buffer chain.
   *
   * @param buf The buffer whose prev field is to be returned.
   * @return The prev field for this buffer.
   */
  protected final Address getPrev(Address buf) {
    return buf.loadAddress(PREV_OFFSET);
  }

  /**
   * Check the number of buffers in the work queue (for debugging
   * purposes).
   *
   * @param length The number of buffers believed to be in the queue.
   * @return True if the length of the queue matches length.
   */
  private boolean checkDequeLength(int length) {
    Address top = head;
    int l = 0;
    while (!top.isZero() && l <= length) {
      top = getNext(top);
      l++;
    }
    return l == length;
  }

  /**
   * Lock this shared queue.  We use one simple low-level lock to
   * synchronize access to the shared queue of buffers.
   */
  private void lock() {
    lock.acquire();
  }

  /**
   * Release the lock.  We use one simple low-level lock to synchronize
   * access to the shared queue of buffers.
   */
  private void unlock() {
    lock.release();
  }

  /**
   * Is the current round of processing complete ?
   *
   * @return True if the completion flag has been set
   */
  private boolean complete() {
    return completionFlag == 1;
  }

  /**
   * Set the completion flag.
   */
  @Inline
  private void setCompletionFlag() {
    if (TRACE_DETAIL) {
      Log.writeln("# setCompletionFlag: ");
    }
    completionFlag = 1;
  }

  /**
   * Clear the completion flag.
   */
  @Inline
  private void clearCompletionFlag() {
    if (TRACE_DETAIL) {
      Log.writeln("# clearCompletionFlag: ");
    }
    completionFlag = 0;
  }

  /**
   * Set the number of participating consumers.
   *
   * @param newNumConsumers The new consumer count
   */
  @Inline
  private void setNumConsumers(int newNumConsumers) {
    if (TRACE_DETAIL) {
      Log.write("# Num consumers "); Log.writeln(newNumConsumers);
    }
    numConsumers = newNumConsumers;
  }

  /**
   * Set the number of consumers currently waiting.
   *
   * @param newNCW The new waiting-consumer count
   */
  @Inline
  private void setNumConsumersWaiting(int newNCW) {
    if (TRACE_DETAIL) {
      Log.write("# Num consumers waiting "); Log.writeln(newNCW);
    }
    numConsumersWaiting = newNCW;
  }

  /**
   * Update the head pointer, followed by a memory fence so the update is
   * visible to spinning consumers.
   *
   * @param newHead The new head of the deque
   */
  @Inline
  private void setHead(Address newHead) {
    head = newHead;
    VM.memory.sync();
  }

  /**
   * Update the tail pointer, followed by a memory fence so the update is
   * visible to spinning consumers.
   *
   * @param newTail The new tail of the deque
   */
  @Inline
  private void setTail(Address newTail) {
    tail = newTail;
    VM.memory.sync();
  }
}