/*
 * This file is part of the Jikes RVM project (http://jikesrvm.org).
 *
 * This file is licensed to You under the Eclipse Public License (EPL);
 * You may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 *
 *      http://www.opensource.org/licenses/eclipse-1.0.php
 *
 * See the COPYRIGHT.txt file distributed with this work for information
 * regarding copyright ownership.
 */
package org.mmtk.utility.deque;

import org.mmtk.policy.RawPageSpace;
import org.mmtk.policy.Space;
import org.mmtk.utility.Constants;
import org.mmtk.utility.Log;
import org.mmtk.vm.Lock;
import org.mmtk.vm.VM;
import org.vmmagic.pragma.Entrypoint;
import org.vmmagic.pragma.Inline;
import org.vmmagic.pragma.Uninterruptible;
import org.vmmagic.unboxed.Address;
import org.vmmagic.unboxed.Offset;

/**
 * This supports <i>unsynchronized</i> enqueuing and dequeuing of buffers
 * for shared use.  The data can be added to and removed from either end
 * of the deque.
 * <p>
 * Buffers are raw pages linked into a doubly-linked list via two address
 * words stored at the start of each buffer (see {@link #NEXT_OFFSET} and
 * {@link #PREV_OFFSET}).  All mutation of the shared list is guarded by a
 * single low-level {@link Lock}; the "unsynchronized" access referred to
 * above is performed by the per-thread local deques layered on top of this
 * class, not by this class itself.
 */
@Uninterruptible
public class SharedDeque extends Deque implements Constants {
  /**
   * When true, {@link #prepare()} falls back to the non-blocking
   * (single-consumer) protocol, so consumers never wait on each other.
   * See the comment in {@link #prepare()}.
   */
  private static final boolean DISABLE_WAITING = true;

  /** Offset (within a buffer) of the "next" link word of the chain. */
  private static final Offset NEXT_OFFSET = Offset.zero();
  /** Offset (within a buffer) of the "prev" link word of the chain. */
  private static final Offset PREV_OFFSET = Offset.fromIntSignExtend(BYTES_IN_ADDRESS);

  /* Compile-time trace switches; all dead code when false. */
  private static final boolean TRACE = false;
  private static final boolean TRACE_DETAIL = false;
  private static final boolean TRACE_BLOCKERS = false;

  /****************************************************************************
   *
   * Public instance methods
   */

  /**
   * Constructor
   *
   * @param name The name of this shared deque (used in diagnostics only)
   * @param rps The raw page space from which buffers are allocated
   * @param arity The number of words per deque entry
   */
  public SharedDeque(String name, RawPageSpace rps, int arity) {
    this.rps = rps;
    this.arity = arity;
    this.name = name;
    lock = VM.newLock("SharedDeque");
    clearCompletionFlag();
    head = HEAD_INITIAL_VALUE;
    tail = TAIL_INITIAL_VALUE;
  }

  /** Get the arity (words per entry) of this queue */
  @Inline
  final int getArity() { return arity; }

  /**
   * Enqueue a block on the head or tail of the shared queue
   *
   * @param buf The buffer to enqueue
   * @param arity The arity of the caller's queue (asserted to match ours)
   * @param toTail If true, add at the tail; otherwise add at the head
   */
  final void enqueue(Address buf, int arity, boolean toTail) {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
    lock();
    if (toTail) {
      // Add to the tail of the queue
      setNext(buf, Address.zero());
      if (tail.EQ(TAIL_INITIAL_VALUE))
        head = buf;
      else
        setNext(tail, buf);
      setPrev(buf, tail);
      tail = buf;
    } else {
      // Add to the head of the queue
      setPrev(buf, Address.zero());
      if (head.EQ(HEAD_INITIAL_VALUE))
        tail = buf;
      else
        setPrev(head, buf);
      setNext(buf, head);
      head = buf;
    }
    bufsenqueued++;
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(checkDequeLength(bufsenqueued));
    unlock();
  }

  /**
   * Drain the deque, returning every remaining buffer to the raw page
   * space, then set the completion flag to release any waiting consumers.
   *
   * @param arity The arity of the caller's queue (asserted to match ours)
   */
  public final void clearDeque(int arity) {
    Address buf = dequeue(arity);
    while (!buf.isZero()) {
      free(bufferStart(buf));
      buf = dequeue(arity);
    }
    setCompletionFlag();
  }

  /**
   * Dequeue a buffer from the head of the queue without waiting.
   *
   * @param arity The arity of the caller's queue (asserted to match ours)
   * @return The dequeued buffer, or zero if the queue is empty
   */
  @Inline
  final Address dequeue(int arity) {
    return dequeue(arity, false);
  }

