001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*- 002 * 003 * Copyright © 2017-2018 microBean. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 014 * implied. See the License for the specific language governing 015 * permissions and limitations under the License. 016 */ 017package org.microbean.kubernetes.controller; 018 019import java.beans.PropertyChangeEvent; 020import java.beans.PropertyChangeListener; 021import java.beans.PropertyChangeSupport; 022 023import java.io.Serializable; 024 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.LinkedHashMap; 030import java.util.Map; 031import java.util.Map.Entry; 032import java.util.Objects; 033import java.util.Set; 034 035import java.util.concurrent.CountDownLatch; 036import java.util.concurrent.ExecutorService; 037import java.util.concurrent.Future; 038import java.util.concurrent.ScheduledExecutorService; 039import java.util.concurrent.ScheduledThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041 042import java.util.function.Consumer; 043import java.util.function.Function; 044import java.util.function.Supplier; 045 046import java.util.logging.Level; 047import java.util.logging.Logger; 048 049import io.fabric8.kubernetes.api.model.HasMetadata; 050 051import net.jcip.annotations.GuardedBy; 052import net.jcip.annotations.ThreadSafe; 053 054import org.microbean.development.annotation.Blocking; 055import org.microbean.development.annotation.NonBlocking; 056 057/** 058 * An {@link EventCache} that temporarily stores {@link Event}s in 059 * {@link EventQueue}s, one per named Kubernetes resource, and 060 * {@linkplain #start(Consumer) provides a means for processing those 061 * queues}. 062 * 063 * <h2>Thread Safety</h2> 064 * 065 * <p>This class is safe for concurrent use by multiple {@link 066 * Thread}s.</p> 067 * 068 * <h2>Design Notes</h2> 069 * 070 * <p>This class loosely models the <a 071 * href="https://github.com/kubernetes/client-go/blob/37c3c02ec96533daec0dbda1f39a6b1d68505c79/tools/cache/delta_fifo.go#L96">{@code 072 * DeltaFIFO}</a> type in the Kubernetes Go client {@code tools/cache} 073 * package.</p> 074 * 075 * @param <T> a type of Kubernetes resource 076 * 077 * @author <a href="https://about.me/lairdnelson" 078 * target="_parent">Laird Nelson</a> 079 * 080 * @see #add(Object, AbstractEvent.Type, HasMetadata) 081 * 082 * @see #replace(Collection, Object) 083 * 084 * @see #synchronize() 085 * 086 * @see #start(Consumer) 087 * 088 * @see EventQueue 089 */ 090@ThreadSafe 091public class EventQueueCollection<T extends HasMetadata> implements EventCache<T>, Supplier<EventQueue<T>>, AutoCloseable { 092 093 094 /* 095 * Instance fields. 096 */ 097 098 099 /** 100 * A {@link PropertyChangeSupport} object that manages {@link 101 * PropertyChangeEvent}s on behalf of this {@link 102 * EventQueueCollection}. 103 * 104 * <p>This field is never {@code null}.</p> 105 * 106 * @see #addPropertyChangeListener(String, PropertyChangeListener) 107 */ 108 private final PropertyChangeSupport propertyChangeSupport; 109 110 /** 111 * Whether this {@link EventQueueCollection} is in the process of 112 * {@linkplain #close() closing}. 113 * 114 * @see #close() 115 */ 116 private volatile boolean closing; 117 118 /** 119 * Whether or not this {@link EventQueueCollection} has been 120 * populated via an invocation of the {@link #replace(Collection, 121 * Object)} method. 122 * 123 * <p>Mutations of this field must be synchronized on {@code 124 * this}.</p> 125 * 126 * @see #replace(Collection, Object) 127 */ 128 @GuardedBy("this") 129 private boolean populated; 130 131 /** 132 * The number of {@link EventQueue}s that this {@link 133 * EventQueueCollection} was initially {@linkplain 134 * #replace(Collection, Object) seeded with}. 135 * 136 * <p>Mutations of this field must be synchronized on {@code 137 * this}.</p> 138 * 139 * @see #replace(Collection, Object) 140 */ 141 @GuardedBy("this") 142 private int initialPopulationCount; 143 144 /** 145 * A {@link LinkedHashMap} of {@link EventQueue} instances, indexed 146 * by {@linkplain EventQueue#getKey() their keys}. 147 * 148 * <p>This field is never {@code null}.</p> 149 * 150 * <p>Mutations of the contents of this {@link LinkedHashMap} must 151 * be synchronized on {@code this}.</p> 152 * 153 * @see #add(Object, AbstractEvent.Type, HasMetadata) 154 */ 155 @GuardedBy("this") 156 private final LinkedHashMap<Object, EventQueue<T>> eventQueueMap; 157 158 /** 159 * A {@link Map} containing the last known state of Kubernetes 160 * resources this {@link EventQueueCollection} is caching events 161 * for. This field is used chiefly by the {@link #synchronize()} 162 * method, but by others as well. 163 * 164 * <p>This field may be {@code null}.</p> 165 * 166 * <p>Mutations of this field must be synchronized on this field's 167 * value.</p> 168 * 169 * @see #getKnownObjects() 170 * 171 * @see #synchronize() 172 */ 173 @GuardedBy("itself") 174 private final Map<?, ? extends T> knownObjects; 175 176 @GuardedBy("this") 177 private ScheduledExecutorService consumerExecutor; 178 179 @GuardedBy("this") 180 private Future<?> eventQueueConsumptionTask; 181 182 private final Function<? super Throwable, Boolean> errorHandler; 183 184 /** 185 * A {@link Logger} used by this {@link EventQueueCollection}. 186 * 187 * <p>This field is never {@code null}.</p> 188 * 189 * @see #createLogger() 190 */ 191 protected final Logger logger; 192 193 /* 194 * Constructors. 195 */ 196 197 198 /** 199 * Creates a new {@link EventQueueCollection} with an initial 200 * capacity of {@code 16} and a load factor of {@code 0.75} that is 201 * not interested in tracking Kubernetes resource deletions. 202 * 203 * @see #EventQueueCollection(Map, int, float) 204 */ 205 public EventQueueCollection() { 206 this(null, null, 16, 0.75f); 207 } 208 209 /** 210 * Creates a new {@link EventQueueCollection} with an initial 211 * capacity of {@code 16} and a load factor of {@code 0.75}. 212 * 213 * @param knownObjects a {@link Map} containing the last known state 214 * of Kubernetes resources this {@link EventQueueCollection} is 215 * caching events for; may be {@code null} if this {@link 216 * EventQueueCollection} is not interested in tracking deletions of 217 * objects; if non-{@code null} <strong>will be synchronized on by 218 * this class</strong> during retrieval and traversal operations 219 * 220 * @see #EventQueueCollection(Map, int, float) 221 */ 222 public EventQueueCollection(final Map<?, ? extends T> knownObjects) { 223 this(knownObjects, null, 16, 0.75f); 224 } 225 226 /** 227 * Creates a new {@link EventQueueCollection}. 228 * 229 * @param knownObjects a {@link Map} containing the last known state 230 * of Kubernetes resources this {@link EventQueueCollection} is 231 * caching events for; may be {@code null} if this {@link 232 * EventQueueCollection} is not interested in tracking deletions of 233 * objects; if non-{@code null} <strong>will be synchronized on by 234 * this class</strong> during retrieval and traversal operations 235 * 236 * @param initialCapacity the initial capacity of the internal data 237 * structure used to house this {@link EventQueueCollection}'s 238 * {@link EventQueue}s; must be an integer greater than {@code 0} 239 * 240 * @param loadFactor the load factor of the internal data structure 241 * used to house this {@link EventQueueCollection}'s {@link 242 * EventQueue}s; must be a positive number between {@code 0} and 243 * {@code 1} 244 */ 245 public EventQueueCollection(final Map<?, ? extends T> knownObjects, final int initialCapacity, final float loadFactor) { 246 this(knownObjects, null, initialCapacity, loadFactor); 247 } 248 249 /** 250 * Creates a new {@link EventQueueCollection}. 251 * 252 * @param knownObjects a {@link Map} containing the last known state 253 * of Kubernetes resources this {@link EventQueueCollection} is 254 * caching events for; may be {@code null} if this {@link 255 * EventQueueCollection} is not interested in tracking deletions of 256 * objects; if non-{@code null} <strong>will be synchronized on by 257 * this class</strong> during retrieval and traversal operations 258 * 259 * @param errorHandler a {@link Function} that accepts a {@link 260 * Throwable} and returns a {@link Boolean} indicating whether the 261 * error was handled or not; used to handle truly unanticipated 262 * errors from within a {@link ScheduledThreadPoolExecutor}; may be 263 * {@code null} 264 * 265 * @param initialCapacity the initial capacity of the internal data 266 * structure used to house this {@link EventQueueCollection}'s 267 * {@link EventQueue}s; must be an integer greater than {@code 0} 268 * 269 * @param loadFactor the load factor of the internal data structure 270 * used to house this {@link EventQueueCollection}'s {@link 271 * EventQueue}s; must be a positive number between {@code 0} and 272 * {@code 1} 273 */ 274 public EventQueueCollection(final Map<?, ? extends T> knownObjects, 275 final Function<? super Throwable, Boolean> errorHandler, 276 final int initialCapacity, 277 final float loadFactor) { 278 super(); 279 final String cn = this.getClass().getName(); 280 final String mn = "<init>"; 281 282 this.logger = this.createLogger(); 283 if (logger == null) { 284 throw new IllegalStateException(); 285 } 286 if (this.logger.isLoggable(Level.FINER)) { 287 final String knownObjectsString; 288 if (knownObjects == null) { 289 knownObjectsString = null; 290 } else { 291 synchronized (knownObjects) { 292 knownObjectsString = knownObjects.toString(); 293 } 294 } 295 this.logger.entering(cn, mn, new Object[] { knownObjectsString, Integer.valueOf(initialCapacity), Float.valueOf(loadFactor) }); 296 } 297 298 this.propertyChangeSupport = new PropertyChangeSupport(this); 299 this.eventQueueMap = new LinkedHashMap<>(initialCapacity, loadFactor); 300 this.knownObjects = knownObjects; 301 if (errorHandler == null) { 302 this.errorHandler = t -> { 303 if (this.logger.isLoggable(Level.SEVERE)) { 304 this.logger.logp(Level.SEVERE, this.getClass().getName(), "<eventQueueConsumptionTask>", t.getMessage(), t); 305 } 306 return true; 307 }; 308 } else { 309 this.errorHandler = errorHandler; 310 } 311 312 if (this.logger.isLoggable(Level.FINER)) { 313 this.logger.exiting(cn, mn); 314 } 315 } 316 317 318 /* 319 * Instance methods. 320 */ 321 322 /** 323 * Returns a {@link Logger} for use by this {@link 324 * EventQueueCollection}. 325 * 326 * <p>This method never returns {@code null}.</p> 327 * 328 * <p>Overrides of this method must not return {@code null}.</p> 329 * 330 * @return a non-{@code null} {@link Logger} for use by this {@link 331 * EventQueueCollection} 332 */ 333 protected Logger createLogger() { 334 return Logger.getLogger(this.getClass().getName()); 335 } 336 337 private final Map<?, ? extends T> getKnownObjects() { 338 return this.knownObjects; 339 } 340 341 /** 342 * Returns {@code true} if this {@link EventQueueCollection} is empty. 343 * 344 * @return {@code true} if this {@link EventQueueCollection} is 345 * empty; {@code false} otherwise 346 */ 347 private synchronized final boolean isEmpty() { 348 return this.eventQueueMap.isEmpty(); 349 } 350 351 /** 352 * Returns {@code true} if this {@link EventQueueCollection} has 353 * been populated via a call to {@link #add(Object, AbstractEvent.Type, 354 * HasMetadata)} at some point, and if there are no {@link 355 * EventQueue}s remaining to be {@linkplain #start(Consumer) 356 * removed}. 357 * 358 * <p>This is a <a 359 * href="https://docs.oracle.com/javase/tutorial/javabeans/writing/properties.html#bound">bound 360 * property</a>.</p> 361 * 362 * @return {@code true} if this {@link EventQueueCollection} has 363 * been populated via a call to {@link #add(Object, AbstractEvent.Type, 364 * HasMetadata)} at some point, and if there are no {@link 365 * EventQueue}s remaining to be {@linkplain #start(Consumer) 366 * removed}; {@code false} otherwise 367 * 368 * @see #replace(Collection, Object) 369 * 370 * @see #add(Object, AbstractEvent.Type, HasMetadata) 371 * 372 * @see #synchronize() 373 */ 374 public synchronized final boolean isSynchronized() { 375 return this.populated && this.initialPopulationCount == 0; 376 } 377 378 /** 379 * <strong>Synchronizes on the {@code knownObjects} object</strong> 380 * {@linkplain #EventQueueCollection(Map, int, float) supplied at 381 * construction time}, if there is one, and, for every Kubernetes 382 * resource found within at the time of this call, adds a {@link 383 * SynchronizationEvent} for it with an {@link AbstractEvent.Type} 384 * of {@link AbstractEvent.Type#MODIFICATION}. 385 */ 386 @Override 387 public final void synchronize() { 388 final String cn = this.getClass().getName(); 389 final String mn = "synchronize"; 390 if (this.logger.isLoggable(Level.FINER)) { 391 this.logger.entering(cn, mn); 392 } 393 synchronized (this) { 394 final Map<?, ? extends T> knownObjects = this.getKnownObjects(); 395 if (knownObjects != null) { 396 synchronized (knownObjects) { 397 if (!knownObjects.isEmpty()) { 398 final Collection<? extends T> values = knownObjects.values(); 399 if (values != null && !values.isEmpty()) { 400 for (final T knownObject : values) { 401 if (knownObject != null) { 402 403 // We follow the Go code in that we use *our* key 404 // extraction logic, rather than relying on the 405 // known key in the knownObjects map. See 406 // https://github.com/kubernetes/client-go/blob/37c3c02ec96533daec0dbda1f39a6b1d68505c79/tools/cache/delta_fifo.go#L567. 407 // I'm not sure this is significant as they should 408 // evaluate to the same thing, but there may be an 409 // edge case I'm not thinking of. 410 final Object key = this.getKey(knownObject); 411 412 if (key != null) { 413 final EventQueue<T> eventQueue = this.eventQueueMap.get(key); 414 if (eventQueue == null || eventQueue.isEmpty()) { 415 // There was an object in our knownObjects map 416 // somehow, but not in one of the queues we 417 // manage. Make sure others know about it. 418 419 // We make a SynchronizationEvent of type 420 // MODIFICATION. shared_informer.go checks in 421 // its HandleDeltas function to see if oldObj 422 // exists; if so, it's a modification 423 // (https://github.com/kubernetes/client-go/blob/37c3c02ec96533daec0dbda1f39a6b1d68505c79/tools/cache/shared_informer.go#L354-L358). 424 // Here we take action *only if* the equivalent 425 // of oldObj exists, therefore this is a 426 // SynchronizationEvent of type MODIFICATION, 427 // not ADDITION. 428 this.addSynchronizationEvent(this, AbstractEvent.Type.MODIFICATION, knownObject); 429 } 430 } 431 432 } 433 } 434 } 435 } 436 } 437 } 438 } 439 if (this.logger.isLoggable(Level.FINER)) { 440 this.logger.exiting(cn, mn); 441 } 442 } 443 444 /** 445 * At a high level, fully replaces the internal state of this {@link 446 * EventQueueCollection} to reflect only the Kubernetes resources 447 * contained in the supplied {@link Collection}. 448 * 449 * <p>{@link SynchronizationEvent}s of type {@link 450 * AbstractEvent.Type#ADDITION} are added for every resource present 451 * in the {@code incomingResources} parameter.</p> 452 * 453 * <p>{@link Event}s of type {@link AbstractEvent.Type#DELETION} are 454 * added when this {@link EventQueueCollection} can determine that 455 * the lack of a resource's presence in the {@code 456 * incomingResources} parameter indicates that it has been deleted 457 * from Kubernetes.</p> 458 * 459 * <p>(No {@link Event}s of type {@link 460 * AbstractEvent.Type#MODIFICATION} are added by this method.)</p> 461 * 462 * <p>{@link EventQueue}s managed by this {@link 463 * EventQueueCollection} that have not yet {@linkplain 464 * #start(Consumer) been processed} are not removed by this 465 * operation.</p> 466 * 467 * <p><strong>This method synchronizes on the supplied {@code 468 * incomingResources} {@link Collection}</strong> while iterating 469 * over it.</p> 470 * 471 * @param incomingResources the {@link Collection} of Kubernetes 472 * resources with which to replace this {@link 473 * EventQueueCollection}'s internal state; <strong>will be 474 * synchronized on</strong>; may be {@code null} or {@linkplain 475 * Collection#isEmpty() empty}, which will be taken as an indication 476 * that this {@link EventQueueCollection} should effectively be 477 * emptied 478 * 479 * @param resourceVersion the version of the Kubernetes list 480 * resource that contained the incoming resources; currently 481 * ignored but reserved for future use; may be {@code null} 482 * 483 * @exception IllegalStateException if the {@link 484 * #createEvent(Object, AbstractEvent.Type, HasMetadata)} method returns 485 * {@code null} for any reason 486 * 487 * @see SynchronizationEvent 488 * 489 * @see #createEvent(Object, AbstractEvent.Type, HasMetadata) 490 */ 491 @Override 492 public synchronized final void replace(final Collection<? extends T> incomingResources, final Object resourceVersion) { 493 final String cn = this.getClass().getName(); 494 final String mn = "replace"; 495 if (this.logger.isLoggable(Level.FINER)) { 496 final String incomingResourcesString; 497 if (incomingResources == null) { 498 incomingResourcesString = null; 499 } else { 500 synchronized (incomingResources) { 501 incomingResourcesString = incomingResources.toString(); 502 } 503 } 504 this.logger.entering(cn, mn, new Object[] { incomingResourcesString, resourceVersion }); 505 } 506 507 // Keep track of our old synchronization state; we'll fire a 508 // JavaBeans bound property at the bottom of this method. 509 final boolean oldSynchronized = this.isSynchronized(); 510 511 final int size; 512 final Set<Object> replacementKeys; 513 514 // Process all the additions first. 515 if (incomingResources == null) { 516 size = 0; 517 replacementKeys = Collections.emptySet(); 518 } else { 519 synchronized (incomingResources) { 520 if (incomingResources.isEmpty()) { 521 size = 0; 522 replacementKeys = Collections.emptySet(); 523 } else { 524 size = incomingResources.size(); 525 assert size > 0; 526 replacementKeys = new HashSet<>(); 527 for (final T resource : incomingResources) { 528 if (resource != null) { 529 replacementKeys.add(this.getKey(resource)); 530 // The final boolean parameter indicates that we don't 531 // want our populated status to be set by this call. We 532 // do this at the bottom of this method ourselves. 533 this.addSynchronizationEvent(this, AbstractEvent.Type.ADDITION, resource, false); 534 } 535 } 536 } 537 } 538 } 539 540 // Now process deletions. 541 542 int queuedDeletions = 0; 543 544 final Map<?, ? extends T> knownObjects = this.getKnownObjects(); 545 if (knownObjects == null) { 546 547 // No one is keeping track of known objects. The best we can do 548 // is: if there's an EventQueue currently being processed, then 549 // if we get here the object it concerns itself with in 550 // Kubernetes is gone. We need to synthesize a deletion event 551 // to say, effectively, "The object was deleted but we don't 552 // know what its prior state was". 553 554 for (final EventQueue<T> eventQueue : this.eventQueueMap.values()) { 555 assert eventQueue != null; 556 557 final Object key; 558 final AbstractEvent<? extends T> newestEvent; 559 synchronized (eventQueue) { 560 561 if (eventQueue.isEmpty()) { 562 key = null; 563 newestEvent = null; 564 throw new IllegalStateException("eventQueue.isEmpty(): " + eventQueue); 565 } 566 567 key = eventQueue.getKey(); 568 if (key == null) { 569 throw new IllegalStateException(); 570 } 571 572 if (replacementKeys.contains(key)) { 573 newestEvent = null; 574 } else { 575 // We have an EventQueue indexed under a key that 576 // identifies a resource that no longer exists in 577 // Kubernetes. Inform any consumers via a deletion event 578 // that this object was removed at some point from 579 // Kubernetes. The state of the object in question is 580 // indeterminate. 581 newestEvent = eventQueue.getLast(); 582 assert newestEvent != null; 583 } 584 585 } 586 587 if (newestEvent != null) { 588 assert key != null; 589 // We grab the last event in the queue in question and get 590 // its resource; this will serve as the state of the 591 // Kubernetes resource in question the last time we knew 592 // about it. This state is not necessarily, but could be, 593 // the true actual last state of the resource in question. 594 // The point is, the true state of the object when it was 595 // deleted is unknown. We build a new event to reflect all 596 // this. 597 // 598 // Astute readers will realize that this could result in two 599 // DELETION events enqueued, back to back, with identical 600 // payloads. See the deduplicate() method in EventQueue, 601 // which takes care of this situation. 602 final T resourceToBeDeleted = newestEvent.getResource(); 603 assert resourceToBeDeleted != null; 604 final Event<T> event = this.createEvent(this, AbstractEvent.Type.DELETION, resourceToBeDeleted); 605 if (event == null) { 606 throw new IllegalStateException("createEvent() == null"); 607 } 608 event.setKey(key); 609 // The final boolean parameter indicates that we don't want 610 // our populated status to be set by this call. We do this 611 // at the bottom of this method ourselves. 612 this.add(event, false); 613 } 614 } 615 616 } else { 617 assert knownObjects != null; 618 619 // We're keeping track of known objects, so fire deletion events 620 // if objects were removed from Kubernetes. 621 622 synchronized (knownObjects) { 623 624 if (!knownObjects.isEmpty()) { 625 final Collection<? extends Entry<?, ? extends T>> entrySet = knownObjects.entrySet(); 626 if (entrySet != null && !entrySet.isEmpty()) { 627 for (final Entry<?, ? extends T> entry : entrySet) { 628 if (entry != null) { 629 final Object knownKey = entry.getKey(); 630 if (!replacementKeys.contains(knownKey)) { 631 final Event<T> event = this.createEvent(this, AbstractEvent.Type.DELETION, entry.getValue()); 632 if (event == null) { 633 throw new IllegalStateException("createEvent() == null"); 634 } 635 event.setKey(knownKey); 636 // The final boolean parameter (false) indicates 637 // that we don't want our populated status to be set 638 // by this call. We do this at the bottom of this 639 // method ourselves. 640 this.add(event, false); 641 queuedDeletions++; 642 } 643 } 644 } 645 } 646 } 647 648 } 649 650 } 651 652 if (!this.populated) { 653 this.populated = true; 654 assert size >= 0; 655 assert queuedDeletions >= 0; 656 final int oldInitialPopulationCount = this.initialPopulationCount; 657 this.initialPopulationCount = size + queuedDeletions; 658 this.firePropertyChange("populated", false, true); 659 this.firePropertyChange("initialPopulationCount", oldInitialPopulationCount, this.initialPopulationCount); 660 if (this.initialPopulationCount == 0) { 661 // We know that we are now synchronized because the definition 662 // of being synchronized is to have an initialPopulationCount 663 // of 0 and a populated status of true. 664 assert this.isSynchronized(); 665 this.firePropertyChange("synchronized", oldSynchronized, true); 666 } 667 } 668 669 if (this.logger.isLoggable(Level.FINER)) { 670 this.logger.exiting(cn, mn); 671 } 672 } 673 674 /** 675 * Returns an {@link Object} which will be used as the key that will 676 * uniquely identify the supplied {@code resource} to this {@link 677 * EventQueueCollection}. 678 * 679 * <p>This method may return {@code null}, but only if {@code 680 * resource} is {@code null} or is constructed in such a way that 681 * its {@link HasMetadata#getMetadata()} method returns {@code 682 * null}.</p> 683 * 684 * <p>Overrides of this method may return {@code null}, but only if 685 * {@code resource} is {@code null}. 686 * 687 * <p>The default implementation of this method returns the return 688 * value of the {@link HasMetadatas#getKey(HasMetadata)} method.</p> 689 * 690 * @param resource a {@link HasMetadata} for which a key should be 691 * returned; may be {@code null} in which case {@code null} may be 692 * returned 693 * 694 * @return a non-{@code null} key for the supplied {@code resource}; 695 * or {@code null} if {@code resource} is {@code null} 696 * 697 * @see HasMetadatas#getKey(HasMetadata) 698 */ 699 protected Object getKey(final T resource) { 700 final String cn = this.getClass().getName(); 701 final String mn = "getKey"; 702 if (this.logger.isLoggable(Level.FINER)) { 703 this.logger.entering(cn, mn, resource); 704 } 705 final Object returnValue = HasMetadatas.getKey(resource); 706 if (this.logger.isLoggable(Level.FINER)) { 707 this.logger.exiting(cn, mn, returnValue); 708 } 709 return returnValue; 710 } 711 712 /** 713 * Creates a new {@link EventQueue} suitable for holding {@link 714 * Event}s {@linkplain Event#getKey() matching} the supplied {@code 715 * key}. 716 * 717 * <p>This method never returns {@code null}.</p> 718 * 719 * <p>Overrides of this method must not return {@code null}.</p> 720 * 721 * @param key the key {@linkplain EventQueue#getKey() for the new 722 * <code>EventQueue</code>}; must not be {@code null} 723 * 724 * @return the new {@link EventQueue}; never {@code null} 725 * 726 * @exception NullPointerException if {@code key} is {@code null} 727 * 728 * @see EventQueue#EventQueue(Object) 729 */ 730 protected EventQueue<T> createEventQueue(final Object key) { 731 final String cn = this.getClass().getName(); 732 final String mn = "createEventQueue"; 733 if (this.logger.isLoggable(Level.FINER)) { 734 this.logger.entering(cn, mn, key); 735 } 736 final EventQueue<T> returnValue = new EventQueue<T>(key); 737 if (this.logger.isLoggable(Level.FINER)) { 738 this.logger.exiting(cn, mn, returnValue); 739 } 740 return returnValue; 741 } 742 743 /** 744 * Starts a new {@link Thread} that, until {@link #close()} is 745 * called, removes {@link EventQueue}s from this {@link 746 * EventQueueCollection} and supplies them to the supplied {@link 747 * Consumer}, and returns a {@link Future} representing this task. 748 * 749 * <p>This method never returns {@code null}.</p> 750 * 751 * <p>If this method has been called before, then the existing 752 * {@link Future} representing the task that was scheduled is 753 * returned instead.</p> 754 * 755 * <p>Invoking this method does not block the calling {@link 756 * Thread}.</p> 757 * 758 * <p>The non-{@code null} {@link Future} that is returned will not 759 * return {@code true} from its {@linkplain Future#isDone()} method 760 * unless {@linkplain Future#cancel(boolean) it has been cancelled} 761 * or an exception has occurred. That is, the task represented by 762 * the returned {@link Future} is never-ending under normal 763 * circumstances.</p> 764 * 765 * @param eventQueueConsumer the {@link Consumer} that will process 766 * each {@link EventQueue} as it becomes ready; must not be {@code 767 * null}. If this {@link Consumer}'s {@link 768 * Consumer#accept(Object)} method throws a {@link 769 * TransientException}, then the {@link EventQueue} that was 770 * supplied to it will be re-enqueued and at some point in the 771 * future this {@link Consumer} will have a chance to re-process it. 772 * 773 * @return a {@link Future} representing the task that is feeding 774 * {@link EventQueue}s to the supplied {@link Consumer}; never 775 * {@code null}; suitable only for {@linkplain 776 * Future#cancel(boolean) cancellation} 777 * 778 * @exception NullPointerException if {@code eventQueueConsumer} is 779 * {@code null} 780 */ 781 @NonBlocking 782 public final Future<?> start(final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) { 783 final String cn = this.getClass().getName(); 784 final String mn = "start"; 785 if (this.logger.isLoggable(Level.FINER)) { 786 this.logger.entering(cn, mn, eventQueueConsumer); 787 } 788 789 Objects.requireNonNull(eventQueueConsumer); 790 791 final Future<?> returnValue; 792 793 synchronized (this) { 794 795 if (this.consumerExecutor == null) { 796 this.consumerExecutor = createScheduledThreadPoolExecutor(); 797 assert this.consumerExecutor != null : "createScheduledThreadPoolExecutor() == null"; 798 } 799 800 if (this.eventQueueConsumptionTask == null) { 801 // This task is scheduled, rather than simply executed, so 802 // that if it terminates exceptionally it will be restarted 803 // after one second. The task could have been written to do 804 // this "scheduling" itself (i.e. it could restart a loop 805 // itself in the presence of an exception) but we follow the 806 // Go code idiom here. 807 this.eventQueueConsumptionTask = 808 this.consumerExecutor.scheduleWithFixedDelay(this.createEventQueueConsumptionTask(eventQueueConsumer), 809 0L, 810 1L, 811 TimeUnit.SECONDS); 812 } 813 assert this.eventQueueConsumptionTask != null; 814 815 returnValue = this.eventQueueConsumptionTask; 816 817 } 818 819 if (this.logger.isLoggable(Level.FINER)) { 820 this.logger.exiting(cn, mn, returnValue); 821 } 822 return returnValue; 823 } 824 825 private static final ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor() { 826 final ScheduledThreadPoolExecutor returnValue = new ScheduledThreadPoolExecutor(1); 827 returnValue.setRemoveOnCancelPolicy(true); 828 return returnValue; 829 } 830 831 /** 832 * Semantically closes this {@link EventQueueCollection} by 833 * detaching any {@link Consumer} previously attached via the {@link 834 * #start(Consumer)} method. {@linkplain #add(Object, AbstractEvent.Type, 835 * HasMetadata) Additions}, {@linkplain #replace(Collection, Object) 836 * replacements} and {@linkplain #synchronize() synchronizations} 837 * are still possible, but there won't be anything consuming any 838 * events generated by or supplied to these operations. 839 * 840 * <p>A closed {@link EventQueueCollection} may be {@linkplain 841 * #start(Consumer) started} again.</p> 842 * 843 * @see #start(Consumer) 844 */ 845 @Override 846 public final void close() { 847 final String cn = this.getClass().getName(); 848 final String mn = "close"; 849 if (this.logger.isLoggable(Level.FINER)) { 850 this.logger.entering(cn, mn); 851 } 852 853 // (closing is a volatile field.) 854 this.closing = true; 855 856 try { 857 858 final ScheduledExecutorService consumerExecutor; 859 synchronized (this) { 860 // We keep this synchronized block as small as we can since in 861 // other areas in the code there are threads holding this 862 // object's monitor. 863 // 864 // Cancel the consumer pump task and begin the lengthy process 865 // of shutting down the pump itself. 866 if (this.eventQueueConsumptionTask != null) { 867 this.eventQueueConsumptionTask.cancel(true); 868 this.eventQueueConsumptionTask = null; 869 } 870 consumerExecutor = this.consumerExecutor; 871 this.consumerExecutor = null; 872 } 873 874 if (consumerExecutor != null) { 875 876 // Stop accepting new tasks. 877 consumerExecutor.shutdown(); 878 879 // Cancel all running tasks firmly (there shouldn't be any, 880 // but it's the right thing to do). 881 consumerExecutor.shutdownNow(); 882 883 try { 884 // Wait for termination to complete normally. This should 885 // complete instantly because there aren't any running 886 // tasks. 887 if (!consumerExecutor.awaitTermination(60, TimeUnit.SECONDS) && this.logger.isLoggable(Level.WARNING)) { 888 this.logger.logp(Level.WARNING, cn, mn, "this.consumerExecutor.awaitTermination() failed"); 889 } 890 } catch (final InterruptedException interruptedException) { 891 Thread.currentThread().interrupt(); 892 } 893 } 894 895 } finally { 896 this.closing = false; 897 } 898 899 assert this.eventQueueConsumptionTask == null; 900 assert this.consumerExecutor == null; 901 902 if (this.logger.isLoggable(Level.FINER)) { 903 this.logger.exiting(cn, mn); 904 } 905 } 906 907 /** 908 * Creates and returns a {@link Runnable} that will serve as the 909 * "body" of a never-ending task that {@linkplain #get() removes the 910 * eldest <code>EventQueue</code>} from this {@link 911 * EventQueueCollection} and {@linkplain Consumer#accept(Object) 912 * supplies it to the supplied <code>eventQueueConsumer</code>}. 913 * 914 * <p>This method never returns {@code null}.</p> 915 * 916 * @param eventQueueConsumer the {@link Consumer} that will act upon 917 * the eldest {@link EventQueue} in this {@link 918 * EventQueueCollection}; must not be {@code null} 919 * 920 * @return a new {@link Runnable}; never {@code null} 921 * 922 * @exception NullPointerException if {@code eventQueueConsumer} is 923 * {@code null} 924 */ 925 private final Runnable createEventQueueConsumptionTask(final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) { 926 Objects.requireNonNull(eventQueueConsumer); 927 final Runnable returnValue = () -> { 928 // This Runnable loosely models the processLoop() function in 929 // https://github.com/kubernetes/kubernetes/blob/v1.9.0/staging/src/k8s.io/client-go/tools/cache/controller.go#L139-L161. 930 try { 931 while (!Thread.currentThread().isInterrupted()) { 932 // Note that get() *removes* an EventQueue from this 933 // EventQueueCollection, blocking until one is available. 934 @Blocking 935 final EventQueue<T> eventQueue = this.get(); 936 if (eventQueue != null) { 937 Throwable unhandledThrowable = null; 938 synchronized (eventQueue) { 939 try { 940 eventQueueConsumer.accept(eventQueue); 941 } catch (final TransientException transientException) { 942 this.eventQueueMap.putIfAbsent(eventQueue.getKey(), eventQueue); 943 } catch (final Throwable e) { 944 unhandledThrowable = e; 945 } 946 } 947 if (unhandledThrowable != null && !this.errorHandler.apply(unhandledThrowable)) { 948 if (unhandledThrowable instanceof RuntimeException) { 949 throw (RuntimeException)unhandledThrowable; 950 } else if (unhandledThrowable instanceof Error) { 951 throw (Error)unhandledThrowable; 952 } else { 953 assert !(unhandledThrowable instanceof Exception) : "unhandledThrowable instanceof Exception: " + unhandledThrowable; 954 } 955 } 956 } 957 } 958 } catch (final RuntimeException | Error unhandledRuntimeExceptionOrError) { 959 // This RuntimeException or Error was almost certainly 960 // supplied to our error handler, who had a chance to process 961 // it, but who rejected it for one reason or another. As a 962 // last attempt to make sure it is noticed, we don't just 963 // throw this RuntimeException or Error but also log it 964 // because it is very easy for a Runnable submitted to an 965 // ExecutorService to have its RuntimeExceptions and Errors 966 // disappear into the ether. Yes, this is a case where we're 967 // both logging and throwing, but the redundancy in this case 968 // is worth it: if an error handler failed, then things are 969 // really bad. 970 if (logger.isLoggable(Level.SEVERE)) { 971 logger.logp(Level.SEVERE, 972 this.getClass().getName(), 973 "<eventQueueConsumptionTask>", 974 unhandledRuntimeExceptionOrError.getMessage(), 975 unhandledRuntimeExceptionOrError); 976 } 977 throw unhandledRuntimeExceptionOrError; 978 } 979 }; 980 return returnValue; 981 } 982 983 /** 984 * Implements the {@link Supplier#get()} contract by 985 * <strong>removing</strong> and returning an {@link EventQueue} if 986 * one is available, <strong>blocking if one is not</strong> and 987 * returning {@code null} only if the {@linkplain Thread#interrupt() 988 * current thread is interrupted} or this {@linkplain 989 * EventQueueCollection#close() <code>EventQueueCollection</code> is 990 * closing}. 991 * 992 * <h2>Design Notes</h2> 993 * 994 * <p>This method calls an internal method that models the <a 995 * href="https://github.com/kubernetes/client-go/blob/03bfb9bdcfe5482795b999f39ca3ed9ad42ce5bb/tools/cache/delta_fifo.go#L407-L453">{@code 996 * Pop} function in {@code delta_fifo.go}</a>.</p> 997 * 998 * @return an {@link EventQueue}, or {@code null} 999 */ 1000 @Blocking 1001 @Override 1002 public final EventQueue<T> get() { 1003 final String cn = this.getClass().getName(); 1004 final String mn = "get"; 1005 if (this.logger.isLoggable(Level.FINER)) { 1006 this.logger.entering(cn, mn); 1007 } 1008 1009 EventQueue<T> returnValue = null; 1010 try { 1011 returnValue = this.take(); 1012 } catch (final InterruptedException interruptedException) { 1013 Thread.currentThread().interrupt(); 1014 returnValue = null; 1015 } 1016 1017 if (this.logger.isLoggable(Level.FINER)) { 1018 this.logger.exiting(cn, mn, returnValue); 1019 } 1020 return returnValue; 1021 } 1022 1023 /** 1024 * Blocks until there is an {@link EventQueue} in this {@link 1025 * EventQueueCollection} available for removal whereupon it is 1026 * removed and returned. 1027 * 1028 * <p>This method may return {@code null} if it is invoked while 1029 * this {@linkplain #close() <code>EventQueueCollection</code> is 1030 * closing}.</p> 1031 * 1032 * <h2>Design Notes</h2> 1033 * 1034 * <p>This method models the <a 1035 * href="https://github.com/kubernetes/client-go/blob/03bfb9bdcfe5482795b999f39ca3ed9ad42ce5bb/tools/cache/delta_fifo.go#L407-L453">{@code 1036 * Pop} function in {@code delta_fifo.go}</a>.</p> 1037 * 1038 * <p>This method is called internally only by the {@link #get()} 1039 * method, which is its {@code public}-facing counterpart. {@link 1040 * #get()} cannot, by contract, throw an {@link 1041 * InterruptedException}; hence this method.</p> 1042 * 1043 * @return the {@link EventQueue} that was removed (taken), or, in 1044 * exceptional circumstances, {@code null} 1045 * 1046 * @exception InterruptedException if the {@link 1047 * Thread#currentThread() current thread} was interrupted 1048 */ 1049 @Blocking 1050 private final EventQueue<T> take() throws InterruptedException { 1051 final String cn = this.getClass().getName(); 1052 final String mn = "take"; 1053 if (this.logger.isLoggable(Level.FINER)) { 1054 this.logger.entering(cn, mn); 1055 } 1056 1057 final EventQueue<T> returnValue; 1058 synchronized (this) { 1059 while (this.isEmpty() && !this.closing) { 1060 this.wait(); // blocks; see add() for corresponding notifyAll() 1061 } 1062 1063 // We can assert this because add() is the method that calls 1064 // notifyAll(), and add() sets populated to true before it calls 1065 // notifyAll(). The notifyAll() call is the only way that the 1066 // blocking wait() call above will exit other than interruption, 1067 // and if it exited via interruption, an InterruptedException 1068 // would have been thrown and we'd never get here. 1069 assert this.populated : "this.populated == false"; 1070 1071 if (this.isEmpty()) { 1072 assert this.closing : "this.isEmpty() && !this.closing"; 1073 returnValue = null; 1074 } else { 1075 1076 // Pop the first EventQueue off and return it. 1077 final Iterator<EventQueue<T>> iterator = this.eventQueueMap.values().iterator(); 1078 assert iterator != null; 1079 assert iterator.hasNext(); 1080 returnValue = iterator.next(); 1081 assert returnValue != null; 1082 iterator.remove(); 1083 1084 if (this.initialPopulationCount > 0) { 1085 // We know we're not populated and our 1086 // initialPopulationCount is not 0, so therefore we are not 1087 // synchronized. See isSynchronized(). 1088 assert !this.isSynchronized(); 1089 final int oldInitialPopulationCount = this.initialPopulationCount; 1090 this.initialPopulationCount--; 1091 this.firePropertyChange("initialPopulationCount", oldInitialPopulationCount, this.initialPopulationCount); 1092 this.firePropertyChange("synchronized", false, this.isSynchronized()); 1093 } 1094 this.firePropertyChange("empty", false, this.isEmpty()); 1095 } 1096 } 1097 1098 if (this.logger.isLoggable(Level.FINER)) { 1099 final String eventQueueString; 1100 synchronized (returnValue) { 1101 eventQueueString = returnValue.toString(); 1102 } 1103 this.logger.exiting(cn, mn, eventQueueString); 1104 } 1105 return returnValue; 1106 } 1107 1108 /** 1109 * Creates an {@link Event} using the supplied raw materials and 1110 * returns it. 1111 * 1112 * <p>This method never returns {@code null}.</p> 1113 * 1114 * <p>Implementations of this method must not return {@code 1115 * null}.</p> 1116 * 1117 * <p>Implementations of this method must return a new {@link Event} 1118 * with every invocation.</p> 1119 * 1120 * @param source the {@linkplain Event#getSource() source} of the 1121 * {@link Event} that will be created; must not be null 1122 * 1123 * @param eventType the {@linkplain Event#getType() type} of {@link 1124 * Event} that will be created; must not be {@code null} 1125 * 1126 * @param resource the {@linkplain Event#getResource() resource} of 1127 * the {@link Event} that will be created; must not be 1128 * {@code null} 1129 * 1130 * @return the created {@link Event}; never {@code null} 1131 * 1132 * @exception NullPointerException if any parameter is {@code null} 1133 */ 1134 protected Event<T> createEvent(final Object source, final AbstractEvent.Type eventType, final T resource) { 1135 final String cn = this.getClass().getName(); 1136 final String mn = "createEvent"; 1137 if (this.logger.isLoggable(Level.FINER)) { 1138 this.logger.entering(cn, mn, new Object[] { source, eventType, resource }); 1139 } 1140 1141 Objects.requireNonNull(source); 1142 Objects.requireNonNull(eventType); 1143 Objects.requireNonNull(resource); 1144 final Event<T> returnValue = new Event<>(source, eventType, null, resource); 1145 1146 if (this.logger.isLoggable(Level.FINER)) { 1147 this.logger.exiting(cn, mn, returnValue); 1148 } 1149 return returnValue; 1150 } 1151 1152 /** 1153 * Creates a {@link SynchronizationEvent} using the supplied raw 1154 * materials and returns it. 1155 * 1156 * <p>This method never returns {@code null}.</p> 1157 * 1158 * <p>Implementations of this method must not return {@code 1159 * null}.</p> 1160 * 1161 * <p>Implementations of this method must return a new {@link 1162 * SynchronizationEvent} with every invocation.</p> 1163 * 1164 * @param source the {@linkplain SynchronizationEvent#getSource() 1165 * source} of the {@link SynchronizationEvent} that will be created; 1166 * must not be null 1167 * 1168 * @param eventType the {@linkplain Event#getType() type} of {@link 1169 * SynchronizationEvent} that will be created; must not be {@code 1170 * null} 1171 * 1172 * @param resource the {@linkplain Event#getResource() resource} of 1173 * the {@link SynchronizationEvent} that will be created; must not 1174 * be {@code null} 1175 * 1176 * @return the created {@link SynchronizationEvent}; never {@code 1177 * null} 1178 * 1179 * @exception NullPointerException if any parameter is {@code null} 1180 */ 1181 protected SynchronizationEvent<T> createSynchronizationEvent(final Object source, final AbstractEvent.Type eventType, final T resource) { 1182 final String cn = this.getClass().getName(); 1183 final String mn = "createSynchronizationEvent"; 1184 if (this.logger.isLoggable(Level.FINER)) { 1185 this.logger.entering(cn, mn, new Object[] { source, eventType, resource }); 1186 } 1187 1188 Objects.requireNonNull(source); 1189 Objects.requireNonNull(eventType); 1190 Objects.requireNonNull(resource); 1191 final SynchronizationEvent<T> returnValue = new SynchronizationEvent<>(source, eventType, null, resource); 1192 1193 if (this.logger.isLoggable(Level.FINER)) { 1194 this.logger.exiting(cn, mn, returnValue); 1195 } 1196 return returnValue; 1197 } 1198 1199 private final SynchronizationEvent<T> addSynchronizationEvent(final Object source, 1200 final AbstractEvent.Type eventType, 1201 final T resource) { 1202 return this.addSynchronizationEvent(source, eventType, resource, true); 1203 } 1204 1205 private final SynchronizationEvent<T> addSynchronizationEvent(final Object source, 1206 final AbstractEvent.Type eventType, 1207 final T resource, 1208 final boolean populate) { 1209 final String cn = this.getClass().getName(); 1210 final String mn = "addSynchronizationEvent"; 1211 if (this.logger.isLoggable(Level.FINER)) { 1212 this.logger.entering(cn, mn, new Object[] { source, eventType, resource, Boolean.valueOf(populate) }); 1213 } 1214 1215 Objects.requireNonNull(source); 1216 Objects.requireNonNull(eventType); 1217 Objects.requireNonNull(resource); 1218 1219 if (!(eventType.equals(AbstractEvent.Type.ADDITION) || eventType.equals(AbstractEvent.Type.MODIFICATION))) { 1220 throw new IllegalArgumentException("Illegal eventType: " + eventType); 1221 } 1222 1223 final SynchronizationEvent<T> event = this.createSynchronizationEvent(source, eventType, resource); 1224 if (event == null) { 1225 throw new IllegalStateException("createSynchronizationEvent() == null"); 1226 } 1227 final SynchronizationEvent<T> returnValue = this.add(event, populate); 1228 1229 if (this.logger.isLoggable(Level.FINER)) { 1230 this.logger.exiting(cn, mn, returnValue); 1231 } 1232 return returnValue; 1233 } 1234 1235 /** 1236 * Adds a new {@link Event} constructed out of the parameters 1237 * supplied to this method to this {@link EventQueueCollection} and 1238 * returns the {@link Event} that was added. 1239 * 1240 * <p>This method may return {@code null}.</p> 1241 * 1242 * <p>This implementation {@linkplain #createEventQueue(Object) 1243 * creates an <code>EventQueue</code>} if necessary for the {@link 1244 * Event} that will be added, and then adds the new {@link Event} to 1245 * the queue.</p> 1246 * 1247 * @param source the {@linkplain Event#getSource() source} of the 1248 * {@link Event} that will be created and added; must not be null 1249 * 1250 * @param eventType the {@linkplain Event#getType() type} of {@link 1251 * Event} that will be created and added; must not be {@code null} 1252 * 1253 * @param resource the {@linkplain Event#getResource() resource} of 1254 * the {@link Event} that will be created and added; must not be 1255 * {@code null} 1256 * 1257 * @return the {@link Event} that was created and added, or {@code 1258 * null} if no {@link Event} was actually added as a result of this 1259 * method's invocation 1260 * 1261 * @exception NullPointerException if any of the parameters is 1262 * {@code null} 1263 * 1264 * @see Event 1265 */ 1266 @Override 1267 public final Event<T> add(final Object source, final AbstractEvent.Type eventType, final T resource) { 1268 return this.add(source, eventType, resource, true); 1269 } 1270 1271 /** 1272 * Adds a new {@link Event} constructed out of the parameters 1273 * supplied to this method to this {@link EventQueueCollection} and 1274 * returns the {@link Event} that was added. 1275 * 1276 * <p>This method may return {@code null}.</p> 1277 * 1278 * <p>This implementation {@linkplain #createEventQueue(Object) 1279 * creates an <code>EventQueue</code>} if necessary for the {@link 1280 * Event} that will be added, and then adds the new {@link Event} to 1281 * the queue.</p> 1282 * 1283 * @param source the {@linkplain Event#getSource() source} of the 1284 * {@link Event} that will be created and added; must not be null 1285 * 1286 * @param eventType the {@linkplain Event#getType() type} of {@link 1287 * Event} that will be created and added; must not be {@code null} 1288 * 1289 * @param resource the {@linkplain Event#getResource() resource} of 1290 * the {@link Event} that will be created and added; must not be 1291 * {@code null} 1292 * 1293 * @param populate if {@code true} then this {@link 1294 * EventQueueCollection} will be internally marked as <em>initially 1295 * populated</em> 1296 * 1297 * @return the {@link Event} that was created and added, or {@code 1298 * null} if no {@link Event} was actually added as a result of this 1299 * method's invocation 1300 * 1301 * @exception NullPointerException if any of the parameters is 1302 * {@code null} 1303 * 1304 * @see Event 1305 */ 1306 private final Event<T> add(final Object source, 1307 final AbstractEvent.Type eventType, 1308 final T resource, 1309 final boolean populate) { 1310 final String cn = this.getClass().getName(); 1311 final String mn = "add"; 1312 if (this.logger.isLoggable(Level.FINER)) { 1313 this.logger.entering(cn, mn, new Object[] { source, eventType, resource, Boolean.valueOf(populate)}); 1314 } 1315 1316 final Event<T> event = this.createEvent(source, eventType, resource); 1317 if (event == null) { 1318 throw new IllegalStateException("createEvent() == null"); 1319 } 1320 final Event<T> returnValue = this.add(event, populate); 1321 1322 if (this.logger.isLoggable(Level.FINER)) { 1323 this.logger.exiting(cn, mn, returnValue); 1324 } 1325 return returnValue; 1326 } 1327 1328 /** 1329 * Adds the supplied {@link Event} to this {@link 1330 * EventQueueCollection} and returns the {@link Event} that was 1331 * added. 1332 * 1333 * <p>This method may return {@code null}.</p> 1334 * 1335 * <p>This implementation {@linkplain #createEventQueue(Object) 1336 * creates an <code>EventQueue</code>} if necessary for the {@link 1337 * Event} that will be added, and then adds the new {@link Event} to 1338 * the queue.</p> 1339 * 1340 * @param <E> an {@link AbstractEvent} type that is both consumed 1341 * and returned 1342 * 1343 * @param event the {@link Event} to add; must not be {@code null} 1344 * 1345 * @param populate if {@code true} then this {@link 1346 * EventQueueCollection} will be internally marked as <em>initially 1347 * populated</em> 1348 * 1349 * @return the {@link Event} that was created and added, or {@code 1350 * null} if no {@link Event} was actually added as a result of this 1351 * method's invocation 1352 * 1353 * @exception NullPointerException if any of the parameters is 1354 * {@code null} 1355 * 1356 * @see Event 1357 */ 1358 private final <E extends AbstractEvent<T>> E add(final E event, final boolean populate) { 1359 final String cn = this.getClass().getName(); 1360 final String mn = "add"; 1361 if (this.logger.isLoggable(Level.FINER)) { 1362 this.logger.entering(cn, mn, new Object[] { event, Boolean.valueOf(populate) }); 1363 } 1364 1365 if (this.closing) { 1366 throw new IllegalStateException(); 1367 } 1368 1369 Objects.requireNonNull(event); 1370 1371 final Object key = event.getKey(); 1372 if (key == null) { 1373 throw new IllegalArgumentException("event.getKey() == null"); 1374 } 1375 1376 E returnValue = null; 1377 1378 synchronized (this) { 1379 1380 if (populate && !this.populated) { 1381 this.populated = true; 1382 // TODO: too early? 1383 this.firePropertyChange("populated", false, true); 1384 } 1385 1386 EventQueue<T> eventQueue = this.eventQueueMap.get(key); 1387 1388 final boolean eventQueueExisted = eventQueue != null; 1389 1390 if (!eventQueueExisted) { 1391 eventQueue = this.createEventQueue(key); 1392 if (eventQueue == null) { 1393 throw new IllegalStateException("createEventQueue(key) == null: " + key); 1394 } 1395 } 1396 assert eventQueue != null; 1397 1398 final boolean eventAdded; 1399 final boolean eventQueueIsEmpty; 1400 synchronized (eventQueue) { 1401 eventAdded = eventQueue.addEvent(event); 1402 // Adding an event to an EventQueue can result in compression, 1403 // which may result in the EventQueue becoming empty as a 1404 // result of the add operation. 1405 eventQueueIsEmpty = eventQueue.isEmpty(); 1406 } 1407 1408 if (eventAdded) { 1409 returnValue = event; 1410 } 1411 1412 if (eventQueueIsEmpty) { 1413 // Compression might have emptied the queue, so an add could 1414 // result in an empty queue. We don't permit empty queues. 1415 if (eventQueueExisted) { 1416 returnValue = null; 1417 final boolean oldEmpty = this.isEmpty(); 1418 final Object oldEventQueue = this.eventQueueMap.remove(key); 1419 assert oldEventQueue != null; 1420 this.firePropertyChange("empty", oldEmpty, this.isEmpty()); 1421 } else { 1422 // Nothing to do; the queue we added the event to was 1423 // created here, and was never added to our internal eventQueueMap, so 1424 // we're done. 1425 } 1426 } else if (!eventQueueExisted) { 1427 // We created the EventQueue we just added to; now we need to 1428 // store it. 1429 final boolean oldEmpty = this.isEmpty(); 1430 final Object oldEventQueue = this.eventQueueMap.put(key, eventQueue); 1431 assert oldEventQueue == null; 1432 this.firePropertyChange("empty", oldEmpty, this.isEmpty()); 1433 // Notify anyone blocked on our empty state that we're no 1434 // longer empty. 1435 this.notifyAll(); 1436 } 1437 1438 } 1439 1440 if (this.logger.isLoggable(Level.FINER)) { 1441 this.logger.exiting(cn, mn, returnValue); 1442 } 1443 return returnValue; 1444 } 1445 1446 1447 /* 1448 * PropertyChangeListener support. 1449 */ 1450 1451 1452 /** 1453 * Adds the supplied {@link PropertyChangeListener} to this {@link 1454 * EventQueueCollection}'s collection of such listeners so that it 1455 * will be notified only when the bound property bearing the 1456 * supplied {@code name} changes. 1457 * 1458 * @param name the name of the bound property whose changes are of 1459 * interest; may be {@code null} in which case all property change 1460 * notifications will be dispatched to the supplied {@link 1461 * PropertyChangeListener} 1462 * 1463 * @param listener the {@link PropertyChangeListener} to add; may be 1464 * {@code null} in which case no action will be taken 1465 * 1466 * @see #addPropertyChangeListener(PropertyChangeListener) 1467 */ 1468 public final void addPropertyChangeListener(final String name, final PropertyChangeListener listener) { 1469 if (listener != null) { 1470 this.propertyChangeSupport.addPropertyChangeListener(name, listener); 1471 } 1472 } 1473 1474 /** 1475 * Adds the supplied {@link PropertyChangeListener} to this {@link 1476 * EventQueueCollection}'s collection of such listeners so that it 1477 * will be notified whenever any bound property of this {@link 1478 * EventQueueCollection} changes. 1479 * 1480 * @param listener the {@link PropertyChangeListener} to add; may be 1481 * {@code null} in which case no action will be taken 1482 * 1483 * @see #addPropertyChangeListener(String, PropertyChangeListener) 1484 */ 1485 public final void addPropertyChangeListener(final PropertyChangeListener listener) { 1486 if (listener != null) { 1487 this.propertyChangeSupport.addPropertyChangeListener(listener); 1488 } 1489 } 1490 1491 /** 1492 * Removes the supplied {@link PropertyChangeListener} from this 1493 * {@link EventQueueCollection} so that it will no longer be 1494 * notified of changes to bound properties bearing the supplied 1495 * {@code name}. 1496 * 1497 * @param name a bound property name; may be {@code null} 1498 * 1499 * @param listener the {@link PropertyChangeListener} to remove; may 1500 * be {@code null} in which case no action will be taken 1501 * 1502 * @see #addPropertyChangeListener(String, PropertyChangeListener) 1503 * 1504 * @see #removePropertyChangeListener(PropertyChangeListener) 1505 */ 1506 public final void removePropertyChangeListener(final String name, final PropertyChangeListener listener) { 1507 if (listener != null) { 1508 this.propertyChangeSupport.removePropertyChangeListener(name, listener); 1509 } 1510 } 1511 1512 /** 1513 * Removes the supplied {@link PropertyChangeListener} from this 1514 * {@link EventQueueCollection} so that it will no longer be 1515 * notified of any changes to bound properties. 1516 * 1517 * @param listener the {@link PropertyChangeListener} to remove; may 1518 * be {@code null} in which case no action will be taken 1519 * 1520 * @see #addPropertyChangeListener(PropertyChangeListener) 1521 * 1522 * @see #removePropertyChangeListener(String, PropertyChangeListener) 1523 */ 1524 public final void removePropertyChangeListener(final PropertyChangeListener listener) { 1525 if (listener != null) { 1526 this.propertyChangeSupport.removePropertyChangeListener(listener); 1527 } 1528 } 1529 1530 /** 1531 * Returns an array of {@link PropertyChangeListener}s that were 1532 * {@linkplain #addPropertyChangeListener(String, 1533 * PropertyChangeListener) registered} to receive notifications for 1534 * changes to bound properties bearing the supplied {@code name}. 1535 * 1536 * <p>This method never returns {@code null}.</p> 1537 * 1538 * @param name the name of a bound property; may be {@code null} in 1539 * which case an empty array will be returned 1540 * 1541 * @return a non-{@code null} array of {@link 1542 * PropertyChangeListener}s 1543 * 1544 * @see #getPropertyChangeListeners() 1545 * 1546 * @see #addPropertyChangeListener(String, PropertyChangeListener) 1547 * 1548 * @see #removePropertyChangeListener(String, 1549 * PropertyChangeListener) 1550 */ 1551 public final PropertyChangeListener[] getPropertyChangeListeners(final String name) { 1552 return this.propertyChangeSupport.getPropertyChangeListeners(name); 1553 } 1554 1555 /** 1556 * Returns an array of {@link PropertyChangeListener}s that were 1557 * {@linkplain #addPropertyChangeListener(String, 1558 * PropertyChangeListener) registered} to receive notifications for 1559 * changes to all bound properties. 1560 * 1561 * <p>This method never returns {@code null}.</p> 1562 * 1563 * @return a non-{@code null} array of {@link 1564 * PropertyChangeListener}s 1565 * 1566 * @see #getPropertyChangeListeners(String) 1567 * 1568 * @see #addPropertyChangeListener(PropertyChangeListener) 1569 * 1570 * @see #removePropertyChangeListener(PropertyChangeListener) 1571 */ 1572 public final PropertyChangeListener[] getPropertyChangeListeners() { 1573 return this.propertyChangeSupport.getPropertyChangeListeners(); 1574 } 1575 1576 /** 1577 * Fires a {@link PropertyChangeEvent} to {@linkplain 1578 * #addPropertyChangeListener(String, PropertyChangeListener) 1579 * registered <tt>PropertyChangeListener</tt>s} if the supplied 1580 * {@code old} and {@code newValue} objects are non-{@code null} and 1581 * not equal to each other. 1582 * 1583 * @param propertyName the name of the bound property that might 1584 * have changed; may be {@code null} (indicating that some unknown 1585 * set of bound properties has changed) 1586 * 1587 * @param old the old value of the bound property in question; may 1588 * be {@code null} 1589 * 1590 * @param newValue the new value of the bound property; may be 1591 * {@code null} 1592 */ 1593 protected final void firePropertyChange(final String propertyName, final Object old, final Object newValue) { 1594 final String cn = this.getClass().getName(); 1595 final String mn = "firePropertyChange"; 1596 if (this.logger.isLoggable(Level.FINER)) { 1597 this.logger.entering(cn, mn, new Object[] { propertyName, old, newValue }); 1598 } 1599 this.propertyChangeSupport.firePropertyChange(propertyName, old, newValue); 1600 if (this.logger.isLoggable(Level.FINER)) { 1601 this.logger.exiting(cn, mn); 1602 } 1603 } 1604 1605 /** 1606 * Fires a {@link PropertyChangeEvent} to {@linkplain 1607 * #addPropertyChangeListener(String, PropertyChangeListener) 1608 * registered <tt>PropertyChangeListener</tt>s} if the supplied 1609 * {@code old} and {@code newValue} objects are non-{@code null} and 1610 * not equal to each other. 1611 * 1612 * @param propertyName the name of the bound property that might 1613 * have changed; may be {@code null} (indicating that some unknown 1614 * set of bound properties has changed) 1615 * 1616 * @param old the old value of the bound property in question 1617 * 1618 * @param newValue the new value of the bound property 1619 */ 1620 protected final void firePropertyChange(final String propertyName, final int old, final int newValue) { 1621 final String cn = this.getClass().getName(); 1622 final String mn = "firePropertyChange"; 1623 if (this.logger.isLoggable(Level.FINER)) { 1624 this.logger.entering(cn, mn, new Object[] { propertyName, Integer.valueOf(old), Integer.valueOf(newValue) }); 1625 } 1626 this.propertyChangeSupport.firePropertyChange(propertyName, old, newValue); 1627 if (this.logger.isLoggable(Level.FINER)) { 1628 this.logger.exiting(cn, mn); 1629 } 1630 } 1631 1632 /** 1633 * Fires a {@link PropertyChangeEvent} to {@linkplain 1634 * #addPropertyChangeListener(String, PropertyChangeListener) 1635 * registered <tt>PropertyChangeListener</tt>s} if the supplied 1636 * {@code old} and {@code newValue} objects are non-{@code null} and 1637 * not equal to each other. 1638 * 1639 * @param name the name of the bound property that might 1640 * have changed; may be {@code null} (indicating that some unknown 1641 * set of bound properties has changed) 1642 * 1643 * @param old the old value of the bound property in question 1644 * 1645 * @param newValue the new value of the bound property 1646 */ 1647 protected final void firePropertyChange(final String name, final boolean old, final boolean newValue) { 1648 final String cn = this.getClass().getName(); 1649 final String mn = "firePropertyChange"; 1650 if (this.logger.isLoggable(Level.FINER)) { 1651 this.logger.entering(cn, mn, new Object[] { name, Boolean.valueOf(old), Boolean.valueOf(newValue) }); 1652 } 1653 this.propertyChangeSupport.firePropertyChange(name, old, newValue); 1654 if (this.logger.isLoggable(Level.FINER)) { 1655 this.logger.exiting(cn, mn); 1656 } 1657 } 1658 1659 1660 /* 1661 * Inner and nested classes. 1662 */ 1663 1664 1665 /** 1666 * A {@link RuntimeException} indicating that a {@link Consumer} 1667 * {@linkplain EventQueueCollection#start(Consumer) started} by an 1668 * {@link EventQueueCollection} has encountered an error that might 1669 * not happen if the consumption operation is retried. 1670 * 1671 * @author <a href="https://about.me/lairdnelson" 1672 * target="_parent">Laird Nelson</a> 1673 * 1674 * @see EventQueueCollection#start(Consumer) 1675 * 1676 * @see EventQueueCollection 1677 */ 1678 public static final class TransientException extends RuntimeException { 1679 1680 1681 /* 1682 * Static fields. 1683 */ 1684 1685 1686 /** 1687 * The version of this class for {@linkplain Serializable 1688 * serialization purposes}. 1689 * 1690 * @see Serializable 1691 */ 1692 private static final long serialVersionUID = 1L; 1693 1694 1695 /* 1696 * Constructors. 1697 */ 1698 1699 1700 /** 1701 * Creates a new {@link TransientException}. 1702 */ 1703 public TransientException() { 1704 super(); 1705 } 1706 1707 /** 1708 * Creates a new {@link TransientException}. 1709 * 1710 * @param message a detail message describing the error; may be 1711 * {@code null} 1712 */ 1713 public TransientException(final String message) { 1714 super(message); 1715 } 1716 1717 /** 1718 * Creates a new {@link TransientException}. 1719 * 1720 * @param cause the {@link Throwable} that caused this {@link 1721 * TransientException} to be created; may be {@code null} 1722 */ 1723 public TransientException(final Throwable cause) { 1724 super(cause); 1725 } 1726 1727 /** 1728 * Creates a new {@link TransientException}. 1729 * 1730 * @param message a detail message describing the error; may be 1731 * {@code null} 1732 * 1733 * @param cause the {@link Throwable} that caused this {@link 1734 * TransientException} to be created; may be {@code null} 1735 */ 1736 public TransientException(final String message, final Throwable cause) { 1737 super(message, cause); 1738 } 1739 1740 } 1741 1742 1743 /** 1744 * A {@link PropertyChangeListener} specifically designed for 1745 * reacting to a change in the synchronization status of an {@link 1746 * EventQueueCollection} as represented by the firing of its {@code 1747 * synchronized} <a 1748 * href="https://docs.oracle.com/javase/tutorial/javabeans/writing/properties.html#bound">bound 1749 * Java Beans property</a>. 1750 * 1751 * @author <a href="https://about.me/lairdnelson" 1752 * target="_parent">Laird Nelson</a> 1753 * 1754 * @see EventQueueCollection#addPropertyChangeListener(String, 1755 * PropertyChangeListener) 1756 * 1757 * @see #await() 1758 * 1759 * @see #propertyChange(PropertyChangeEvent) 1760 */ 1761 public static final class SynchronizationAwaitingPropertyChangeListener implements PropertyChangeListener { 1762 1763 1764 /* 1765 * Instance fields. 1766 */ 1767 1768 1769 /** 1770 * A {@link CountDownLatch} whose {@link 1771 * CountDownLatch#countDown()} method is invoked in certain cases 1772 * by the {@link #propertyChange(PropertyChangeEvent)} method. 1773 * 1774 * <p>This field is never {@code null}.</p> 1775 * 1776 * @see #propertyChange(PropertyChangeEvent) 1777 */ 1778 private final CountDownLatch latch; 1779 1780 1781 /* 1782 * Constructors. 1783 */ 1784 1785 1786 /** 1787 * Creates a new {@link 1788 * SynchronizationAwaitingPropertyChangeListener}. 1789 */ 1790 public SynchronizationAwaitingPropertyChangeListener() { 1791 super(); 1792 this.latch = new CountDownLatch(1); 1793 } 1794 1795 1796 /* 1797 * Instance methods. 1798 */ 1799 1800 1801 /** 1802 * If the supplied {@link PropertyChangeEvent} is non-{@code 1803 * null}, has a {@linkplain PropertyChangeEvent#getSource() 1804 * source} that is an instance of {@link EventQueueCollection}, 1805 * has a {@linkplain PropertyChangeEvent#getPropertyName() 1806 * property name} equal to {@code synchronized} and a {@linkplain 1807 * PropertyChangeEvent#getNewValue() new value} equal to {@link 1808 * Boolean#TRUE}, then it is guaranteed that any calls currently 1809 * blocked on the {@link #await()} or {@link #await(long, 1810 * TimeUnit)} methods will unblock, and subsequent invocations of 1811 * those methods will never block again. 1812 * 1813 * @param event a {@link PropertyChangeEvent} fired by an {@link 1814 * EventQueueCollection}; may be {@code null} in which case no 1815 * action will be taken 1816 * 1817 * @see EventQueueCollection#addPropertyChangeListener(String, 1818 * PropertyChangeListener) 1819 * 1820 * @see EventQueueCollection#isSynchronized() 1821 */ 1822 @Override 1823 public final void propertyChange(final PropertyChangeEvent event) { 1824 if (event != null && 1825 event.getSource() instanceof EventQueueCollection && 1826 "synchronized".equals(event.getPropertyName()) && 1827 Boolean.TRUE.equals(event.getNewValue())) { 1828 this.latch.countDown(); 1829 } 1830 } 1831 1832 /** 1833 * Blocks until the conditions described in the documentation of 1834 * the {@link #propertyChange(PropertyChangeEvent)} method hold 1835 * true. 1836 * 1837 * @exception InterruptedException if the current {@link Thread} 1838 * is interrupted 1839 */ 1840 @Blocking 1841 public final void await() throws InterruptedException { 1842 this.latch.await(); 1843 } 1844 1845 /** 1846 * Blocks until the conditions described in the documentation of 1847 * the {@link #propertyChange(PropertyChangeEvent)} method hold 1848 * true or the indicated time has passed. 1849 * 1850 * @param timeout the number of units of time to wait for 1851 * 1852 * @param timeUnit the unit of time designated by the {@code 1853 * timeout} parameter; must not be {@code null} 1854 * 1855 * @return {@code false} if the waiting time elapsed before the 1856 * bound property named {@code synchronized} changed its value to 1857 * {@code true}; {@code true} otherwise 1858 * 1859 * @exception InterruptedException if the current {@link Thread} 1860 * is interrupted 1861 * 1862 * @exception NullPointerException if {@code timeUnit} is {@code 1863 * null} 1864 * 1865 * @see #propertyChange(PropertyChangeEvent) 1866 */ 1867 @Blocking 1868 public final boolean await(final long timeout, final TimeUnit timeUnit) throws InterruptedException { 1869 return this.latch.await(timeout, timeUnit); 1870 } 1871 1872 } 1873 1874}