001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
002 *
003 * Copyright © 2017-2018 microBean.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
014 * implied.  See the License for the specific language governing
015 * permissions and limitations under the License.
016 */
017package org.microbean.kubernetes.controller;
018
019import java.io.Closeable;
020import java.io.IOException;
021
022import java.time.Duration;
023
024import java.util.Map;
025import java.util.Objects;
026
027import java.util.concurrent.Future;
028import java.util.concurrent.ScheduledExecutorService;
029import java.util.concurrent.TimeUnit;
030
031import java.util.function.Consumer;
032import java.util.function.Function;
033
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037import io.fabric8.kubernetes.api.model.HasMetadata;
038import io.fabric8.kubernetes.api.model.KubernetesResourceList;
039
040import io.fabric8.kubernetes.client.KubernetesClientException; // for javadoc only
041import io.fabric8.kubernetes.client.Watcher;
042
043import io.fabric8.kubernetes.client.dsl.Listable;
044import io.fabric8.kubernetes.client.dsl.VersionWatchable;
045
046import net.jcip.annotations.Immutable;
047import net.jcip.annotations.ThreadSafe;
048
049import org.microbean.development.annotation.Blocking;
050import org.microbean.development.annotation.NonBlocking;
051
052/**
053 * A convenient combination of a {@link Reflector}, a {@link
054 * VersionWatchable} and {@link Listable} implementation, an
055 * (internal) {@link EventQueueCollection}, a {@link Map} of known
056 * Kubernetes resources and an {@link EventQueue} {@link Consumer}
057 * that {@linkplain Reflector#start() mirrors Kubernetes cluster
058 * events} into a {@linkplain EventQueueCollection collection of
059 * <code>EventQueue</code>s} and {@linkplain
060 * EventQueueCollection#start(Consumer) arranges for their consumption
061 * and processing}.
062 *
063 * <p>{@linkplain #start() Starting} a {@link Controller} {@linkplain
064 * EventQueueCollection#start(Consumer) starts the
065 * <code>Consumer</code>} supplied at construction time, and
066 * {@linkplain Reflector#start() starts the embedded
067 * <code>Reflector</code>}.  {@linkplain #close() Closing} a {@link
068 * Controller} {@linkplain Reflector#close() closes its embedded
069 * <code>Reflector</code>} and {@linkplain
070 * EventQueueCollection#close() causes the <code>Consumer</code>
071 * supplied at construction time to stop receiving
072 * <code>Event</code>s}.</p>
073 *
074 * <p>Several {@code protected} methods in this class exist to make
075 * customization easier; none require overriding and their default
076 * behavior is usually just fine.</p>
077 *
078 * <h2>Thread Safety</h2>
079 *
080 * <p>Instances of this class are safe for concurrent use by multiple
081 * threads.</p>
082 *
083 * <h2>Design Notes</h2>
084 *
085 * <p>This class loosely models a combination of a <a
086 * href="https://github.com/kubernetes/client-go/blob/79cb21f5b3b1dd8f8b23bd3f79925b4fda4e2562/tools/cache/controller.go#L82-L86">{@code
087 * Controller} type</a> and a <a
088 * href="https://github.com/kubernetes/client-go/blob/79cb21f5b3b1dd8f8b23bd3f79925b4fda4e2562/tools/cache/shared_informer.go#L66-L71">{@code
089 * SharedIndexInformer} type</a> as found in <a
090 * href="https://github.com/kubernetes/client-go/blob/master/tools/cache/controller.go">{@code
091 * controller.go}</a> and <a
092 * href="https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go">{@code
093 * shared_informer.go}</a> respectively.</p>
094 *
095 * @param <T> a Kubernetes resource type
096 *
097 * @author <a href="https://about.me/lairdnelson"
098 * target="_parent">Laird Nelson</a>
099 *
100 * @see Reflector
101 *
102 * @see EventQueueCollection
103 *
104 * @see ResourceTrackingEventQueueConsumer
105 *
106 * @see #start()
107 *
108 * @see #close()
109 */
110@Immutable
111@ThreadSafe
112public class Controller<T extends HasMetadata> implements Closeable {
113
114
115  /*
116   * Instance fields.
117   */
118
119
120  /**
121   * A {@link Logger} used by this {@link Controller}.
122   *
123   * <p>This field is never {@code null}.</p>
124   *
125   * @see #createLogger()
126   */
127  protected final Logger logger;
128
129  /**
130   * The {@link Reflector} used by this {@link Controller} to mirror
131   * Kubernetes events.
132   *
133   * <p>This field is never {@code null}.</p>
134   */
135  private final Reflector<T> reflector;
136
137  /**
138   * The {@link EventQueueCollection} used by the {@link #reflector
139   * Reflector} and by the {@link Consumer} supplied at construction
140   * time.
141   *
142   * <p>This field is never {@code null}.</p>
143   *
144   * @see EventQueueCollection#add(Object, AbstractEvent.Type,
145   * HasMetadata)
146   *
147   * @see EventQueueCollection#replace(Collection, Object)
148   *
149   * @see EventQueueCollection#synchronize()
150   *
151   * @see EventQueueCollection#start(Consumer)
152   */
153  private final EventQueueCollection<T> eventQueueCollection;
154
  /**
   * The listener that is registered with the {@link
   * #eventQueueCollection EventQueueCollection} at construction time
   * and that the {@code awaitEventCacheSynchronization} methods
   * delegate to when blocking.
   *
   * <p>This field is never {@code null}.</p>
   *
   * @see #awaitEventCacheSynchronization()
   *
   * @see #awaitEventCacheSynchronization(long, TimeUnit)
   */
  private final EventQueueCollection.SynchronizationAwaitingPropertyChangeListener synchronizationAwaiter;
156
157  /**
158   * A {@link Consumer} of {@link EventQueue}s that processes {@link
159   * Event}s produced, ultimately, by the {@link #reflector
160   * Reflector}.
161   *
162   * <p>This field is never {@code null}.</p>
163   */
164  private final Consumer<? super EventQueue<? extends T>> eventQueueConsumer;
165
166  
167  /*
168   * Constructors.
169   */
170
171
172  /**
173   * Creates a new {@link Controller} but does not {@linkplain
174   * #start() start it}.
175   *
176   * @param <X> a {@link Listable} and {@link VersionWatchable} that
177   * will be used by the embedded {@link Reflector}; must not be
178   * {@code null}
179   *
180   * @param operation a {@link Listable} and a {@link
181   * VersionWatchable} that produces Kubernetes events; must not be
182   * {@code null}
183   *
184   * @param eventQueueConsumer the {@link Consumer} that will process
185   * each {@link EventQueue} as it becomes ready; must not be {@code
186   * null}
187   *
188   * @exception NullPointerException if {@code operation} or {@code
189   * eventQueueConsumer} is {@code null}
190   *
191   * @see #Controller(Listable, ScheduledExecutorService, Duration,
192   * Map, Consumer)
193   *
194   * @see #start()
195   */
196  @SuppressWarnings("rawtypes")
197  public <X extends Listable<? extends KubernetesResourceList>
198                    & VersionWatchable<? extends Closeable,
199                                                 Watcher<T>>> Controller(final X operation,
200                                                                         final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
201    this(operation, null, null, null, eventQueueConsumer);
202  }
203
204  /**
205   * Creates a new {@link Controller} but does not {@linkplain
206   * #start() start it}.
207   *
208   * @param <X> a {@link Listable} and {@link VersionWatchable} that
209   * will be used by the embedded {@link Reflector}; must not be
210   * {@code null}
211   *
212   * @param operation a {@link Listable} and a {@link
213   * VersionWatchable} that produces Kubernetes events; must not be
214   * {@code null}
215   *
216   * @param knownObjects a {@link Map} containing the last known state
217   * of Kubernetes resources the embedded {@link EventQueueCollection}
218   * is caching events for; may be {@code null} if this {@link
219   * Controller} is not interested in tracking deletions of objects;
220   * if non-{@code null} <strong>will be synchronized on by this
221   * class</strong> during retrieval and traversal operations
222   *
223   * @param eventQueueConsumer the {@link Consumer} that will process
224   * each {@link EventQueue} as it becomes ready; must not be {@code
225   * null}
226   *
227   * @exception NullPointerException if {@code operation} or {@code
228   * eventQueueConsumer} is {@code null}
229   *
230   * @see #Controller(Listable, ScheduledExecutorService, Duration,
231   * Map, Consumer)
232   *
233   * @see #start()
234   */
235  @SuppressWarnings("rawtypes")
236  public <X extends Listable<? extends KubernetesResourceList>
237                    & VersionWatchable<? extends Closeable,
238                                                 Watcher<T>>> Controller(final X operation,
239                                                                         final Map<Object, T> knownObjects,
240                                                                         final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
241    this(operation, null, null, knownObjects, eventQueueConsumer);
242  }
243
244  /**
245   * Creates a new {@link Controller} but does not {@linkplain
246   * #start() start it}.
247   *
248   * @param <X> a {@link Listable} and {@link VersionWatchable} that
249   * will be used by the embedded {@link Reflector}; must not be
250   * {@code null}
251   *
252   * @param operation a {@link Listable} and a {@link
253   * VersionWatchable} that produces Kubernetes events; must not be
254   * {@code null}
255   *
256   * @param synchronizationInterval a {@link Duration} representing
257   * the time in between one {@linkplain EventCache#synchronize()
258   * synchronization operation} and another; may be {@code null} in
259   * which case no synchronization will occur
260   *
261   * @param eventQueueConsumer the {@link Consumer} that will process
262   * each {@link EventQueue} as it becomes ready; must not be {@code
263   * null}
264   *
265   * @exception NullPointerException if {@code operation} or {@code
266   * eventQueueConsumer} is {@code null}
267   *
268   * @see #Controller(Listable, ScheduledExecutorService, Duration,
269   * Map, Consumer)
270   *
271   * @see #start()
272   */
273  @SuppressWarnings("rawtypes")
274  public <X extends Listable<? extends KubernetesResourceList>
275                    & VersionWatchable<? extends Closeable,
276                                                 Watcher<T>>> Controller(final X operation,
277                                                                         final Duration synchronizationInterval,
278                                                                         final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
279    this(operation, null, synchronizationInterval, null, eventQueueConsumer);
280  }
281
282  /**
283   * Creates a new {@link Controller} but does not {@linkplain
284   * #start() start it}.
285   *
286   * @param <X> a {@link Listable} and {@link VersionWatchable} that
287   * will be used by the embedded {@link Reflector}; must not be
288   * {@code null}
289   *
290   * @param operation a {@link Listable} and a {@link
291   * VersionWatchable} that produces Kubernetes events; must not be
292   * {@code null}
293   *
294   * @param synchronizationInterval a {@link Duration} representing
295   * the time in between one {@linkplain EventCache#synchronize()
296   * synchronization operation} and another; may be {@code null} in
297   * which case no synchronization will occur
298   *
299   * @param knownObjects a {@link Map} containing the last known state
300   * of Kubernetes resources the embedded {@link EventQueueCollection}
301   * is caching events for; may be {@code null} if this {@link
302   * Controller} is not interested in tracking deletions of objects;
303   * if non-{@code null} <strong>will be synchronized on by this
304   * class</strong> during retrieval and traversal operations
305   *
306   * @param eventQueueConsumer the {@link Consumer} that will process
307   * each {@link EventQueue} as it becomes ready; must not be {@code
308   * null}
309   *
310   * @exception NullPointerException if {@code operation} or {@code
311   * eventQueueConsumer} is {@code null}
312   *
313   * @see #Controller(Listable, ScheduledExecutorService, Duration,
314   * Map, Consumer)
315   *
316   * @see #start()
317   */
318  @SuppressWarnings("rawtypes")
319  public <X extends Listable<? extends KubernetesResourceList>
320                    & VersionWatchable<? extends Closeable,
321                                                 Watcher<T>>> Controller(final X operation,
322                                                                         final Duration synchronizationInterval,
323                                                                         final Map<Object, T> knownObjects,
324                                                                         final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
325    this(operation, null, synchronizationInterval, knownObjects, eventQueueConsumer);
326  }
327
328  /**
329   * Creates a new {@link Controller} but does not {@linkplain
330   * #start() start it}.
331   *
332   * @param <X> a {@link Listable} and {@link VersionWatchable} that
333   * will be used by the embedded {@link Reflector}; must not be
334   * {@code null}
335   *
336   * @param operation a {@link Listable} and a {@link
337   * VersionWatchable} that produces Kubernetes events; must not be
338   * {@code null}
339   *
340   * @param synchronizationExecutorService the {@link
341   * ScheduledExecutorService} that will be passed to the {@link
342   * Reflector} constructor; may be {@code null} in which case a
343   * default {@link ScheduledExecutorService} may be used instead
344   *
345   * @param synchronizationInterval a {@link Duration} representing
346   * the time in between one {@linkplain EventCache#synchronize()
347   * synchronization operation} and another; may be {@code null} in
348   * which case no synchronization will occur
349   *
350   * @param knownObjects a {@link Map} containing the last known state
351   * of Kubernetes resources the embedded {@link EventQueueCollection}
352   * is caching events for; may be {@code null} if this {@link
353   * Controller} is not interested in tracking deletions of objects;
354   * if non-{@code null} <strong>will be synchronized on by this
355   * class</strong> during retrieval and traversal operations
356   *
357   * @param eventQueueConsumer the {@link Consumer} that will process
358   * each {@link EventQueue} as it becomes ready; must not be {@code
359   * null}
360   *
361   * @exception NullPointerException if {@code operation} or {@code
362   * eventQueueConsumer} is {@code null}
363   *
364   * @see #start()
365   */
366  @SuppressWarnings("rawtypes")
367  public <X extends Listable<? extends KubernetesResourceList>
368                    & VersionWatchable<? extends Closeable,
369                                                 Watcher<T>>> Controller(final X operation,
370                                                                         final ScheduledExecutorService synchronizationExecutorService,
371                                                                         final Duration synchronizationInterval,
372                                                                         final Map<Object, T> knownObjects,
373                                                                         final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
374    this(operation, synchronizationExecutorService, synchronizationInterval, null, knownObjects, eventQueueConsumer);
375  }
376
377  /**
378   * Creates a new {@link Controller} but does not {@linkplain
379   * #start() start it}.
380   *
381   * @param <X> a {@link Listable} and {@link VersionWatchable} that
382   * will be used by the embedded {@link Reflector}; must not be
383   * {@code null}
384   *
385   * @param operation a {@link Listable} and a {@link
386   * VersionWatchable} that produces Kubernetes events; must not be
387   * {@code null}
388   *
389   * @param synchronizationExecutorService the {@link
390   * ScheduledExecutorService} that will be passed to the {@link
391   * Reflector} constructor; may be {@code null} in which case a
392   * default {@link ScheduledExecutorService} may be used instead
393   *
394   * @param synchronizationInterval a {@link Duration} representing
395   * the time in between one {@linkplain EventCache#synchronize()
396   * synchronization operation} and another; may be {@code null} in
397   * which case no synchronization will occur
398   *
399   * @param errorHandler a {@link Function} that accepts a {@link
400   * Throwable} and returns a {@link Boolean} indicating whether the
401   * error was handled or not; used to handle truly unanticipated
402   * errors from within a {@link ScheduledExecutorService} used
403   * during {@linkplain EventCache#synchronize() synchronization} and
404   * event consumption activities; may be {@code null}
405   *
406   * @param knownObjects a {@link Map} containing the last known state
407   * of Kubernetes resources the embedded {@link EventQueueCollection}
408   * is caching events for; may be {@code null} if this {@link
409   * Controller} is not interested in tracking deletions of objects;
410   * if non-{@code null} <strong>will be synchronized on by this
411   * class</strong> during retrieval and traversal operations
412   *
413   * @param eventQueueConsumer the {@link Consumer} that will process
414   * each {@link EventQueue} as it becomes ready; must not be {@code
415   * null}
416   *
417   * @exception NullPointerException if {@code operation} or {@code
418   * eventQueueConsumer} is {@code null}
419   *
420   * @see #start()
421   */
422  @SuppressWarnings("rawtypes")
423  public <X extends Listable<? extends KubernetesResourceList>
424                    & VersionWatchable<? extends Closeable,
425                                                 Watcher<T>>> Controller(final X operation,
426                                                                         final ScheduledExecutorService synchronizationExecutorService,
427                                                                         final Duration synchronizationInterval,
428                                                                         final Function<? super Throwable, Boolean> errorHandler,
429                                                                         final Map<Object, T> knownObjects,
430                                                                         final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
431    super();
432    this.logger = this.createLogger();
433    if (this.logger == null) {
434      throw new IllegalStateException("createLogger() == null");
435    }
436    final String cn = this.getClass().getName();
437    final String mn = "<init>";
438    if (this.logger.isLoggable(Level.FINER)) {
439      this.logger.entering(cn, mn, new Object[] { operation, synchronizationExecutorService, synchronizationInterval, errorHandler, knownObjects, eventQueueConsumer });
440    }
441    this.eventQueueConsumer = Objects.requireNonNull(eventQueueConsumer);
442    this.eventQueueCollection = new ControllerEventQueueCollection(knownObjects, errorHandler, 16, 0.75f);
443    this.synchronizationAwaiter = new EventQueueCollection.SynchronizationAwaitingPropertyChangeListener();
444    this.eventQueueCollection.addPropertyChangeListener(this.synchronizationAwaiter);
445    this.reflector = new ControllerReflector(operation, synchronizationExecutorService, synchronizationInterval, errorHandler);
446    if (this.logger.isLoggable(Level.FINER)) {
447      this.logger.exiting(cn, mn);
448    }
449  }
450
451
452  /*
453   * Instance methods.
454   */
455
456
457  /**
458   * Returns a {@link Logger} for use by this {@link Controller}.
459   *
460   * <p>This method never returns {@code null}.</p>
461   *
462   * <p>Overrides of this method must not return {@code null}.</p>
463   *
464   * @return a non-{@code null} {@link Logger}
465   */
466  protected Logger createLogger() {
467    return Logger.getLogger(this.getClass().getName());
468  }
469
470  /**
471   * Blocks until the {@link EventQueueCollection} affiliated with
472   * this {@link Controller} {@linkplain
473   * EventQueueCollection#isSynchronized() has synchronized}.
474   *
475   * @exception InterruptedException if the current {@link Thread} was
476   * interrupted
477   */
478  @Blocking
479  public final void awaitEventCacheSynchronization() throws InterruptedException {
480    this.synchronizationAwaiter.await();
481  }
482
483  /**
484   * Blocks for the desired amount of time until the {@link
485   * EventQueueCollection} affiliated with this {@link Controller}
486   * {@linkplain EventQueueCollection#isSynchronized() has
487   * synchronized} or the amount of time has elapsed.
488   *
489   * @param timeout the amount of time to wait
490   *
491   * @param timeUnit the {@link TimeUnit} designating the amount of
492   * time to wait; must not be {@code null}
493   *
494   * @return {@code false} if the waiting time elapsed before the
495   * event cache synchronized; {@code true} otherwise
496   *
497   * @exception InterruptedException if the current {@link Thread} was
498   * interrupted
499   *
500   * @exception NullPointerException if {@code timeUnit} is {@code
501   * null}
502   *
503   * @see EventQueueCollection.SynchronizationAwaitingPropertyChangeListener
504   */
505  @Blocking
506  public final boolean awaitEventCacheSynchronization(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
507    return this.synchronizationAwaiter.await(timeout, timeUnit);
508  }
509  
510  /**
511   * {@linkplain EventQueueCollection#start(Consumer) Starts the
512   * embedded <code>EventQueueCollection</code> consumption machinery}
513   * and then {@linkplain Reflector#start() starts the embedded
514   * <code>Reflector</code>}.
515   *
516   * @exception IOException if {@link Reflector#start()} throws an
517   * {@link IOException}
518   *
519   * @exception KubernetesClientException if the {@linkplain Reflector
520   * embedded <code>Reflector</code>} could not be started
521   *
522   * @see EventQueueCollection#start(Consumer)
523   *
524   * @see Reflector#start()
525   */
526  @NonBlocking
527  public final void start() throws IOException {
528    final String cn = this.getClass().getName();
529    final String mn = "start";
530    if (this.logger.isLoggable(Level.FINER)) {
531      this.logger.entering(cn, mn);
532    }
533
534    // Start the consumer that is going to drain our associated
535    // EventQueueCollection.
536    if (this.logger.isLoggable(Level.INFO)) {
537      this.logger.logp(Level.INFO, cn, mn, "Starting {0}", this.eventQueueConsumer);
538    }
539    final Future<?> eventQueueConsumerTask = this.eventQueueCollection.start(this.eventQueueConsumer);
540    assert eventQueueConsumerTask != null;
541
542    // Start the Reflector--the machinery that is going to connect to
543    // Kubernetes and "reflect" its (relevant) contents into the
544    // EventQueueCollection.
545    if (this.logger.isLoggable(Level.INFO)) {
546      this.logger.logp(Level.INFO, cn, mn, "Starting {0}", this.reflector);
547    }
548    try {
549      this.reflector.start();
550    } catch (final IOException | RuntimeException | Error reflectorStartFailure) {
551      try {
552        // TODO: this is problematic, I think; reflector.close() means
553        // that (potentially) it will never be able to restart it.
554        // The Go code appears to make some feints in the direction of
555        // restartability, and then just basically gives up.  I think
556        // we can do better here.
557        this.reflector.close();
558      } catch (final Throwable suppressMe) {
559        reflectorStartFailure.addSuppressed(suppressMe);
560      }
561      eventQueueConsumerTask.cancel(true);
562      assert eventQueueConsumerTask.isDone();
563      try {
564        this.eventQueueCollection.close();
565      } catch (final Throwable suppressMe) {
566        reflectorStartFailure.addSuppressed(suppressMe);
567      }
568      throw reflectorStartFailure;
569    }
570    
571    if (this.logger.isLoggable(Level.FINER)) {
572      this.logger.exiting(cn, mn);
573    }
574  }
575
576  /**
577   * {@linkplain Reflector#close() Closes the embedded
578   * <code>Reflector</code>} and then {@linkplain
579   * EventQueueCollection#close() closes the embedded
580   * <code>EventQueueCollection</code>}, handling exceptions
581   * appropriately.
582   *
583   * @exception IOException if the {@link Reflector} could not
584   * {@linkplain Reflector#close() close} properly
585   *
586   * @see Reflector#close()
587   *
588   * @see EventQueueCollection#close()
589   */
590  @Override
591  public final void close() throws IOException {
592    final String cn = this.getClass().getName();
593    final String mn = "close";
594    if (this.logger.isLoggable(Level.FINER)) {
595      this.logger.entering(cn, mn);
596    }
597    Exception throwMe = null;    
598    try {
599      if (this.logger.isLoggable(Level.INFO)) {
600        this.logger.logp(Level.INFO, cn, mn, "Closing {0}", this.reflector);
601      }
602      this.reflector.close();
603    } catch (final Exception everything) {
604      throwMe = everything;
605    }
606
607    try {
608      if (this.logger.isLoggable(Level.INFO)) {
609        this.logger.logp(Level.INFO, cn, mn, "Closing {0}", this.eventQueueCollection);
610      }
611      this.eventQueueCollection.close();
612    } catch (final RuntimeException | Error runtimeException) {
613      if (throwMe == null) {
614        throw runtimeException;
615      }
616      throwMe.addSuppressed(runtimeException);
617    }
618
619    if (throwMe instanceof IOException) {
620      throw (IOException)throwMe;
621    } else if (throwMe instanceof RuntimeException) {
622      throw (RuntimeException)throwMe;
623    } else if (throwMe != null) {
624      throw new IllegalStateException(throwMe.getMessage(), throwMe);
625    }
626
627    if (this.logger.isLoggable(Level.FINER)) {
628      this.logger.exiting(cn, mn);
629    }
630  }
631
632  /**
   * Returns whether the embedded {@link Reflector} should {@linkplain
   * Reflector#shouldSynchronize() synchronize}.
635   *
636   * <p>This implementation returns {@code true}.</p>
637   *
638   * @return {@code true} if the embedded {@link Reflector} should
639   * {@linkplain Reflector#shouldSynchronize() synchronize}; {@code
640   * false} otherwise
641   */
642  protected boolean shouldSynchronize() {
643    final String cn = this.getClass().getName();
644    final String mn = "shouldSynchronize";
645    if (this.logger.isLoggable(Level.FINER)) {
646      this.logger.entering(cn, mn);
647    }
648    final boolean returnValue = true;
649    if (this.logger.isLoggable(Level.FINER)) {
650      this.logger.exiting(cn, mn, Boolean.valueOf(returnValue));
651    }
652    return returnValue;
653  }
654
655  /**
656   * Invoked after the embedded {@link Reflector} {@linkplain
657   * Reflector#onClose() closes}.
658   *
659   * <p>This implementation does nothing.</p>
660   *
661   * @see Reflector#close()
662   *
663   * @see Reflector#onClose()
664   */
  protected void onClose() {
    // Deliberately empty.  This is a hook for subclasses; it is
    // invoked by ControllerReflector#onClose().
  }
668
669  /**
670   * Returns a key that can be used to identify the supplied {@link
671   * HasMetadata}.
672   *
673   * <p>This method never returns {@code null}.</p>
674   *
675   * <p>Overrides of this method must not return {@code null}.</p>
676   *
677   * <p>The default implementation of this method returns the return
678   * value of invoking the {@link HasMetadatas#getKey(HasMetadata)}
679   * method.</p>
680   *
681   * @param resource the Kubernetes resource for which a key is
682   * desired; must not be {@code null}
683   *
684   * @return a non-{@code null} key for the supplied {@link
685   * HasMetadata}
686   *
687   * @exception NullPointerException if {@code resource} is {@code
688   * null}
689   */
690  protected Object getKey(final T resource) {
691    final String cn = this.getClass().getName();
692    final String mn = "getKey";
693    if (this.logger.isLoggable(Level.FINER)) {
694      this.logger.entering(cn, mn, resource);
695    }
696    final Object returnValue = HasMetadatas.getKey(Objects.requireNonNull(resource));
697    if (this.logger.isLoggable(Level.FINER)) {
698      this.logger.exiting(cn, mn, returnValue);
699    }
700    return returnValue;
701  }
702
703  /**
704   * Creates a new {@link Event} when invoked.
705   *
706   * <p>This method never returns {@code null}.</p>
707   *
708   * <p>Overrides of this method must not return {@code null}.</p>
709   *
710   * <p>Overrides of this method must return a new {@link Event} or
711   * subclass with each invocation.</p>
712   *
713   * @param source the source of the new {@link Event}; must not be
714   * {@code null}
715   *
716   * @param eventType the {@link Event.Type} for the new {@link
717   * Event}; must not be {@code null}
718   *
719   * @param resource the {@link HasMetadata} that the new {@link
720   * Event} concerns; must not be {@code null}
721   *
722   * @return a new, non-{@code null} {@link Event}
723   *
724   * @exception NullPointerException if any of the parameters is
725   * {@code null}
726   */
727  protected Event<T> createEvent(final Object source, final Event.Type eventType, final T resource) {
728    final String cn = this.getClass().getName();
729    final String mn = "createEvent";
730    if (this.logger.isLoggable(Level.FINER)) {
731      this.logger.entering(cn, mn, new Object[] { source, eventType, resource });
732    }
733    final Event<T> returnValue = new Event<>(Objects.requireNonNull(source), Objects.requireNonNull(eventType), null, Objects.requireNonNull(resource));
734    if (this.logger.isLoggable(Level.FINER)) {
735      this.logger.exiting(cn, mn, returnValue);
736    }
737    return returnValue;
738  }
739
740  /**
741   * Creates a new {@link EventQueue} when invoked.
742   *
743   * <p>This method never returns {@code null}.</p>
744   *
745   * <p>Overrides of this method must not return {@code null}.</p>
746   *
747   * <p>Overrides of this method must return a new {@link EventQueue}
748   * or subclass with each invocation.</p>
749   *
750   * @param key the key to create the new {@link EventQueue} with;
751   * must not be {@code null}
752   *
753   * @return a new, non-{@code null} {@link EventQueue}
754   *
755   * @exception NullPointerException if {@code key} is {@code null}
756   */
757  protected EventQueue<T> createEventQueue(final Object key) {
758    final String cn = this.getClass().getName();
759    final String mn = "createEventQueue";
760    if (this.logger.isLoggable(Level.FINER)) {
761      this.logger.entering(cn, mn, key);
762    }
763    final EventQueue<T> returnValue = new EventQueue<>(key);
764    if (this.logger.isLoggable(Level.FINER)) {
765      this.logger.exiting(cn, mn, returnValue);
766    }
767    return returnValue;
768  }
769
770
771  /*
772   * Inner and nested classes.
773   */
774
775
776  /**
777   * An {@link EventQueueCollection} that delegates its overridable
778   * methods to their equivalents in the {@link Controller} class.
779   *
780   * @author <a href="https://about.me/lairdnelson"
781   * target="_parent">Laird Nelson</a>
782   *
783   * @see EventQueueCollection
784   *
785   * @see EventCache
786   */
787  private final class ControllerEventQueueCollection extends EventQueueCollection<T> {
788
789
790    /*
791     * Constructors.
792     */
793
794    
795    private ControllerEventQueueCollection(final Map<?, ? extends T> knownObjects,
796                                           final Function<? super Throwable, Boolean> errorHandler,
797                                           final int initialCapacity,
798                                           final float loadFactor) {
799      super(knownObjects, errorHandler, initialCapacity, loadFactor);
800    }
801
802
803    /*
804     * Instance methods.
805     */
806
807    
808    @Override
809    protected final Event<T> createEvent(final Object source, final Event.Type eventType, final T resource) {
810      return Controller.this.createEvent(source, eventType, resource);
811    }
812    
813    @Override
814    protected final EventQueue<T> createEventQueue(final Object key) {
815      return Controller.this.createEventQueue(key);
816    }
817    
818    @Override
819    protected final Object getKey(final T resource) {
820      return Controller.this.getKey(resource);
821    }
822    
823  }
824
825  
826  /**
827   * A {@link Reflector} that delegates its overridable
828   * methods to their equivalents in the {@link Controller} class.
829   *
830   * @author <a href="https://about.me/lairdnelson"
831   * target="_parent">Laird Nelson</a>
832   *
833   * @see Reflector
834   */
835  private final class ControllerReflector extends Reflector<T> {
836
837
838    /*
839     * Constructors.
840     */
841
842    
843    @SuppressWarnings("rawtypes")
844    private <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> ControllerReflector(final X operation,
845                                                                                                                                           final ScheduledExecutorService synchronizationExecutorService,
846                                                                                                                                           final Duration synchronizationInterval, final Function<? super Throwable, Boolean> synchronizationErrorHandler) {
847      super(operation, Controller.this.eventQueueCollection, synchronizationExecutorService, synchronizationInterval, synchronizationErrorHandler);
848    }
849
850
851    /*
852     * Instance methods.
853     */
854    
855
856    /**
857     * Invokes the {@link Controller#shouldSynchronize()} method and
858     * returns its result.
859     *
860     * @return the result of invoking the {@link
861     * Controller#shouldSynchronize()} method
862     *
863     * @see Controller#shouldSynchronize()
864     */
865    @Override
866    protected final boolean shouldSynchronize() {
867      return Controller.this.shouldSynchronize();
868    }
869    
870    /**
871     * Invokes the {@link Controller#onClose()} method.
872     *
873     * @see Controller#onClose()
874     */
875    @Override
876    protected final void onClose() {
877      Controller.this.onClose();
878    }
879  }
880  
881}