  /**
   * Dequeue a buffer from either end of the queue without waiting.
   *
   * @param arity The arity of the caller's queue (asserted to match ours)
   * @param fromTail If true, take from the tail; otherwise from the head
   * @return The dequeued buffer, or zero if the queue is empty
   */
  final Address dequeue(int arity, boolean fromTail) {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
    return dequeue(false, fromTail);
  }

  /**
   * Dequeue a buffer from the head, waiting (if necessary) until either a
   * buffer arrives or all consumers have arrived at the barrier.
   *
   * @param arity The arity of the caller's queue (asserted to match ours)
   * @return The dequeued buffer, or zero if processing is complete
   */
  @Inline
  final Address dequeueAndWait(int arity) {
    return dequeueAndWait(arity, false);
  }

  /**
   * Dequeue a buffer from either end, waiting (if necessary) until either a
   * buffer arrives or all consumers have arrived at the barrier.
   *
   * @param arity The arity of the caller's queue (asserted to match ours)
   * @param fromTail If true, take from the tail; otherwise from the head
   * @return The dequeued buffer, or zero if processing is complete
   */
  final Address dequeueAndWait(int arity, boolean fromTail) {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
    Address buf = dequeue(false, fromTail);
    if (buf.isZero() && (!complete())) {
      buf = dequeue(true, fromTail);  // Wait inside dequeue
    }
    return buf;
  }

  /**
   * Prepare for parallel processing. All active GC threads will
   * participate, and pop operations will block until all work
   * is complete.
   */
  public final void prepare() {
    if (DISABLE_WAITING) {
      prepareNonBlocking();
    } else {
      /* This should be the normal mode of operation once performance is fixed */
      prepare(VM.activePlan.collector().parallelWorkerCount());
    }
  }

  /**
   * Prepare for processing where pop operations on the deques
   * will never block.
   */
  public final void prepareNonBlocking() {
    // With a single nominal consumer, numConsumersWaiting == numConsumers
    // as soon as one consumer waits, so the completion flag is set at once.
    prepare(1);
  }

