001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
002 *
003 * Copyright © 2017-2018 microBean.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
014 * implied.  See the License for the specific language governing
015 * permissions and limitations under the License.
016 */
017package org.microbean.kubernetes.controller;
018
019import java.beans.PropertyChangeEvent;
020import java.beans.PropertyChangeListener;
021import java.beans.PropertyChangeSupport;
022
023import java.io.Serializable;
024
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.LinkedHashMap;
030import java.util.Map;
031import java.util.Map.Entry;
032import java.util.Objects;
033import java.util.Set;
034
035import java.util.concurrent.CountDownLatch;
036import java.util.concurrent.ExecutorService;
037import java.util.concurrent.Future;
038import java.util.concurrent.ScheduledExecutorService;
039import java.util.concurrent.ScheduledThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041
042import java.util.function.Consumer;
043import java.util.function.Function;
044import java.util.function.Supplier;
045
046import java.util.logging.Level;
047import java.util.logging.Logger;
048
049import io.fabric8.kubernetes.api.model.HasMetadata;
050
051import net.jcip.annotations.GuardedBy;
052import net.jcip.annotations.ThreadSafe;
053
054import org.microbean.development.annotation.Blocking;
055import org.microbean.development.annotation.NonBlocking;
056
057/**
058 * An {@link EventCache} that temporarily stores {@link Event}s in
059 * {@link EventQueue}s, one per named Kubernetes resource, and
060 * {@linkplain #start(Consumer) provides a means for processing those
061 * queues}.
062 *
063 * <h2>Thread Safety</h2>
064 *
065 * <p>This class is safe for concurrent use by multiple {@link
066 * Thread}s.</p>
067 *
068 * <h2>Design Notes</h2>
069 *
070 * <p>This class loosely models the <a
071 * href="https://github.com/kubernetes/client-go/blob/37c3c02ec96533daec0dbda1f39a6b1d68505c79/tools/cache/delta_fifo.go#L96">{@code
072 * DeltaFIFO}</a> type in the Kubernetes Go client {@code tools/cache}
073 * package.</p>
074 *
075 * @param <T> a type of Kubernetes resource
076 *
077 * @author <a href="https://about.me/lairdnelson"
078 * target="_parent">Laird Nelson</a>
079 *
080 * @see #add(Object, AbstractEvent.Type, HasMetadata)
081 *
082 * @see #replace(Collection, Object)
083 *
084 * @see #synchronize()
085 *
086 * @see #start(Consumer)
087 *
088 * @see EventQueue
089 */
090@ThreadSafe
091public class EventQueueCollection<T extends HasMetadata> implements EventCache<T>, Supplier<EventQueue<T>>, AutoCloseable {
092
093
094  /*
095   * Instance fields.
096   */
097
098
099  /**
100   * A {@link PropertyChangeSupport} object that manages {@link
101   * PropertyChangeEvent}s on behalf of this {@link
102   * EventQueueCollection}.
103   *
104   * <p>This field is never {@code null}.</p>
105   *
106   * @see #addPropertyChangeListener(String, PropertyChangeListener)
107   */
108  private final PropertyChangeSupport propertyChangeSupport;
109
110  /**
111   * Whether this {@link EventQueueCollection} is in the process of
112   * {@linkplain #close() closing}.
113   *
114   * @see #close()
115   */
116  private volatile boolean closing;
117
118  /**
119   * Whether or not this {@link EventQueueCollection} has been
120   * populated via an invocation of the {@link #replace(Collection,
121   * Object)} method.
122   *
123   * <p>Mutations of this field must be synchronized on {@code
124   * this}.</p>
125   *
126   * @see #replace(Collection, Object)
127   */
128  @GuardedBy("this")
129  private boolean populated;
130
131  /**
132   * The number of {@link EventQueue}s that this {@link
133   * EventQueueCollection} was initially {@linkplain
134   * #replace(Collection, Object) seeded with}.
135   *
136   * <p>Mutations of this field must be synchronized on {@code
137   * this}.</p>
138   *
139   * @see #replace(Collection, Object)
140   */
141  @GuardedBy("this")
142  private int initialPopulationCount;
143
144  /**
145   * A {@link LinkedHashMap} of {@link EventQueue} instances, indexed
146   * by {@linkplain EventQueue#getKey() their keys}.
147   *
148   * <p>This field is never {@code null}.</p>
149   *
150   * <p>Mutations of the contents of this {@link LinkedHashMap} must
151   * be synchronized on {@code this}.</p>
152   *
153   * @see #add(Object, AbstractEvent.Type, HasMetadata)
154   */
155  @GuardedBy("this")
156  private final LinkedHashMap<Object, EventQueue<T>> eventQueueMap;
157
158  /**
159   * A {@link Map} containing the last known state of Kubernetes
160   * resources this {@link EventQueueCollection} is caching events
161   * for.  This field is used chiefly by the {@link #synchronize()}
162   * method, but by others as well.
163   *
164   * <p>This field may be {@code null}.</p>
165   *
166   * <p>Mutations of this field must be synchronized on this field's
167   * value.</p>
168   *
169   * @see #getKnownObjects()
170   *
171   * @see #synchronize()
172   */
173  @GuardedBy("itself")
174  private final Map<?, ? extends T> knownObjects;
175
176  @GuardedBy("this")
177  private ScheduledExecutorService consumerExecutor;
178
179  @GuardedBy("this")
180  private Future<?> eventQueueConsumptionTask;
181
182  private final Function<? super Throwable, Boolean> errorHandler;
183
184  /**
185   * A {@link Logger} used by this {@link EventQueueCollection}.
186   *
187   * <p>This field is never {@code null}.</p>
188   *
189   * @see #createLogger()
190   */
191  protected final Logger logger;
192
193  /*
194   * Constructors.
195   */
196
197
198  /**
199   * Creates a new {@link EventQueueCollection} with an initial
200   * capacity of {@code 16} and a load factor of {@code 0.75} that is
201   * not interested in tracking Kubernetes resource deletions.
202   *
203   * @see #EventQueueCollection(Map, int, float)
204   */
205  public EventQueueCollection() {
206    this(null, null, 16, 0.75f);
207  }
208
209  /**
210   * Creates a new {@link EventQueueCollection} with an initial
211   * capacity of {@code 16} and a load factor of {@code 0.75}.
212   *
213   * @param knownObjects a {@link Map} containing the last known state
214   * of Kubernetes resources this {@link EventQueueCollection} is
215   * caching events for; may be {@code null} if this {@link
216   * EventQueueCollection} is not interested in tracking deletions of
217   * objects; if non-{@code null} <strong>will be synchronized on by
218   * this class</strong> during retrieval and traversal operations
219   *
220   * @see #EventQueueCollection(Map, int, float)
221   */
222  public EventQueueCollection(final Map<?, ? extends T> knownObjects) {
223    this(knownObjects, null, 16, 0.75f);
224  }
225
226  /**
227   * Creates a new {@link EventQueueCollection}.
228   *
229   * @param knownObjects a {@link Map} containing the last known state
230   * of Kubernetes resources this {@link EventQueueCollection} is
231   * caching events for; may be {@code null} if this {@link
232   * EventQueueCollection} is not interested in tracking deletions of
233   * objects; if non-{@code null} <strong>will be synchronized on by
234   * this class</strong> during retrieval and traversal operations
235   *
236   * @param initialCapacity the initial capacity of the internal data
237   * structure used to house this {@link EventQueueCollection}'s
238   * {@link EventQueue}s; must be an integer greater than {@code 0}
239   *
240   * @param loadFactor the load factor of the internal data structure
241   * used to house this {@link EventQueueCollection}'s {@link
242   * EventQueue}s; must be a positive number between {@code 0} and
243   * {@code 1}
244   */
245  public EventQueueCollection(final Map<?, ? extends T> knownObjects, final int initialCapacity, final float loadFactor) {
246    this(knownObjects, null, initialCapacity, loadFactor);
247  }
248
249  /**
250   * Creates a new {@link EventQueueCollection}.
251   *
252   * @param knownObjects a {@link Map} containing the last known state
253   * of Kubernetes resources this {@link EventQueueCollection} is
254   * caching events for; may be {@code null} if this {@link
255   * EventQueueCollection} is not interested in tracking deletions of
256   * objects; if non-{@code null} <strong>will be synchronized on by
257   * this class</strong> during retrieval and traversal operations
258   *
259   * @param errorHandler a {@link Function} that accepts a {@link
260   * Throwable} and returns a {@link Boolean} indicating whether the
261   * error was handled or not; used to handle truly unanticipated
262   * errors from within a {@link ScheduledThreadPoolExecutor}; may be
263   * {@code null}
264   *
265   * @param initialCapacity the initial capacity of the internal data
266   * structure used to house this {@link EventQueueCollection}'s
267   * {@link EventQueue}s; must be an integer greater than {@code 0}
268   *
269   * @param loadFactor the load factor of the internal data structure
270   * used to house this {@link EventQueueCollection}'s {@link
271   * EventQueue}s; must be a positive number between {@code 0} and
272   * {@code 1}
273   */
274  public EventQueueCollection(final Map<?, ? extends T> knownObjects,
275                              final Function<? super Throwable, Boolean> errorHandler,
276                              final int initialCapacity,
277                              final float loadFactor) {
278    super();
279    final String cn = this.getClass().getName();
280    final String mn = "<init>";
281
282    this.logger = this.createLogger();
283    if (logger == null) {
284      throw new IllegalStateException();
285    }
286    if (this.logger.isLoggable(Level.FINER)) {
287      final String knownObjectsString;
288      if (knownObjects == null) {
289        knownObjectsString = null;
290      } else {
291        synchronized (knownObjects) {
292          knownObjectsString = knownObjects.toString();
293        }
294      }
295      this.logger.entering(cn, mn, new Object[] { knownObjectsString, Integer.valueOf(initialCapacity), Float.valueOf(loadFactor) });
296    }
297
298    this.propertyChangeSupport = new PropertyChangeSupport(this);
299    this.eventQueueMap = new LinkedHashMap<>(initialCapacity, loadFactor);
300    this.knownObjects = knownObjects;
301    if (errorHandler == null) {
302      this.errorHandler = t -> {
303        if (this.logger.isLoggable(Level.SEVERE)) {
304          this.logger.logp(Level.SEVERE, this.getClass().getName(), "<eventQueueConsumptionTask>", t.getMessage(), t);
305        }
306        return true;
307      };
308    } else {
309      this.errorHandler = errorHandler;
310    }
311
312    if (this.logger.isLoggable(Level.FINER)) {
313      this.logger.exiting(cn, mn);
314    }
315  }
316
317
318  /*
319   * Instance methods.
320   */
321
322  /**
323   * Returns a {@link Logger} for use by this {@link
324   * EventQueueCollection}.
325   *
326   * <p>This method never returns {@code null}.</p>
327   *
328   * <p>Overrides of this method must not return {@code null}.</p>
329   *
330   * @return a non-{@code null} {@link Logger} for use by this {@link
331   * EventQueueCollection}
332   */
333  protected Logger createLogger() {
334    return Logger.getLogger(this.getClass().getName());
335  }
336
337  private final Map<?, ? extends T> getKnownObjects() {
338    return this.knownObjects;
339  }
340
341  /**
342   * Returns {@code true} if this {@link EventQueueCollection} is empty.
343   *
344   * @return {@code true} if this {@link EventQueueCollection} is
345   * empty; {@code false} otherwise
346   */
347  private synchronized final boolean isEmpty() {
348    return this.eventQueueMap.isEmpty();
349  }
350
351  /**
352   * Returns {@code true} if this {@link EventQueueCollection} has
353   * been populated via a call to {@link #add(Object, AbstractEvent.Type,
354   * HasMetadata)} at some point, and if there are no {@link
355   * EventQueue}s remaining to be {@linkplain #start(Consumer)
356   * removed}.
357   *
358   * <p>This is a <a
359   * href="https://docs.oracle.com/javase/tutorial/javabeans/writing/properties.html#bound">bound
360   * property</a>.</p>
361   *
362   * @return {@code true} if this {@link EventQueueCollection} has
363   * been populated via a call to {@link #add(Object, AbstractEvent.Type,
364   * HasMetadata)} at some point, and if there are no {@link
365   * EventQueue}s remaining to be {@linkplain #start(Consumer)
366   * removed}; {@code false} otherwise
367   *
368   * @see #replace(Collection, Object)
369   *
370   * @see #add(Object, AbstractEvent.Type, HasMetadata)
371   *
372   * @see #synchronize()
373   */
374  public synchronized final boolean isSynchronized() {
375    return this.populated && this.initialPopulationCount == 0;
376  }
377
378  /**
379   * <strong>Synchronizes on the {@code knownObjects} object</strong>
380   * {@linkplain #EventQueueCollection(Map, int, float) supplied at
381   * construction time}, if there is one, and, for every Kubernetes
382   * resource found within at the time of this call, adds a {@link
383   * SynchronizationEvent} for it with an {@link AbstractEvent.Type}
384   * of {@link AbstractEvent.Type#MODIFICATION}.
385   */
386  @Override
387  public final void synchronize() {
388    final String cn = this.getClass().getName();
389    final String mn = "synchronize";
390    if (this.logger.isLoggable(Level.FINER)) {
391      this.logger.entering(cn, mn);
392    }
393    synchronized (this) {
394      final Map<?, ? extends T> knownObjects = this.getKnownObjects();
395      if (knownObjects != null) {
396        synchronized (knownObjects) {
397          if (!knownObjects.isEmpty()) {
398            final Collection<? extends T> values = knownObjects.values();
399            if (values != null && !values.isEmpty()) {
400              for (final T knownObject : values) {
401                if (knownObject != null) {
402
403                  // We follow the Go code in that we use *our* key
404                  // extraction logic, rather than relying on the
405                  // known key in the knownObjects map.  See
406                  // https://github.com/kubernetes/client-go/blob/37c3c02ec96533daec0dbda1f39a6b1d68505c79/tools/cache/delta_fifo.go#L567.
407                  // I'm not sure this is significant as they should
408                  // evaluate to the same thing, but there may be an
409                  // edge case I'm not thinking of.
410                  final Object key = this.getKey(knownObject);
411                  
412                  if (key != null) {
413                    final EventQueue<T> eventQueue = this.eventQueueMap.get(key);
414                    if (eventQueue == null || eventQueue.isEmpty()) {
415                      // There was an object in our knownObjects map
416                      // somehow, but not in one of the queues we
417                      // manage.  Make sure others know about it.
418
419                      // We make a SynchronizationEvent of type
420                      // MODIFICATION.  shared_informer.go checks in
421                      // its HandleDeltas function to see if oldObj
422                      // exists; if so, it's a modification
423                      // (https://github.com/kubernetes/client-go/blob/37c3c02ec96533daec0dbda1f39a6b1d68505c79/tools/cache/shared_informer.go#L354-L358).
424                      // Here we take action *only if* the equivalent
425                      // of oldObj exists, therefore this is a
426                      // SynchronizationEvent of type MODIFICATION,
427                      // not ADDITION.
428                      this.addSynchronizationEvent(this, AbstractEvent.Type.MODIFICATION, knownObject);
429                    }
430                  }
431
432                }
433              }
434            }
435          }
436        }
437      }
438    }
439    if (this.logger.isLoggable(Level.FINER)) {
440      this.logger.exiting(cn, mn);
441    }
442  }
443
444  /**
445   * At a high level, fully replaces the internal state of this {@link
446   * EventQueueCollection} to reflect only the Kubernetes resources
447   * contained in the supplied {@link Collection}.
448   *
449   * <p>{@link SynchronizationEvent}s of type {@link
450   * AbstractEvent.Type#ADDITION} are added for every resource present
451   * in the {@code incomingResources} parameter.</p>
452   *
453   * <p>{@link Event}s of type {@link AbstractEvent.Type#DELETION} are
454   * added when this {@link EventQueueCollection} can determine that
455   * the lack of a resource's presence in the {@code
456   * incomingResources} parameter indicates that it has been deleted
457   * from Kubernetes.</p>
458   *
459   * <p>(No {@link Event}s of type {@link
460   * AbstractEvent.Type#MODIFICATION} are added by this method.)</p>
461   *
462   * <p>{@link EventQueue}s managed by this {@link
463   * EventQueueCollection} that have not yet {@linkplain
464   * #start(Consumer) been processed} are not removed by this
465   * operation.</p>
466   *
467   * <p><strong>This method synchronizes on the supplied {@code
468   * incomingResources} {@link Collection}</strong> while iterating
469   * over it.</p>
470   *
471   * @param incomingResources the {@link Collection} of Kubernetes
472   * resources with which to replace this {@link
473   * EventQueueCollection}'s internal state; <strong>will be
474   * synchronized on</strong>; may be {@code null} or {@linkplain
475   * Collection#isEmpty() empty}, which will be taken as an indication
476   * that this {@link EventQueueCollection} should effectively be
477   * emptied
478   *
479   * @param resourceVersion the version of the Kubernetes list
480   * resource that contained the incoming resources; currently
481   * ignored but reserved for future use; may be {@code null}
482   *
483   * @exception IllegalStateException if the {@link
484   * #createEvent(Object, AbstractEvent.Type, HasMetadata)} method returns
485   * {@code null} for any reason
486   *
487   * @see SynchronizationEvent
488   *
489   * @see #createEvent(Object, AbstractEvent.Type, HasMetadata)
490   */
491  @Override
492  public synchronized final void replace(final Collection<? extends T> incomingResources, final Object resourceVersion) {
493    final String cn = this.getClass().getName();
494    final String mn = "replace";
495    if (this.logger.isLoggable(Level.FINER)) {
496      final String incomingResourcesString;
497      if (incomingResources == null) {
498        incomingResourcesString = null;
499      } else {
500        synchronized (incomingResources) {
501          incomingResourcesString = incomingResources.toString();
502        }
503      }
504      this.logger.entering(cn, mn, new Object[] { incomingResourcesString, resourceVersion });
505    }
506
507    // Keep track of our old synchronization state; we'll fire a
508    // JavaBeans bound property at the bottom of this method.
509    final boolean oldSynchronized = this.isSynchronized();
510    
511    final int size;
512    final Set<Object> replacementKeys;
513
514    // Process all the additions first.
515    if (incomingResources == null) {
516      size = 0;
517      replacementKeys = Collections.emptySet();
518    } else {
519      synchronized (incomingResources) {
520        if (incomingResources.isEmpty()) {
521          size = 0;
522          replacementKeys = Collections.emptySet();
523        } else {
524          size = incomingResources.size();
525          assert size > 0;
526          replacementKeys = new HashSet<>();
527          for (final T resource : incomingResources) {
528            if (resource != null) {
529              replacementKeys.add(this.getKey(resource));
530              // The final boolean parameter indicates that we don't
531              // want our populated status to be set by this call.  We
532              // do this at the bottom of this method ourselves.
533              this.addSynchronizationEvent(this, AbstractEvent.Type.ADDITION, resource, false);
534            }
535          }
536        }
537      }
538    }
539
540    // Now process deletions.
541
542    int queuedDeletions = 0;
543
544    final Map<?, ? extends T> knownObjects = this.getKnownObjects();
545    if (knownObjects == null) {
546
547      // No one is keeping track of known objects.  The best we can do
548      // is: if there's an EventQueue currently being processed, then
549      // if we get here the object it concerns itself with in
550      // Kubernetes is gone.  We need to synthesize a deletion event
551      // to say, effectively, "The object was deleted but we don't
552      // know what its prior state was".
553
554      for (final EventQueue<T> eventQueue : this.eventQueueMap.values()) {
555        assert eventQueue != null;
556
557        final Object key;
558        final AbstractEvent<? extends T> newestEvent;
559        synchronized (eventQueue) {
560
561          if (eventQueue.isEmpty()) {
562            key = null;
563            newestEvent = null;
564            throw new IllegalStateException("eventQueue.isEmpty(): " + eventQueue);
565          }
566
567          key = eventQueue.getKey();
568          if (key == null) {
569            throw new IllegalStateException();
570          }
571
572          if (replacementKeys.contains(key)) {
573            newestEvent = null;
574          } else {
575            // We have an EventQueue indexed under a key that
576            // identifies a resource that no longer exists in
577            // Kubernetes.  Inform any consumers via a deletion event
578            // that this object was removed at some point from
579            // Kubernetes.  The state of the object in question is
580            // indeterminate.
581            newestEvent = eventQueue.getLast();
582            assert newestEvent != null;
583          }
584
585        }
586
587        if (newestEvent != null) {
588          assert key != null;
589          // We grab the last event in the queue in question and get
590          // its resource; this will serve as the state of the
591          // Kubernetes resource in question the last time we knew
592          // about it.  This state is not necessarily, but could be,
593          // the true actual last state of the resource in question.
594          // The point is, the true state of the object when it was
595          // deleted is unknown.  We build a new event to reflect all
596          // this.
597          //
598          // Astute readers will realize that this could result in two
599          // DELETION events enqueued, back to back, with identical
600          // payloads.  See the deduplicate() method in EventQueue,
601          // which takes care of this situation.
602          final T resourceToBeDeleted = newestEvent.getResource();
603          assert resourceToBeDeleted != null;
604          final Event<T> event = this.createEvent(this, AbstractEvent.Type.DELETION, resourceToBeDeleted);
605          if (event == null) {
606            throw new IllegalStateException("createEvent() == null");
607          }
608          event.setKey(key);
609          // The final boolean parameter indicates that we don't want
610          // our populated status to be set by this call.  We do this
611          // at the bottom of this method ourselves.
612          this.add(event, false);
613        }
614      }
615
616    } else {
617      assert knownObjects != null;
618
619      // We're keeping track of known objects, so fire deletion events
620      // if objects were removed from Kubernetes.
621
622      synchronized (knownObjects) {
623
624        if (!knownObjects.isEmpty()) {
625          final Collection<? extends Entry<?, ? extends T>> entrySet = knownObjects.entrySet();
626          if (entrySet != null && !entrySet.isEmpty()) {
627            for (final Entry<?, ? extends T> entry : entrySet) {
628              if (entry != null) {
629                final Object knownKey = entry.getKey();
630                if (!replacementKeys.contains(knownKey)) {
631                  final Event<T> event = this.createEvent(this, AbstractEvent.Type.DELETION, entry.getValue());
632                  if (event == null) {
633                    throw new IllegalStateException("createEvent() == null");
634                  }
635                  event.setKey(knownKey);
636                  // The final boolean parameter (false) indicates
637                  // that we don't want our populated status to be set
638                  // by this call.  We do this at the bottom of this
639                  // method ourselves.
640                  this.add(event, false);
641                  queuedDeletions++;
642                }
643              }
644            }
645          }
646        }
647
648      }
649
650    }
651
652    if (!this.populated) {
653      this.populated = true;
654      assert size >= 0;
655      assert queuedDeletions >= 0;
656      final int oldInitialPopulationCount = this.initialPopulationCount;
657      this.initialPopulationCount = size + queuedDeletions;
658      this.firePropertyChange("populated", false, true);
659      this.firePropertyChange("initialPopulationCount", oldInitialPopulationCount, this.initialPopulationCount);
660      if (this.initialPopulationCount == 0) {
661        // We know that we are now synchronized because the definition
662        // of being synchronized is to have an initialPopulationCount
663        // of 0 and a populated status of true.
664        assert this.isSynchronized();
665        this.firePropertyChange("synchronized", oldSynchronized, true);
666      }
667    }
668
669    if (this.logger.isLoggable(Level.FINER)) {
670      this.logger.exiting(cn, mn);
671    }
672  }
673
674  /**
675   * Returns an {@link Object} which will be used as the key that will
676   * uniquely identify the supplied {@code resource} to this {@link
677   * EventQueueCollection}.
678   *
679   * <p>This method may return {@code null}, but only if {@code
680   * resource} is {@code null} or is constructed in such a way that
681   * its {@link HasMetadata#getMetadata()} method returns {@code
682   * null}.</p>
683   *
684   * <p>Overrides of this method may return {@code null}, but only if
685   * {@code resource} is {@code null}.
686   *
687   * <p>The default implementation of this method returns the return
688   * value of the {@link HasMetadatas#getKey(HasMetadata)} method.</p>
689   *
690   * @param resource a {@link HasMetadata} for which a key should be
691   * returned; may be {@code null} in which case {@code null} may be
692   * returned
693   *
694   * @return a non-{@code null} key for the supplied {@code resource};
695   * or {@code null} if {@code resource} is {@code null}
696   *
697   * @see HasMetadatas#getKey(HasMetadata)
698   */
699  protected Object getKey(final T resource) {
700    final String cn = this.getClass().getName();
701    final String mn = "getKey";
702    if (this.logger.isLoggable(Level.FINER)) {
703      this.logger.entering(cn, mn, resource);
704    }
705    final Object returnValue = HasMetadatas.getKey(resource);
706    if (this.logger.isLoggable(Level.FINER)) {
707      this.logger.exiting(cn, mn, returnValue);
708    }
709    return returnValue;
710  }
711
712  /**
713   * Creates a new {@link EventQueue} suitable for holding {@link
714   * Event}s {@linkplain Event#getKey() matching} the supplied {@code
715   * key}.
716   *
717   * <p>This method never returns {@code null}.</p>
718   *
719   * <p>Overrides of this method must not return {@code null}.</p>
720   *
721   * @param key the key {@linkplain EventQueue#getKey() for the new
722   * <code>EventQueue</code>}; must not be {@code null}
723   *
724   * @return the new {@link EventQueue}; never {@code null}
725   *
726   * @exception NullPointerException if {@code key} is {@code null}
727   *
728   * @see EventQueue#EventQueue(Object)
729   */
730  protected EventQueue<T> createEventQueue(final Object key) {
731    final String cn = this.getClass().getName();
732    final String mn = "createEventQueue";
733    if (this.logger.isLoggable(Level.FINER)) {
734      this.logger.entering(cn, mn, key);
735    }
736    final EventQueue<T> returnValue = new EventQueue<T>(key);
737    if (this.logger.isLoggable(Level.FINER)) {
738      this.logger.exiting(cn, mn, returnValue);
739    }
740    return returnValue;
741  }
742
743  /**
744   * Starts a new {@link Thread} that, until {@link #close()} is
745   * called, removes {@link EventQueue}s from this {@link
746   * EventQueueCollection} and supplies them to the supplied {@link
747   * Consumer}, and returns a {@link Future} representing this task.
748   *
749   * <p>This method never returns {@code null}.</p>
750   *
751   * <p>If this method has been called before, then the existing
752   * {@link Future} representing the task that was scheduled is
753   * returned instead.</p>
754   *
755   * <p>Invoking this method does not block the calling {@link
756   * Thread}.</p>
757   *
758   * <p>The non-{@code null} {@link Future} that is returned will not
759   * return {@code true} from its {@linkplain Future#isDone()} method
760   * unless {@linkplain Future#cancel(boolean) it has been cancelled}
761   * or an exception has occurred.  That is, the task represented by
762   * the returned {@link Future} is never-ending under normal
763   * circumstances.</p>
764   *
765   * @param eventQueueConsumer the {@link Consumer} that will process
766   * each {@link EventQueue} as it becomes ready; must not be {@code
767   * null}.  If this {@link Consumer}'s {@link
768   * Consumer#accept(Object)} method throws a {@link
769   * TransientException}, then the {@link EventQueue} that was
770   * supplied to it will be re-enqueued and at some point in the
771   * future this {@link Consumer} will have a chance to re-process it.
772   *
773   * @return a {@link Future} representing the task that is feeding
774   * {@link EventQueue}s to the supplied {@link Consumer}; never
775   * {@code null}; suitable only for {@linkplain
776   * Future#cancel(boolean) cancellation}
777   *
778   * @exception NullPointerException if {@code eventQueueConsumer} is
779   * {@code null}
780   */
781  @NonBlocking
782  public final Future<?> start(final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
783    final String cn = this.getClass().getName();
784    final String mn = "start";
785    if (this.logger.isLoggable(Level.FINER)) {
786      this.logger.entering(cn, mn, eventQueueConsumer);
787    }
788
789    Objects.requireNonNull(eventQueueConsumer);
790
791    final Future<?> returnValue;
792
793    synchronized (this) {
794      
795      if (this.consumerExecutor == null) {
796        this.consumerExecutor = createScheduledThreadPoolExecutor();
797        assert this.consumerExecutor != null : "createScheduledThreadPoolExecutor() == null";
798      }
799
800      if (this.eventQueueConsumptionTask == null) {
801        // This task is scheduled, rather than simply executed, so
802        // that if it terminates exceptionally it will be restarted
803        // after one second.  The task could have been written to do
804        // this "scheduling" itself (i.e. it could restart a loop
805        // itself in the presence of an exception) but we follow the
806        // Go code idiom here.
807        this.eventQueueConsumptionTask =
808          this.consumerExecutor.scheduleWithFixedDelay(this.createEventQueueConsumptionTask(eventQueueConsumer),
809                                                       0L,
810                                                       1L,
811                                                       TimeUnit.SECONDS);
812      }
813      assert this.eventQueueConsumptionTask != null;
814      
815      returnValue = this.eventQueueConsumptionTask;
816
817    }
818
819    if (this.logger.isLoggable(Level.FINER)) {
820      this.logger.exiting(cn, mn, returnValue);
821    }
822    return returnValue;
823  }
824
825  private static final ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor() {
826    final ScheduledThreadPoolExecutor returnValue = new ScheduledThreadPoolExecutor(1);
827    returnValue.setRemoveOnCancelPolicy(true);
828    return returnValue;
829  }
830
831  /**
832   * Semantically closes this {@link EventQueueCollection} by
833   * detaching any {@link Consumer} previously attached via the {@link
834   * #start(Consumer)} method.  {@linkplain #add(Object, AbstractEvent.Type,
835   * HasMetadata) Additions}, {@linkplain #replace(Collection, Object)
836   * replacements} and {@linkplain #synchronize() synchronizations}
837   * are still possible, but there won't be anything consuming any
838   * events generated by or supplied to these operations.
839   *
840   * <p>A closed {@link EventQueueCollection} may be {@linkplain
841   * #start(Consumer) started} again.</p>
842   *
843   * @see #start(Consumer)
844   */
845  @Override
846  public final void close() {
847    final String cn = this.getClass().getName();
848    final String mn = "close";
849    if (this.logger.isLoggable(Level.FINER)) {
850      this.logger.entering(cn, mn);
851    }
852
853    // (closing is a volatile field.)
854    this.closing = true;
855
856    try {
857
858      final ScheduledExecutorService consumerExecutor;
859      synchronized (this) {
860        // We keep this synchronized block as small as we can since in
861        // other areas in the code there are threads holding this
862        // object's monitor.
863        //
864        // Cancel the consumer pump task and begin the lengthy process
865        // of shutting down the pump itself.
866        if (this.eventQueueConsumptionTask != null) {
867          this.eventQueueConsumptionTask.cancel(true);
868          this.eventQueueConsumptionTask = null;
869        }
870        consumerExecutor = this.consumerExecutor;
871        this.consumerExecutor = null;
872      }
873
874      if (consumerExecutor != null) {
875
876        // Stop accepting new tasks.
877        consumerExecutor.shutdown();
878
879        // Cancel all running tasks firmly (there shouldn't be any,
880        // but it's the right thing to do).
881        consumerExecutor.shutdownNow();
882
883        try {
884          // Wait for termination to complete normally.  This should
885          // complete instantly because there aren't any running
886          // tasks.
887          if (!consumerExecutor.awaitTermination(60, TimeUnit.SECONDS) && this.logger.isLoggable(Level.WARNING)) {
888            this.logger.logp(Level.WARNING, cn, mn, "this.consumerExecutor.awaitTermination() failed");
889          }
890        } catch (final InterruptedException interruptedException) {
891          Thread.currentThread().interrupt();
892        }
893      }
894
895    } finally {
896      this.closing = false;
897    }
898
899    assert this.eventQueueConsumptionTask == null;
900    assert this.consumerExecutor == null;
901
902    if (this.logger.isLoggable(Level.FINER)) {
903      this.logger.exiting(cn, mn);
904    }
905  }
906
907  /**
908   * Creates and returns a {@link Runnable} that will serve as the
909   * "body" of a never-ending task that {@linkplain #get() removes the
910   * eldest <code>EventQueue</code>} from this {@link
911   * EventQueueCollection} and {@linkplain Consumer#accept(Object)
912   * supplies it to the supplied <code>eventQueueConsumer</code>}.
913   *
914   * <p>This method never returns {@code null}.</p>
915   *
916   * @param eventQueueConsumer the {@link Consumer} that will act upon
917   * the eldest {@link EventQueue} in this {@link
918   * EventQueueCollection}; must not be {@code null}
919   *
920   * @return a new {@link Runnable}; never {@code null}
921   *
922   * @exception NullPointerException if {@code eventQueueConsumer} is
923   * {@code null}
924   */
925  private final Runnable createEventQueueConsumptionTask(final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
926    Objects.requireNonNull(eventQueueConsumer);
927    final Runnable returnValue = () -> {
928      // This Runnable loosely models the processLoop() function in
929      // https://github.com/kubernetes/kubernetes/blob/v1.9.0/staging/src/k8s.io/client-go/tools/cache/controller.go#L139-L161.
930      try {
931        while (!Thread.currentThread().isInterrupted()) {
932          // Note that get() *removes* an EventQueue from this
933          // EventQueueCollection, blocking until one is available.
934          @Blocking          
935          final EventQueue<T> eventQueue = this.get();
936          if (eventQueue != null) {
937            Throwable unhandledThrowable = null;
938            synchronized (eventQueue) {
939              try {
940                eventQueueConsumer.accept(eventQueue);
941              } catch (final TransientException transientException) {
942                this.eventQueueMap.putIfAbsent(eventQueue.getKey(), eventQueue);
943              } catch (final Throwable e) {
944                unhandledThrowable = e;
945              }
946            }
947            if (unhandledThrowable != null && !this.errorHandler.apply(unhandledThrowable)) {
948              if (unhandledThrowable instanceof RuntimeException) {
949                throw (RuntimeException)unhandledThrowable;
950              } else if (unhandledThrowable instanceof Error) {
951                throw (Error)unhandledThrowable;
952              } else {
953                assert !(unhandledThrowable instanceof Exception) : "unhandledThrowable instanceof Exception: " + unhandledThrowable;
954              }
955            }
956          }
957        }
958      } catch (final RuntimeException | Error unhandledRuntimeExceptionOrError) {
959        // This RuntimeException or Error was almost certainly
960        // supplied to our error handler, who had a chance to process
961        // it, but who rejected it for one reason or another.  As a
962        // last attempt to make sure it is noticed, we don't just
963        // throw this RuntimeException or Error but also log it
964        // because it is very easy for a Runnable submitted to an
965        // ExecutorService to have its RuntimeExceptions and Errors
966        // disappear into the ether.  Yes, this is a case where we're
967        // both logging and throwing, but the redundancy in this case
968        // is worth it: if an error handler failed, then things are
969        // really bad.
970        if (logger.isLoggable(Level.SEVERE)) {
971          logger.logp(Level.SEVERE,
972                      this.getClass().getName(),
973                      "<eventQueueConsumptionTask>",
974                      unhandledRuntimeExceptionOrError.getMessage(),
975                      unhandledRuntimeExceptionOrError);
976        }
977        throw unhandledRuntimeExceptionOrError;
978      }
979    };
980    return returnValue;
981  }
982
983  /**
984   * Implements the {@link Supplier#get()} contract by
985   * <strong>removing</strong> and returning an {@link EventQueue} if
986   * one is available, <strong>blocking if one is not</strong> and
987   * returning {@code null} only if the {@linkplain Thread#interrupt()
988   * current thread is interrupted} or this {@linkplain
989   * EventQueueCollection#close() <code>EventQueueCollection</code> is
990   * closing}.
991   *
992   * <h2>Design Notes</h2>
993   *
994   * <p>This method calls an internal method that models the <a
995   * href="https://github.com/kubernetes/client-go/blob/03bfb9bdcfe5482795b999f39ca3ed9ad42ce5bb/tools/cache/delta_fifo.go#L407-L453">{@code
996   * Pop} function in {@code delta_fifo.go}</a>.</p>
997   *
998   * @return an {@link EventQueue}, or {@code null}
999   */
1000  @Blocking
1001  @Override
1002  public final EventQueue<T> get() {
1003    final String cn = this.getClass().getName();
1004    final String mn = "get";
1005    if (this.logger.isLoggable(Level.FINER)) {
1006      this.logger.entering(cn, mn);
1007    }
1008
1009    EventQueue<T> returnValue = null;
1010    try {
1011      returnValue = this.take();
1012    } catch (final InterruptedException interruptedException) {
1013      Thread.currentThread().interrupt();
1014      returnValue = null;
1015    }
1016
1017    if (this.logger.isLoggable(Level.FINER)) {
1018      this.logger.exiting(cn, mn, returnValue);
1019    }
1020    return returnValue;
1021  }
1022
1023  /**
1024   * Blocks until there is an {@link EventQueue} in this {@link
1025   * EventQueueCollection} available for removal whereupon it is
1026   * removed and returned.
1027   *
1028   * <p>This method may return {@code null} if it is invoked while
1029   * this {@linkplain #close() <code>EventQueueCollection</code> is
1030   * closing}.</p>
1031   *
1032   * <h2>Design Notes</h2>
1033   *
1034   * <p>This method models the <a
1035   * href="https://github.com/kubernetes/client-go/blob/03bfb9bdcfe5482795b999f39ca3ed9ad42ce5bb/tools/cache/delta_fifo.go#L407-L453">{@code
1036   * Pop} function in {@code delta_fifo.go}</a>.</p>
1037   *
1038   * <p>This method is called internally only by the {@link #get()}
1039   * method, which is its {@code public}-facing counterpart.  {@link
1040   * #get()} cannot, by contract, throw an {@link
1041   * InterruptedException}; hence this method.</p>
1042   *
1043   * @return the {@link EventQueue} that was removed (taken), or, in
1044   * exceptional circumstances, {@code null}
1045   *
1046   * @exception InterruptedException if the {@link
1047   * Thread#currentThread() current thread} was interrupted
1048   */
1049  @Blocking
1050  private final EventQueue<T> take() throws InterruptedException {
1051    final String cn = this.getClass().getName();
1052    final String mn = "take";
1053    if (this.logger.isLoggable(Level.FINER)) {
1054      this.logger.entering(cn, mn);
1055    }
1056
1057    final EventQueue<T> returnValue;
1058    synchronized (this) {
1059      while (this.isEmpty() && !this.closing) {
1060        this.wait(); // blocks; see add() for corresponding notifyAll()
1061      }
1062
1063      // We can assert this because add() is the method that calls
1064      // notifyAll(), and add() sets populated to true before it calls
1065      // notifyAll().  The notifyAll() call is the only way that the
1066      // blocking wait() call above will exit other than interruption,
1067      // and if it exited via interruption, an InterruptedException
1068      // would have been thrown and we'd never get here.
1069      assert this.populated : "this.populated == false";
1070
1071      if (this.isEmpty()) {
1072        assert this.closing : "this.isEmpty() && !this.closing";
1073        returnValue = null;
1074      } else {
1075
1076        // Pop the first EventQueue off and return it.
1077        final Iterator<EventQueue<T>> iterator = this.eventQueueMap.values().iterator();
1078        assert iterator != null;
1079        assert iterator.hasNext();
1080        returnValue = iterator.next();
1081        assert returnValue != null;
1082        iterator.remove();
1083
1084        if (this.initialPopulationCount > 0) {
1085          // We know we're not populated and our
1086          // initialPopulationCount is not 0, so therefore we are not
1087          // synchronized.  See isSynchronized().
1088          assert !this.isSynchronized();
1089          final int oldInitialPopulationCount = this.initialPopulationCount;
1090          this.initialPopulationCount--;
1091          this.firePropertyChange("initialPopulationCount", oldInitialPopulationCount, this.initialPopulationCount);
1092          this.firePropertyChange("synchronized", false, this.isSynchronized());
1093        }
1094        this.firePropertyChange("empty", false, this.isEmpty());
1095      }
1096    }
1097
1098    if (this.logger.isLoggable(Level.FINER)) {
1099      final String eventQueueString;
1100      synchronized (returnValue) {
1101        eventQueueString = returnValue.toString();
1102      }
1103      this.logger.exiting(cn, mn, eventQueueString);
1104    }
1105    return returnValue;
1106  }
1107
1108  /**
1109   * Creates an {@link Event} using the supplied raw materials and
1110   * returns it.
1111   *
1112   * <p>This method never returns {@code null}.</p>
1113   *
1114   * <p>Implementations of this method must not return {@code
1115   * null}.</p>
1116   *
1117   * <p>Implementations of this method must return a new {@link Event}
1118   * with every invocation.</p>
1119   *
1120   * @param source the {@linkplain Event#getSource() source} of the
1121   * {@link Event} that will be created; must not be null
1122   *
1123   * @param eventType the {@linkplain Event#getType() type} of {@link
1124   * Event} that will be created; must not be {@code null}
1125   *
1126   * @param resource the {@linkplain Event#getResource() resource} of
1127   * the {@link Event} that will be created; must not be
1128   * {@code null}
1129   *
1130   * @return the created {@link Event}; never {@code null}
1131   *
1132   * @exception NullPointerException if any parameter is {@code null}
1133   */
1134  protected Event<T> createEvent(final Object source, final AbstractEvent.Type eventType, final T resource) {
1135    final String cn = this.getClass().getName();
1136    final String mn = "createEvent";
1137    if (this.logger.isLoggable(Level.FINER)) {
1138      this.logger.entering(cn, mn, new Object[] { source, eventType, resource });
1139    }
1140
1141    Objects.requireNonNull(source);
1142    Objects.requireNonNull(eventType);
1143    Objects.requireNonNull(resource);
1144    final Event<T> returnValue = new Event<>(source, eventType, null, resource);
1145
1146    if (this.logger.isLoggable(Level.FINER)) {
1147      this.logger.exiting(cn, mn, returnValue);
1148    }
1149    return returnValue;
1150  }
1151
1152  /**
1153   * Creates a {@link SynchronizationEvent} using the supplied raw
1154   * materials and returns it.
1155   *
1156   * <p>This method never returns {@code null}.</p>
1157   *
1158   * <p>Implementations of this method must not return {@code
1159   * null}.</p>
1160   *
1161   * <p>Implementations of this method must return a new {@link
1162   * SynchronizationEvent} with every invocation.</p>
1163   *
1164   * @param source the {@linkplain SynchronizationEvent#getSource()
1165   * source} of the {@link SynchronizationEvent} that will be created;
1166   * must not be null
1167   *
1168   * @param eventType the {@linkplain Event#getType() type} of {@link
1169   * SynchronizationEvent} that will be created; must not be {@code
1170   * null}
1171   *
1172   * @param resource the {@linkplain Event#getResource() resource} of
1173   * the {@link SynchronizationEvent} that will be created; must not
1174   * be {@code null}
1175   *
1176   * @return the created {@link SynchronizationEvent}; never {@code
1177   * null}
1178   *
1179   * @exception NullPointerException if any parameter is {@code null}
1180   */
1181  protected SynchronizationEvent<T> createSynchronizationEvent(final Object source, final AbstractEvent.Type eventType, final T resource) {
1182    final String cn = this.getClass().getName();
1183    final String mn = "createSynchronizationEvent";
1184    if (this.logger.isLoggable(Level.FINER)) {
1185      this.logger.entering(cn, mn, new Object[] { source, eventType, resource });
1186    }
1187
1188    Objects.requireNonNull(source);
1189    Objects.requireNonNull(eventType);
1190    Objects.requireNonNull(resource);
1191    final SynchronizationEvent<T> returnValue = new SynchronizationEvent<>(source, eventType, null, resource);
1192
1193    if (this.logger.isLoggable(Level.FINER)) {
1194      this.logger.exiting(cn, mn, returnValue);
1195    }
1196    return returnValue;
1197  }
1198
1199  private final SynchronizationEvent<T> addSynchronizationEvent(final Object source,
1200                                                                final AbstractEvent.Type eventType,
1201                                                                final T resource) {
1202    return this.addSynchronizationEvent(source, eventType, resource, true);
1203  }
1204  
1205  private final SynchronizationEvent<T> addSynchronizationEvent(final Object source,
1206                                                                final AbstractEvent.Type eventType,
1207                                                                final T resource,
1208                                                                final boolean populate) {
1209    final String cn = this.getClass().getName();
1210    final String mn = "addSynchronizationEvent";
1211    if (this.logger.isLoggable(Level.FINER)) {
1212      this.logger.entering(cn, mn, new Object[] { source, eventType, resource, Boolean.valueOf(populate) });
1213    }
1214
1215    Objects.requireNonNull(source);
1216    Objects.requireNonNull(eventType);
1217    Objects.requireNonNull(resource);
1218
1219    if (!(eventType.equals(AbstractEvent.Type.ADDITION) || eventType.equals(AbstractEvent.Type.MODIFICATION))) {
1220      throw new IllegalArgumentException("Illegal eventType: " + eventType);
1221    }
1222
1223    final SynchronizationEvent<T> event = this.createSynchronizationEvent(source, eventType, resource);
1224    if (event == null) {
1225      throw new IllegalStateException("createSynchronizationEvent() == null");
1226    }
1227    final SynchronizationEvent<T> returnValue = this.add(event, populate);
1228
1229    if (this.logger.isLoggable(Level.FINER)) {
1230      this.logger.exiting(cn, mn, returnValue);
1231    }
1232    return returnValue;
1233  }
1234
1235  /**
1236   * Adds a new {@link Event} constructed out of the parameters
1237   * supplied to this method to this {@link EventQueueCollection} and
1238   * returns the {@link Event} that was added.
1239   *
1240   * <p>This method may return {@code null}.</p>
1241   *
1242   * <p>This implementation {@linkplain #createEventQueue(Object)
1243   * creates an <code>EventQueue</code>} if necessary for the {@link
1244   * Event} that will be added, and then adds the new {@link Event} to
1245   * the queue.</p>
1246   *
1247   * @param source the {@linkplain Event#getSource() source} of the
1248   * {@link Event} that will be created and added; must not be null
1249   *
1250   * @param eventType the {@linkplain Event#getType() type} of {@link
1251   * Event} that will be created and added; must not be {@code null}
1252   *
1253   * @param resource the {@linkplain Event#getResource() resource} of
1254   * the {@link Event} that will be created and added; must not be
1255   * {@code null}
1256   *
1257   * @return the {@link Event} that was created and added, or {@code
1258   * null} if no {@link Event} was actually added as a result of this
1259   * method's invocation
1260   *
1261   * @exception NullPointerException if any of the parameters is
1262   * {@code null}
1263   *
1264   * @see Event
1265   */
1266  @Override
1267  public final Event<T> add(final Object source, final AbstractEvent.Type eventType, final T resource) {
1268    return this.add(source, eventType, resource, true);
1269  }
1270
1271  /**
1272   * Adds a new {@link Event} constructed out of the parameters
1273   * supplied to this method to this {@link EventQueueCollection} and
1274   * returns the {@link Event} that was added.
1275   *
1276   * <p>This method may return {@code null}.</p>
1277   *
1278   * <p>This implementation {@linkplain #createEventQueue(Object)
1279   * creates an <code>EventQueue</code>} if necessary for the {@link
1280   * Event} that will be added, and then adds the new {@link Event} to
1281   * the queue.</p>
1282   *
1283   * @param source the {@linkplain Event#getSource() source} of the
1284   * {@link Event} that will be created and added; must not be null
1285   *
1286   * @param eventType the {@linkplain Event#getType() type} of {@link
1287   * Event} that will be created and added; must not be {@code null}
1288   *
1289   * @param resource the {@linkplain Event#getResource() resource} of
1290   * the {@link Event} that will be created and added; must not be
1291   * {@code null}
1292   *
1293   * @param populate if {@code true} then this {@link
1294   * EventQueueCollection} will be internally marked as <em>initially
1295   * populated</em>
1296   *
1297   * @return the {@link Event} that was created and added, or {@code
1298   * null} if no {@link Event} was actually added as a result of this
1299   * method's invocation
1300   *
1301   * @exception NullPointerException if any of the parameters is
1302   * {@code null}
1303   *
1304   * @see Event
1305   */
1306  private final Event<T> add(final Object source,
1307                             final AbstractEvent.Type eventType,
1308                             final T resource,
1309                             final boolean populate) {
1310    final String cn = this.getClass().getName();
1311    final String mn = "add";
1312    if (this.logger.isLoggable(Level.FINER)) {
1313      this.logger.entering(cn, mn, new Object[] { source, eventType, resource, Boolean.valueOf(populate)});
1314    }
1315
1316    final Event<T> event = this.createEvent(source, eventType, resource);
1317    if (event == null) {
1318      throw new IllegalStateException("createEvent() == null");
1319    }
1320    final Event<T> returnValue = this.add(event, populate);
1321
1322    if (this.logger.isLoggable(Level.FINER)) {
1323      this.logger.exiting(cn, mn, returnValue);
1324    }
1325    return returnValue;
1326  }
1327
1328  /**
1329   * Adds the supplied {@link Event} to this {@link
1330   * EventQueueCollection} and returns the {@link Event} that was
1331   * added.
1332   *
1333   * <p>This method may return {@code null}.</p>
1334   *
1335   * <p>This implementation {@linkplain #createEventQueue(Object)
1336   * creates an <code>EventQueue</code>} if necessary for the {@link
1337   * Event} that will be added, and then adds the new {@link Event} to
1338   * the queue.</p>
1339   *
1340   * @param <E> an {@link AbstractEvent} type that is both consumed
1341   * and returned
1342   *
1343   * @param event the {@link Event} to add; must not be {@code null}
1344   *
1345   * @param populate if {@code true} then this {@link
1346   * EventQueueCollection} will be internally marked as <em>initially
1347   * populated</em>
1348   *
1349   * @return the {@link Event} that was created and added, or {@code
1350   * null} if no {@link Event} was actually added as a result of this
1351   * method's invocation
1352   *
1353   * @exception NullPointerException if any of the parameters is
1354   * {@code null}
1355   *
1356   * @see Event
1357   */
1358  private final <E extends AbstractEvent<T>> E add(final E event, final boolean populate) {
1359    final String cn = this.getClass().getName();
1360    final String mn = "add";
1361    if (this.logger.isLoggable(Level.FINER)) {
1362      this.logger.entering(cn, mn, new Object[] { event, Boolean.valueOf(populate) });
1363    }
1364
1365    if (this.closing) {
1366      throw new IllegalStateException();
1367    }
1368
1369    Objects.requireNonNull(event);
1370
1371    final Object key = event.getKey();
1372    if (key == null) {
1373      throw new IllegalArgumentException("event.getKey() == null");
1374    }
1375
1376    E returnValue = null;
1377
1378    synchronized (this) {
1379
1380      if (populate && !this.populated) {
1381        this.populated = true;
1382        // TODO: too early?
1383        this.firePropertyChange("populated", false, true);
1384      }
1385
1386      EventQueue<T> eventQueue = this.eventQueueMap.get(key);
1387
1388      final boolean eventQueueExisted = eventQueue != null;
1389
1390      if (!eventQueueExisted) {
1391        eventQueue = this.createEventQueue(key);
1392        if (eventQueue == null) {
1393          throw new IllegalStateException("createEventQueue(key) == null: " + key);
1394        }
1395      }
1396      assert eventQueue != null;
1397
1398      final boolean eventAdded;
1399      final boolean eventQueueIsEmpty;
1400      synchronized (eventQueue) {
1401        eventAdded = eventQueue.addEvent(event);
1402        // Adding an event to an EventQueue can result in compression,
1403        // which may result in the EventQueue becoming empty as a
1404        // result of the add operation.
1405        eventQueueIsEmpty = eventQueue.isEmpty();
1406      }
1407
1408      if (eventAdded) {
1409        returnValue = event;
1410      }
1411
1412      if (eventQueueIsEmpty) {
1413        // Compression might have emptied the queue, so an add could
1414        // result in an empty queue.  We don't permit empty queues.
1415        if (eventQueueExisted) {
1416          returnValue = null;
1417          final boolean oldEmpty = this.isEmpty();
1418          final Object oldEventQueue = this.eventQueueMap.remove(key);
1419          assert oldEventQueue != null;
1420          this.firePropertyChange("empty", oldEmpty, this.isEmpty());
1421        } else {
1422          // Nothing to do; the queue we added the event to was
1423          // created here, and was never added to our internal eventQueueMap, so
1424          // we're done.
1425        }
1426      } else if (!eventQueueExisted) {
1427        // We created the EventQueue we just added to; now we need to
1428        // store it.
1429        final boolean oldEmpty = this.isEmpty();
1430        final Object oldEventQueue = this.eventQueueMap.put(key, eventQueue);
1431        assert oldEventQueue == null;
1432        this.firePropertyChange("empty", oldEmpty, this.isEmpty());
1433        // Notify anyone blocked on our empty state that we're no
1434        // longer empty.
1435        this.notifyAll();
1436      }
1437
1438    }
1439
1440    if (this.logger.isLoggable(Level.FINER)) {
1441      this.logger.exiting(cn, mn, returnValue);
1442    }
1443    return returnValue;
1444  }
1445
1446
1447  /*
1448   * PropertyChangeListener support.
1449   */
1450
1451
1452  /**
1453   * Adds the supplied {@link PropertyChangeListener} to this {@link
1454   * EventQueueCollection}'s collection of such listeners so that it
1455   * will be notified only when the bound property bearing the
1456   * supplied {@code name} changes.
1457   *
1458   * @param name the name of the bound property whose changes are of
1459   * interest; may be {@code null} in which case all property change
1460   * notifications will be dispatched to the supplied {@link
1461   * PropertyChangeListener}
1462   *
1463   * @param listener the {@link PropertyChangeListener} to add; may be
1464   * {@code null} in which case no action will be taken
1465   *
1466   * @see #addPropertyChangeListener(PropertyChangeListener)
1467   */
1468  public final void addPropertyChangeListener(final String name, final PropertyChangeListener listener) {
1469    if (listener != null) {
1470      this.propertyChangeSupport.addPropertyChangeListener(name, listener);
1471    }
1472  }
1473
1474  /**
1475   * Adds the supplied {@link PropertyChangeListener} to this {@link
1476   * EventQueueCollection}'s collection of such listeners so that it
1477   * will be notified whenever any bound property of this {@link
1478   * EventQueueCollection} changes.
1479   *
1480   * @param listener the {@link PropertyChangeListener} to add; may be
1481   * {@code null} in which case no action will be taken
1482   *
1483   * @see #addPropertyChangeListener(String, PropertyChangeListener)
1484   */
1485  public final void addPropertyChangeListener(final PropertyChangeListener listener) {
1486    if (listener != null) {
1487      this.propertyChangeSupport.addPropertyChangeListener(listener);
1488    }
1489  }
1490
1491  /**
1492   * Removes the supplied {@link PropertyChangeListener} from this
1493   * {@link EventQueueCollection} so that it will no longer be
1494   * notified of changes to bound properties bearing the supplied
1495   * {@code name}.
1496   *
1497   * @param name a bound property name; may be {@code null}
1498   *
1499   * @param listener the {@link PropertyChangeListener} to remove; may
1500   * be {@code null} in which case no action will be taken
1501   *
1502   * @see #addPropertyChangeListener(String, PropertyChangeListener)
1503   *
1504   * @see #removePropertyChangeListener(PropertyChangeListener)
1505   */
1506  public final void removePropertyChangeListener(final String name, final PropertyChangeListener listener) {
1507    if (listener != null) {
1508      this.propertyChangeSupport.removePropertyChangeListener(name, listener);
1509    }
1510  }
1511
1512  /**
1513   * Removes the supplied {@link PropertyChangeListener} from this
1514   * {@link EventQueueCollection} so that it will no longer be
1515   * notified of any changes to bound properties.
1516   *
1517   * @param listener the {@link PropertyChangeListener} to remove; may
1518   * be {@code null} in which case no action will be taken
1519   *
1520   * @see #addPropertyChangeListener(PropertyChangeListener)
1521   *
1522   * @see #removePropertyChangeListener(String, PropertyChangeListener)
1523   */
1524  public final void removePropertyChangeListener(final PropertyChangeListener listener) {
1525    if (listener != null) {
1526      this.propertyChangeSupport.removePropertyChangeListener(listener);
1527    }
1528  }
1529
1530  /**
1531   * Returns an array of {@link PropertyChangeListener}s that were
1532   * {@linkplain #addPropertyChangeListener(String,
1533   * PropertyChangeListener) registered} to receive notifications for
1534   * changes to bound properties bearing the supplied {@code name}.
1535   *
1536   * <p>This method never returns {@code null}.</p>
1537   *
1538   * @param name the name of a bound property; may be {@code null} in
1539   * which case an empty array will be returned
1540   *
1541   * @return a non-{@code null} array of {@link
1542   * PropertyChangeListener}s
1543   *
1544   * @see #getPropertyChangeListeners()
1545   *
1546   * @see #addPropertyChangeListener(String, PropertyChangeListener)
1547   *
1548   * @see #removePropertyChangeListener(String,
1549   * PropertyChangeListener)
1550   */
1551  public final PropertyChangeListener[] getPropertyChangeListeners(final String name) {
1552    return this.propertyChangeSupport.getPropertyChangeListeners(name);
1553  }
1554
1555  /**
1556   * Returns an array of {@link PropertyChangeListener}s that were
1557   * {@linkplain #addPropertyChangeListener(String,
1558   * PropertyChangeListener) registered} to receive notifications for
1559   * changes to all bound properties.
1560   *
1561   * <p>This method never returns {@code null}.</p>
1562   *
1563   * @return a non-{@code null} array of {@link
1564   * PropertyChangeListener}s
1565   *
1566   * @see #getPropertyChangeListeners(String)
1567   *
1568   * @see #addPropertyChangeListener(PropertyChangeListener)
1569   *
1570   * @see #removePropertyChangeListener(PropertyChangeListener)
1571   */
1572  public final PropertyChangeListener[] getPropertyChangeListeners() {
1573    return this.propertyChangeSupport.getPropertyChangeListeners();
1574  }
1575
1576  /**
1577   * Fires a {@link PropertyChangeEvent} to {@linkplain
1578   * #addPropertyChangeListener(String, PropertyChangeListener)
1579   * registered <tt>PropertyChangeListener</tt>s} if the supplied
1580   * {@code old} and {@code newValue} objects are non-{@code null} and
1581   * not equal to each other.
1582   *
1583   * @param propertyName the name of the bound property that might
1584   * have changed; may be {@code null} (indicating that some unknown
1585   * set of bound properties has changed)
1586   *
1587   * @param old the old value of the bound property in question; may
1588   * be {@code null}
1589   *
1590   * @param newValue the new value of the bound property; may be
1591   * {@code null}
1592   */
1593  protected final void firePropertyChange(final String propertyName, final Object old, final Object newValue) {
1594    final String cn = this.getClass().getName();
1595    final String mn = "firePropertyChange";
1596    if (this.logger.isLoggable(Level.FINER)) {
1597      this.logger.entering(cn, mn, new Object[] { propertyName, old, newValue });
1598    }
1599    this.propertyChangeSupport.firePropertyChange(propertyName, old, newValue);
1600    if (this.logger.isLoggable(Level.FINER)) {
1601      this.logger.exiting(cn, mn);
1602    }
1603  }
1604
1605  /**
1606   * Fires a {@link PropertyChangeEvent} to {@linkplain
1607   * #addPropertyChangeListener(String, PropertyChangeListener)
1608   * registered <tt>PropertyChangeListener</tt>s} if the supplied
1609   * {@code old} and {@code newValue} objects are non-{@code null} and
1610   * not equal to each other.
1611   *
1612   * @param propertyName the name of the bound property that might
1613   * have changed; may be {@code null} (indicating that some unknown
1614   * set of bound properties has changed)
1615   *
1616   * @param old the old value of the bound property in question
1617   *
1618   * @param newValue the new value of the bound property
1619   */
1620  protected final void firePropertyChange(final String propertyName, final int old, final int newValue) {
1621    final String cn = this.getClass().getName();
1622    final String mn = "firePropertyChange";
1623    if (this.logger.isLoggable(Level.FINER)) {
1624      this.logger.entering(cn, mn, new Object[] { propertyName, Integer.valueOf(old), Integer.valueOf(newValue) });
1625    }
1626    this.propertyChangeSupport.firePropertyChange(propertyName, old, newValue);
1627    if (this.logger.isLoggable(Level.FINER)) {
1628      this.logger.exiting(cn, mn);
1629    }
1630  }
1631
1632  /**
1633   * Fires a {@link PropertyChangeEvent} to {@linkplain
1634   * #addPropertyChangeListener(String, PropertyChangeListener)
1635   * registered <tt>PropertyChangeListener</tt>s} if the supplied
1636   * {@code old} and {@code newValue} objects are non-{@code null} and
1637   * not equal to each other.
1638   *
1639   * @param name the name of the bound property that might
1640   * have changed; may be {@code null} (indicating that some unknown
1641   * set of bound properties has changed)
1642   *
1643   * @param old the old value of the bound property in question
1644   *
1645   * @param newValue the new value of the bound property
1646   */
1647  protected final void firePropertyChange(final String name, final boolean old, final boolean newValue) {
1648    final String cn = this.getClass().getName();
1649    final String mn = "firePropertyChange";
1650    if (this.logger.isLoggable(Level.FINER)) {
1651      this.logger.entering(cn, mn, new Object[] { name, Boolean.valueOf(old), Boolean.valueOf(newValue) });
1652    }
1653    this.propertyChangeSupport.firePropertyChange(name, old, newValue);
1654    if (this.logger.isLoggable(Level.FINER)) {
1655      this.logger.exiting(cn, mn);
1656    }
1657  }
1658
1659
1660  /*
1661   * Inner and nested classes.
1662   */
1663
1664
1665  /**
1666   * A {@link RuntimeException} indicating that a {@link Consumer}
1667   * {@linkplain EventQueueCollection#start(Consumer) started} by an
1668   * {@link EventQueueCollection} has encountered an error that might
1669   * not happen if the consumption operation is retried.
1670   *
1671   * @author <a href="https://about.me/lairdnelson"
1672   * target="_parent">Laird Nelson</a>
1673   *
1674   * @see EventQueueCollection#start(Consumer)
1675   *
1676   * @see EventQueueCollection
1677   */
1678  public static final class TransientException extends RuntimeException {
1679
1680
1681    /*
1682     * Static fields.
1683     */
1684
1685
1686    /**
1687     * The version of this class for {@linkplain Serializable
1688     * serialization purposes}.
1689     *
1690     * @see Serializable
1691     */
1692    private static final long serialVersionUID = 1L;
1693
1694
1695    /*
1696     * Constructors.
1697     */
1698
1699
1700    /**
1701     * Creates a new {@link TransientException}.
1702     */
1703    public TransientException() {
1704      super();
1705    }
1706
1707    /**
1708     * Creates a new {@link TransientException}.
1709     *
1710     * @param message a detail message describing the error; may be
1711     * {@code null}
1712     */
1713    public TransientException(final String message) {
1714      super(message);
1715    }
1716
1717    /**
1718     * Creates a new {@link TransientException}.
1719     *
1720     * @param cause the {@link Throwable} that caused this {@link
1721     * TransientException} to be created; may be {@code null}
1722     */
1723    public TransientException(final Throwable cause) {
1724      super(cause);
1725    }
1726
1727    /**
1728     * Creates a new {@link TransientException}.
1729     *
1730     * @param message a detail message describing the error; may be
1731     * {@code null}
1732     *
1733     * @param cause the {@link Throwable} that caused this {@link
1734     * TransientException} to be created; may be {@code null}
1735     */
1736    public TransientException(final String message, final Throwable cause) {
1737      super(message, cause);
1738    }
1739
1740  }
1741
1742
1743  /**
1744   * A {@link PropertyChangeListener} specifically designed for
1745   * reacting to a change in the synchronization status of an {@link
1746   * EventQueueCollection} as represented by the firing of its {@code
1747   * synchronized} <a
1748   * href="https://docs.oracle.com/javase/tutorial/javabeans/writing/properties.html#bound">bound
1749   * Java Beans property</a>.
1750   *
1751   * @author <a href="https://about.me/lairdnelson"
1752   * target="_parent">Laird Nelson</a>
1753   *
1754   * @see EventQueueCollection#addPropertyChangeListener(String,
1755   * PropertyChangeListener)
1756   *
1757   * @see #await()
1758   *
1759   * @see #propertyChange(PropertyChangeEvent)
1760   */
1761  public static final class SynchronizationAwaitingPropertyChangeListener implements PropertyChangeListener {
1762
1763
1764    /*
1765     * Instance fields.
1766     */
1767
1768
1769    /**
1770     * A {@link CountDownLatch} whose {@link
1771     * CountDownLatch#countDown()} method is invoked in certain cases
1772     * by the {@link #propertyChange(PropertyChangeEvent)} method.
1773     *
1774     * <p>This field is never {@code null}.</p>
1775     *
1776     * @see #propertyChange(PropertyChangeEvent)
1777     */
1778    private final CountDownLatch latch;
1779
1780
1781    /*
1782     * Constructors.
1783     */
1784    
1785
1786    /**
1787     * Creates a new {@link
1788     * SynchronizationAwaitingPropertyChangeListener}.
1789     */
1790    public SynchronizationAwaitingPropertyChangeListener() {
1791      super();
1792      this.latch = new CountDownLatch(1);
1793    }
1794
1795
1796    /*
1797     * Instance methods.
1798     */
1799    
1800
1801    /**
1802     * If the supplied {@link PropertyChangeEvent} is non-{@code
1803     * null}, has a {@linkplain PropertyChangeEvent#getSource()
1804     * source} that is an instance of {@link EventQueueCollection},
1805     * has a {@linkplain PropertyChangeEvent#getPropertyName()
1806     * property name} equal to {@code synchronized} and a {@linkplain
1807     * PropertyChangeEvent#getNewValue() new value} equal to {@link
1808     * Boolean#TRUE}, then it is guaranteed that any calls currently
1809     * blocked on the {@link #await()} or {@link #await(long,
1810     * TimeUnit)} methods will unblock, and subsequent invocations of
1811     * those methods will never block again.
1812     *
1813     * @param event a {@link PropertyChangeEvent} fired by an {@link
1814     * EventQueueCollection}; may be {@code null} in which case no
1815     * action will be taken
1816     *
1817     * @see EventQueueCollection#addPropertyChangeListener(String,
1818     * PropertyChangeListener)
1819     *
1820     * @see EventQueueCollection#isSynchronized()
1821     */
1822    @Override
1823    public final void propertyChange(final PropertyChangeEvent event) {
1824      if (event != null &&
1825          event.getSource() instanceof EventQueueCollection &&
1826          "synchronized".equals(event.getPropertyName()) &&
1827          Boolean.TRUE.equals(event.getNewValue())) {
1828        this.latch.countDown();
1829      }
1830    }
1831
1832    /**
1833     * Blocks until the conditions described in the documentation of
1834     * the {@link #propertyChange(PropertyChangeEvent)} method hold
1835     * true.
1836     *
1837     * @exception InterruptedException if the current {@link Thread}
1838     * is interrupted
1839     */
1840    @Blocking
1841    public final void await() throws InterruptedException {
1842      this.latch.await();
1843    }
1844
1845    /**
1846     * Blocks until the conditions described in the documentation of
1847     * the {@link #propertyChange(PropertyChangeEvent)} method hold
1848     * true or the indicated time has passed.
1849     *
1850     * @param timeout the number of units of time to wait for
1851     *
1852     * @param timeUnit the unit of time designated by the {@code
1853     * timeout} parameter; must not be {@code null}
1854     *
1855     * @return {@code false} if the waiting time elapsed before the
1856     * bound property named {@code synchronized} changed its value to
1857     * {@code true}; {@code true} otherwise
1858     *
1859     * @exception InterruptedException if the current {@link Thread}
1860     * is interrupted
1861     *
1862     * @exception NullPointerException if {@code timeUnit} is {@code
1863     * null}
1864     *
1865     * @see #propertyChange(PropertyChangeEvent)
1866     */
1867    @Blocking
1868    public final boolean await(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
1869      return this.latch.await(timeout, timeUnit);
1870    }
1871    
1872  }
1873
1874}