  /**
   * Prepare for parallel processing where a specific number
   * of threads take part.
   *
   * @param consumers # threads taking part.
   */
  private void prepare(int consumers) {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0);
    setNumConsumers(consumers);
    clearCompletionFlag();
  }

  /**
   * Reset the deque after a round of processing: clear the completion
   * flag and the waiting-consumer count, and assert the deque is empty.
   */
  public final void reset() {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0);
    clearCompletionFlag();
    setNumConsumersWaiting(0);
    assertExhausted();
  }

  /** Assert (under VERIFY_ASSERTIONS) that the deque contains no buffers. */
  public final void assertExhausted() {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero() && tail.isZero());
  }

  /**
   * Allocate a fresh buffer from the raw page space.  Fails (does not
   * return) if the metadata space is exhausted.
   *
   * @return The address of the newly acquired buffer
   */
  @Inline
  final Address alloc() {
    Address rtn = rps.acquire(PAGES_PER_BUFFER);
    if (rtn.isZero()) {
      Space.printUsageMB();
      VM.assertions.fail("Failed to allocate space for queue.  Is metadata virtual memory exhausted?");
    }
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(rtn.EQ(bufferStart(rtn)));
    return rtn;
  }

  /**
   * Return a buffer to the raw page space.
   *
   * @param buf The buffer to release; must be a non-zero buffer start address
   */
  @Inline
  final void free(Address buf) {
    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(buf.EQ(bufferStart(buf)) && !buf.isZero());
    rps.release(buf);
  }

  /**
   * @return The number of pages currently consumed by enqueued buffers
   */
  @Inline
  public final int enqueuedPages() {
    return bufsenqueued * PAGES_PER_BUFFER;
  }

  /****************************************************************************
   *
   * Private instance methods and fields
   */

  /** The name of this shared deque - for diagnostics */
  private final String name;

  /** Raw page space from which to allocate */
  private RawPageSpace rps;

  /** Number of words per entry */
  private final int arity;

  /** Completion flag - set when all consumers have arrived at the barrier */
  @Entrypoint
  private volatile int completionFlag;

  /** # active threads - processing is complete when # waiting == this */
  @Entrypoint
  private volatile int numConsumers;

  /** # threads waiting */
  @Entrypoint
  private volatile int numConsumersWaiting;

  /** Head of the shared deque */
  @Entrypoint
  protected volatile Address head;

  /** Tail of the shared deque */
  @Entrypoint
  protected volatile Address tail;

  /** Count of buffers currently enqueued (guarded by {@link #lock}). */
  @Entrypoint
  private volatile int bufsenqueued;

  /** Single low-level lock guarding all mutation of the shared list. */
  private Lock lock;

  /** Interval (ns) between "still waiting" warnings while spinning. */
  private static final long WARN_PERIOD = (long)(2*1E9);
  /** Total wait (ns) after which a spinning consumer fails the VM. */
  private static final long TIMEOUT_PERIOD = 10 * WARN_PERIOD;

  /**
   * Dequeue a block from the shared pool.  If 'waiting' is true, and the
   * queue is empty, wait for either a new block to show up or all the
   * other consumers to join us.
   *
   * @param waiting If true, block (spin-wait) on an empty queue rather
   *   than returning zero immediately
   * @param fromTail If true, take from the tail; otherwise from the head
   * @return the Address of the block, or zero if the queue is empty
   *   (non-waiting) or processing is complete (waiting)
   */
  private Address dequeue(boolean waiting, boolean fromTail) {
    lock();
    Address rtn = ((fromTail) ? tail : head);
    if (rtn.isZero()) {
      if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero() && head.isZero());
      // no buffers available
      if (waiting) {
        // NOTE(review): this looks inverted -- ordinal is only printed when
        // TRACE is true, yet that is exactly the branch that yields 0.
        // Confirm against upstream whether this should be `TRACE ? ...getId() : 0`.
        int ordinal = TRACE ? 0 : VM.activePlan.collector().getId();
        setNumConsumersWaiting(numConsumersWaiting + 1);
        while (rtn.isZero()) {
          // Last consumer to arrive releases everyone by setting the flag.
          if (numConsumersWaiting == numConsumers)
            setCompletionFlag();
          if (TRACE) {
            Log.write("-- ("); Log.write(ordinal);
            Log.write(") joining wait queue of SharedDeque(");
            Log.write(name); Log.write(") ");
            Log.write(numConsumersWaiting); Log.write("/");
            Log.write(numConsumers);
            Log.write(" consumers waiting");
            if (complete()) Log.write(" WAIT COMPLETE");
            Log.writeln();
            if (TRACE_BLOCKERS)
              VM.assertions.dumpStack();
          }
          unlock();
          // Spin and wait
          spinWait(fromTail);

          if (complete()) {
            if (TRACE) {
              Log.write("-- ("); Log.write(ordinal); Log.writeln(") EXITING");
            }
            lock();
            setNumConsumersWaiting(numConsumersWaiting - 1);
            unlock();
            return Address.zero();
          }
          lock();
          // Re-get the list head/tail while holding the lock
          rtn = ((fromTail) ? tail : head);
        }
        setNumConsumersWaiting(numConsumersWaiting - 1);
        if (TRACE) {
          Log.write("-- ("); Log.write(ordinal); Log.write(") resuming work ");
          Log.write(" n="); Log.writeln(numConsumersWaiting);
        }
      } else {
        unlock();
        return Address.zero();
      }
    }
    if (fromTail) {
      // dequeue the tail buffer
      setTail(getPrev(tail));
      if (head.EQ(rtn)) {
        setHead(Address.zero());
        if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero());
      } else {
        setNext(tail, Address.zero());
      }
    } else {
      // dequeue the head buffer
      setHead(getNext(head));
      if (tail.EQ(rtn)) {
        setTail(Address.zero());
        if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero());
      } else {
        setPrev(head, Address.zero());
      }
    }
    bufsenqueued--;
    unlock();
    return rtn;
  }

  /**
   * Spinwait for GC work to arrive
   *
   * @param fromTail Check the head or the tail ?
   */
  private void spinWait(boolean fromTail) {
    long startNano = 0;
    long lastElapsedNano = 0;
    while (true) {
      long startCycles = VM.statistics.cycles();
      long endCycles = startCycles + ((long) 1e9); // a few hundred milliseconds more or less.
      long nowCycles;
      do {
        VM.memory.isync();
        Address rtn = ((fromTail) ? tail : head);
        if (!rtn.isZero() || complete()) return;
        nowCycles = VM.statistics.cycles();
      } while (startCycles < nowCycles && nowCycles < endCycles); /* check against both ends to guard against CPU migration */

      /*
       * According to the cycle counter, we've been spinning for a while.
       * Time to check nanoTime and see if we should print a warning and/or fail.
       * We lock the deque while doing this to avoid interleaved messages from multiple threads.
       */
      lock();
      if (startNano == 0) {
        startNano = VM.statistics.nanoTime();
      } else {
        long nowNano = VM.statistics.nanoTime();
        long elapsedNano = nowNano - startNano;
        if (elapsedNano - lastElapsedNano > WARN_PERIOD) {
          Log.write("GC Warning: SharedDeque("); Log.write(name);
          Log.write(") wait has reached "); Log.write(VM.statistics.nanosToSecs(elapsedNano));
          Log.write(", "); Log.write(numConsumersWaiting); Log.write("/");
          Log.write(numConsumers); Log.writeln(" threads waiting");
          lastElapsedNano = elapsedNano;
        }
        if (elapsedNano > TIMEOUT_PERIOD) {
          unlock();  // To allow other GC threads to die in turn
          VM.assertions.fail("GC Error: SharedDeque Timeout");
        }
      }
      unlock();
    }
  }

  /**
   * Set the "next" pointer in a buffer forming the linked buffer chain.
   *
   * @param buf The buffer whose next field is to be set.
   * @param next The reference to which next should point.
   */
  private static void setNext(Address buf, Address next) {
    buf.store(next, NEXT_OFFSET);
  }

  /**
   * Get the "next" pointer in a buffer forming the linked buffer chain.
   *
   * @param buf The buffer whose next field is to be returned.
   * @return The next field for this buffer.
   */
  protected final Address getNext(Address buf) {
    return buf.loadAddress(NEXT_OFFSET);
  }

  /**
   * Set the "prev" pointer in a buffer forming the linked buffer chain.
   *
   * @param buf The buffer whose prev field is to be set.
   * @param prev The reference to which prev should point.
   */
  private void setPrev(Address buf, Address prev) {
    buf.store(prev, PREV_OFFSET);
  }

  /**
   * Get the "prev" pointer in a buffer forming the linked buffer chain.
   *
   * @param buf The buffer whose prev field is to be returned.
   * @return The prev field for this buffer.
   */
  protected final Address getPrev(Address buf) {
    return buf.loadAddress(PREV_OFFSET);
  }

  /**
   * Check the number of buffers in the work queue (for debugging
   * purposes).
   *
   * @param length The number of buffers believed to be in the queue.
   * @return True if the length of the queue matches length.
   */
  private boolean checkDequeLength(int length) {
    Address top = head;
    int l = 0;
    while (!top.isZero() && l <= length) {
      top = getNext(top);
      l++;
    }
    return l == length;
  }

  /**
   * Lock this shared queue.  We use one simple low-level lock to
   * synchronize access to the shared queue of buffers.
   */
  private void lock() {
    lock.acquire();
  }

  /**
   * Release the lock.  We use one simple low-level lock to synchronize
   * access to the shared queue of buffers.
   */
  private void unlock() {
    lock.release();
  }

  /**
   * Is the current round of processing complete ?
   *
   * @return True if the completion flag has been set
   */
  private boolean complete() {
    return completionFlag == 1;
  }

  /**
   * Set the completion flag.
   */
  @Inline
  private void setCompletionFlag() {
    if (TRACE_DETAIL) {
      Log.writeln("# setCompletionFlag: ");
    }
    completionFlag = 1;
  }

  /**
   * Clear the completion flag.
   */
  @Inline
  private void clearCompletionFlag() {
    if (TRACE_DETAIL) {
      Log.writeln("# clearCompletionFlag: ");
    }
    completionFlag = 0;
  }

  /**
   * Set the number of participating consumers.
   *
   * @param newNumConsumers The new consumer count
   */
  @Inline
  private void setNumConsumers(int newNumConsumers) {
    if (TRACE_DETAIL) {
      Log.write("# Num consumers "); Log.writeln(newNumConsumers);
    }
    numConsumers = newNumConsumers;
  }

  /**
   * Set the number of consumers currently waiting.
   *
   * @param newNCW The new waiting-consumer count
   */
  @Inline
  private void setNumConsumersWaiting(int newNCW) {
    if (TRACE_DETAIL) {
      Log.write("# Num consumers waiting "); Log.writeln(newNCW);
    }
    numConsumersWaiting = newNCW;
  }

  /**
   * Update the head pointer, followed by a memory fence so the update is
   * visible to spinning consumers.
   *
   * @param newHead The new head of the deque
   */
  @Inline
  private void setHead(Address newHead) {
    head = newHead;
    VM.memory.sync();
  }

  /**
   * Update the tail pointer, followed by a memory fence so the update is
   * visible to spinning consumers.
   *
   * @param newTail The new tail of the deque
   */
  @Inline
  private void setTail(Address newTail) {
    tail = newTail;
    VM.memory.sync();
  }
}