001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
002 *
003 * Copyright © 2017-2018 microBean.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
014 * implied.  See the License for the specific language governing
015 * permissions and limitations under the License.
016 */
017package org.microbean.kubernetes.controller;
018
019import java.io.Closeable;
020import java.io.IOException;
021
022import java.lang.reflect.Field;
023import java.lang.reflect.Method;
024
025import java.time.Duration;
026
027import java.time.temporal.ChronoUnit;
028
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.Objects;
033import java.util.Map;
034
035import java.util.concurrent.ExecutionException;
036import java.util.concurrent.Executors;
037import java.util.concurrent.Future;
038import java.util.concurrent.FutureTask;
039import java.util.concurrent.ScheduledExecutorService;
040import java.util.concurrent.ScheduledFuture;
041import java.util.concurrent.ScheduledThreadPoolExecutor;
042import java.util.concurrent.TimeUnit;
043
044import java.util.function.Function;
045
046import java.util.logging.Level;
047import java.util.logging.Logger;
048
049import io.fabric8.kubernetes.client.DefaultKubernetesClient; // for javadoc only
050import io.fabric8.kubernetes.client.KubernetesClientException;
051import io.fabric8.kubernetes.client.Watch; // for javadoc only
052import io.fabric8.kubernetes.client.Watcher;
053
054import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
055import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
056
057import io.fabric8.kubernetes.client.dsl.Listable;
058import io.fabric8.kubernetes.client.dsl.Versionable;
059import io.fabric8.kubernetes.client.dsl.VersionWatchable;
060import io.fabric8.kubernetes.client.dsl.Watchable;
061
062import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl;
063
064import io.fabric8.kubernetes.api.model.HasMetadata;
065import io.fabric8.kubernetes.api.model.ObjectMeta;
066import io.fabric8.kubernetes.api.model.KubernetesResourceList;
067import io.fabric8.kubernetes.api.model.ListMeta;
068
069import net.jcip.annotations.GuardedBy;
070import net.jcip.annotations.ThreadSafe;
071
072import okhttp3.OkHttpClient;
073
074import org.microbean.development.annotation.Hack;
075import org.microbean.development.annotation.Issue;
076import org.microbean.development.annotation.NonBlocking;
077
078/**
079 * A pump of sorts that continuously "pulls" logical events out of
080 * Kubernetes and {@linkplain EventCache#add(Object, AbstractEvent.Type,
081 * HasMetadata) adds them} to an {@link EventCache} so as to logically
082 * "reflect" the contents of Kubernetes into the cache.
083 *
084 * <h2>Thread Safety</h2>
085 *
086 * <p>Instances of this class are safe for concurrent use by multiple
087 * {@link Thread}s.</p>
088 *
089 * <h2>Design Notes</h2>
090 *
091 * <p>This class loosely models the <a
092 * href="https://github.com/kubernetes/client-go/blob/9b03088ac34f23d8ac912f623f2ae73274c38ce8/tools/cache/reflector.go#L47">{@code
093 * Reflector} type in the {@code tools/cache} package of the {@code
094 * client-go} subproject of Kubernetes</a>.</p>
095 *
096 * @param <T> a type of Kubernetes resource
097 *
098 * @author <a href="https://about.me/lairdnelson"
099 * target="_parent">Laird Nelson</a>
100 *
101 * @see EventCache
102 */
103@ThreadSafe
104public class Reflector<T extends HasMetadata> implements Closeable {
105
106
107  /*
108   * Instance fields.
109   */
110
111
112  /**
113   * The operation that was supplied at construction time.
114   *
115   * <p>This field is never {@code null}.</p>
116   *
117   * <p>It is guaranteed that the value of this field may be
118   * assignable to a reference of type {@link Listable Listable&lt;?
119   * extends KubernetesResourceList&gt;} or to a reference of type
120   * {@link VersionWatchable VersionWatchable&lt;? extends Closeable,
121   * Watcher&lt;T&gt;&gt;}.</p>
122   *
123   * @see Listable
124   *
125   * @see VersionWatchable
126   */
127  private final Object operation;
128
129  /**
130   * The resource version that a successful watch operation processed.
131   *
132   * @see #setLastSynchronizationResourceVersion(Object)
133   *
134   * @see WatchHandler#eventReceived(Watcher.Action, HasMetadata)
135   */
136  private volatile Object lastSynchronizationResourceVersion;
137
138  /**
139   * The {@link ScheduledExecutorService} in charge of scheduling
140   * repeated invocations of the {@link #synchronize()} method.
141   *
142   * <p>This field may be {@code null}.</p>
143   *
144   * <h2>Thread Safety</h2>
145   *
146   * <p>This field is not safe for concurrent use by multiple threads
147   * without explicit synchronization on it.</p>
148   *
149   * @see #synchronize()
150   */
151  @GuardedBy("this")
152  private ScheduledExecutorService synchronizationExecutorService;
153
154  /**
155   * A {@link Function} that consumes a {@link Throwable} and returns
156   * {@code true} if the error represented by that {@link Throwable}
157   * was handled in some way.
158   *
159   * <p>This field may be {@code null}.</p>
160   */
161  private final Function<? super Throwable, Boolean> synchronizationErrorHandler;
162
163  /**
164   * A {@link ScheduledFuture} representing the task that is scheduled
165   * to repeatedly invoke the {@link #synchronize()} method.
166   *
167   * <p>This field may be {@code null}.</p>
168   *
169   * <h2>Thread Safety</h2>
170   *
171   * <p>This field is not safe for concurrent use by multiple threads
172   * without explicit synchronization on it.</p>
173   *
174   * @see #synchronize()
175   */
176  @GuardedBy("this")
177  private ScheduledFuture<?> synchronizationTask;
178
179  /**
180   * A flag tracking whether the {@link
181   * #synchronizationExecutorService} should be shut down when this
182   * {@link Reflector} is {@linkplain #close() closed}.  If the
183   * creator of this {@link Reflector} supplied an explicit {@link
184   * ScheduledExecutorService} at construction time, then it will not
185   * be shut down.
186   */
187  private final boolean shutdownSynchronizationExecutorServiceOnClose;
188
189  /**
190   * How many seconds to wait in between scheduled invocations of the
191   * {@link #synchronize()} method.  If the value of this field is
192   * less than or equal to zero then no synchronization will take
193   * place.
194   */
195  private final long synchronizationIntervalInSeconds;
196
197  /**
198   * The watch operation currently in effect.
199   *
200   * <p>This field may be {@code null} at any point.</p>
201   *
202   * <h2>Thread Safety</h2>
203   *
204   * <p>This field is not safe for concurrent use by multiple threads
205   * without explicit synchronization on it.</p>
206   */
207  @GuardedBy("this")
208  private Closeable watch;
209
210  /**
211   * An {@link EventCache} (often an {@link EventQueueCollection})
212   * whose contents will be added to to reflect the current state of
213   * Kubernetes.
214   *
215   * <p>This field is never {@code null}.</p>
216   */
217  @GuardedBy("itself")
218  private final EventCache<T> eventCache;
219
220  /**
221   * A {@link Logger} for use by this {@link Reflector}.
222   *
223   * <p>This field is never {@code null}.</p>
224   *
225   * @see #createLogger()
226   */
227  protected final Logger logger;
228
229
230  /*
231   * Constructors.
232   */
233
234
235  /**
236   * Creates a new {@link Reflector}.
237   *
238   * @param <X> a type that is both an appropriate kind of {@link
239   * Listable} and {@link VersionWatchable}, such as the kind of
240   * operation returned by {@link
241   * DefaultKubernetesClient#configMaps()} and the like
242   *
243   * @param operation a {@link Listable} and a {@link
244   * VersionWatchable} that can report information from a Kubernetes
245   * cluster; must not be {@code null}
246   *
247   * @param eventCache an {@link EventCache} <strong>that will be
248   * synchronized on</strong> and into which {@link Event}s will be
249   * logically "reflected"; must not be {@code null}
250   *
251   * @exception NullPointerException if {@code operation} or {@code
252   * eventCache} is {@code null}
253   *
254   * @exception IllegalStateException if the {@link #createLogger()}
255   * method returns {@code null}
256   *
257   * @see #Reflector(Listable, EventCache, ScheduledExecutorService,
258   * Duration, Function)
259   *
260   * @see #start()
261   */
262  @SuppressWarnings("rawtypes") // kubernetes-client's implementations of KubernetesResourceList use raw types
263  public <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> Reflector(final X operation,
264                                                                                                                              final EventCache<T> eventCache) {
265    this(operation, eventCache, null, null, null);
266  }
267
268  /**
269   * Creates a new {@link Reflector}.
270   *
271   * @param <X> a type that is both an appropriate kind of {@link
272   * Listable} and {@link VersionWatchable}, such as the kind of
273   * operation returned by {@link
274   * DefaultKubernetesClient#configMaps()} and the like
275   *
276   * @param operation a {@link Listable} and a {@link
277   * VersionWatchable} that can report information from a Kubernetes
278   * cluster; must not be {@code null}
279   *
280   * @param eventCache an {@link EventCache} <strong>that will be
281   * synchronized on</strong> and into which {@link Event}s will be
282   * logically "reflected"; must not be {@code null}
283   *
284   * @param synchronizationInterval a {@link Duration} representing
285   * the time in between one {@linkplain EventCache#synchronize()
286   * synchronization operation} and another; interpreted with a
287   * granularity of seconds; may be {@code null} or semantically equal
288   * to {@code 0} seconds in which case no synchronization will occur
289   *
290   * @exception NullPointerException if {@code operation} or {@code
291   * eventCache} is {@code null}
292   *
293   * @exception IllegalStateException if the {@link #createLogger()}
294   * method returns {@code null}
295   *
296   * @see #Reflector(Listable, EventCache, ScheduledExecutorService,
297   * Duration, Function)
298   *
299   * @see #start()
300   */
301  @SuppressWarnings("rawtypes") // kubernetes-client's implementations of KubernetesResourceList use raw types
302  public <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> Reflector(final X operation,
303                                                                                                                              final EventCache<T> eventCache,
304                                                                                                                              final Duration synchronizationInterval) {
305    this(operation, eventCache, null, synchronizationInterval, null);
306  }
307
308  /**
309   * Creates a new {@link Reflector}.
310   *
311   * @param <X> a type that is both an appropriate kind of {@link
312   * Listable} and {@link VersionWatchable}, such as the kind of
313   * operation returned by {@link
314   * DefaultKubernetesClient#configMaps()} and the like
315   *
316   * @param operation a {@link Listable} and a {@link
317   * VersionWatchable} that can report information from a Kubernetes
318   * cluster; must not be {@code null}
319   *
320   * @param eventCache an {@link EventCache} <strong>that will be
321   * synchronized on</strong> and into which {@link Event}s will be
322   * logically "reflected"; must not be {@code null}
323   *
324   * @param synchronizationExecutorService a {@link
325   * ScheduledExecutorService} to be used to tell the supplied {@link
326   * EventCache} to {@linkplain EventCache#synchronize() synchronize}
327   * on a schedule; may be {@code null} in which case no
328   * synchronization will occur
329   *
330   * @param synchronizationInterval a {@link Duration} representing
331   * the time in between one {@linkplain EventCache#synchronize()
332   * synchronization operation} and another; may be {@code null} in
333   * which case no synchronization will occur
334   *
335   * @exception NullPointerException if {@code operation} or {@code
336   * eventCache} is {@code null}
337   *
338   * @exception IllegalStateException if the {@link #createLogger()}
339   * method returns {@code null}
340   *
341   * @see #Reflector(Listable, EventCache, ScheduledExecutorService,
342   * Duration, Function)
343   *
344   * @see #start()
345   */
346  @SuppressWarnings("rawtypes") // kubernetes-client's implementations of KubernetesResourceList use raw types
347  public <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> Reflector(final X operation,
348                                                                                                                              final EventCache<T> eventCache,
349                                                                                                                              final ScheduledExecutorService synchronizationExecutorService,
350                                                                                                                              final Duration synchronizationInterval) {
351    this(operation, eventCache, synchronizationExecutorService, synchronizationInterval, null);
352  }
353
354  /**
355   * Creates a new {@link Reflector}.
356   *
357   * @param <X> a type that is both an appropriate kind of {@link
358   * Listable} and {@link VersionWatchable}, such as the kind of
359   * operation returned by {@link
360   * DefaultKubernetesClient#configMaps()} and the like
361   *
362   * @param operation a {@link Listable} and a {@link
363   * VersionWatchable} that can report information from a Kubernetes
364   * cluster; must not be {@code null}
365   *
366   * @param eventCache an {@link EventCache} <strong>that will be
367   * synchronized on</strong> and into which {@link Event}s will be
368   * logically "reflected"; must not be {@code null}
369   *
370   * @param synchronizationExecutorService a {@link
371   * ScheduledExecutorService} to be used to tell the supplied {@link
372   * EventCache} to {@linkplain EventCache#synchronize() synchronize}
373   * on a schedule; may be {@code null} in which case no
374   * synchronization will occur
375   *
376   * @param synchronizationInterval a {@link Duration} representing
377   * the time in between one {@linkplain EventCache#synchronize()
378   * synchronization operation} and another; may be {@code null} in
379   * which case no synchronization will occur
380   *
381   * @param synchronizationErrorHandler a {@link Function} that
382   * consumes a {@link Throwable} and returns a {@link Boolean}
383   * indicating whether the error represented by the {@link Throwable}
384   * in question was handled or not; may be {@code null}
385   *
386   * @exception NullPointerException if {@code operation} or {@code
387   * eventCache} is {@code null}
388   *
389   * @exception IllegalStateException if the {@link #createLogger()}
390   * method returns {@code null}
391   *
392   * @see #start()
393   */
394  @SuppressWarnings("rawtypes") // kubernetes-client's implementations of KubernetesResourceList use raw types
395  public <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> Reflector(final X operation,
396                                                                                                                              final EventCache<T> eventCache,
397                                                                                                                              final ScheduledExecutorService synchronizationExecutorService,
398                                                                                                                              final Duration synchronizationInterval,
399                                                                                                                              final Function<? super Throwable, Boolean> synchronizationErrorHandler) {
400    super();
401    this.logger = this.createLogger();
402    if (this.logger == null) {
403      throw new IllegalStateException("createLogger() == null");
404    }
405    final String cn = this.getClass().getName();
406    final String mn = "<init>";
407    if (this.logger.isLoggable(Level.FINER)) {
408      this.logger.entering(cn, mn, new Object[] { operation, eventCache, synchronizationExecutorService, synchronizationInterval });
409    }
410    Objects.requireNonNull(operation);
411    this.eventCache = Objects.requireNonNull(eventCache);
412    // TODO: research: maybe: operation.withField("metadata.resourceVersion", "0")?
413    this.operation = operation.withResourceVersion("0");
414
415    if (synchronizationInterval == null) {
416      this.synchronizationIntervalInSeconds = 0L;
417    } else {
418      this.synchronizationIntervalInSeconds = synchronizationInterval.get(ChronoUnit.SECONDS);
419    }
420    if (this.synchronizationIntervalInSeconds <= 0L) {
421      this.synchronizationExecutorService = null;
422      this.shutdownSynchronizationExecutorServiceOnClose = false;
423      this.synchronizationErrorHandler = null;
424    } else {
425      this.synchronizationExecutorService = synchronizationExecutorService;
426      this.shutdownSynchronizationExecutorServiceOnClose = synchronizationExecutorService == null;
427      if (synchronizationErrorHandler == null) {
428        this.synchronizationErrorHandler = t -> {
429          if (this.logger.isLoggable(Level.SEVERE)) {
430            this.logger.logp(Level.SEVERE,
431                             this.getClass().getName(), "<synchronizationTask>",
432                             t.getMessage(), t);
433          }
434          return true;
435        };
436      } else {
437        this.synchronizationErrorHandler = synchronizationErrorHandler;
438      }
439    }
440
441    if (this.logger.isLoggable(Level.FINER)) {
442      this.logger.exiting(cn, mn);
443    }
444  }
445
446
447  /*
448   * Instance methods.
449   */
450
451
452  /**
453   * Returns a {@link Logger} that will be used for this {@link
454   * Reflector}.
455   *
456   * <p>This method never returns {@code null}.</p>
457   *
458   * <p>Overrides of this method must not return {@code null}.</p>
459   *
460   * @return a non-{@code null} {@link Logger}
461   */
462  protected Logger createLogger() {
463    return Logger.getLogger(this.getClass().getName());
464  }
465
466  /**
467   * Notionally closes this {@link Reflector} by terminating any
468   * {@link Thread}s that it has started and invoking the {@link
469   * #onClose()} method while holding this {@link Reflector}'s
470   * monitor.
471   *
472   * @exception IOException if an error occurs
473   *
474   * @see #onClose()
475   */
476  @Override
477  public synchronized final void close() throws IOException {
478    final String cn = this.getClass().getName();
479    final String mn = "close";
480    if (this.logger.isLoggable(Level.FINER)) {
481      this.logger.entering(cn, mn);
482    }
483    
484    try {
485      this.closeSynchronizationExecutorService();
486      if (this.watch != null) {
487        this.watch.close();
488      }
489    } finally {
490      this.onClose();
491    }
492    
493    if (this.logger.isLoggable(Level.FINER)) {
494      this.logger.exiting(cn, mn);
495    }
496  }
497
498  /**
499   * {@linkplain Future#cancel(boolean) Cancels} scheduled invocations
500   * of the {@link #synchronize()} method.
501   *
502   * <p>This method is invoked by the {@link
503   * #closeSynchronizationExecutorService()} method.</p>
504   *
505   * @see #setUpSynchronization()
506   *
507   * @see #closeSynchronizationExecutorService()
508   */
509  private synchronized final void cancelSynchronization() {
510    final String cn = this.getClass().getName();
511    final String mn = "cancelSynchronization";
512    if (this.logger.isLoggable(Level.FINER)) {
513      this.logger.entering(cn, mn);
514    }
515    
516    if (this.synchronizationTask != null) {
517      this.synchronizationTask.cancel(true /* interrupt the task */);
518      this.synchronizationTask = null; // very important; see setUpSynchronization()
519    }
520    
521    if (this.logger.isLoggable(Level.FINER)) {
522      this.logger.exiting(cn, mn);
523    }
524  }
525
526  /**
527   * {@linkplain #cancelSynchronization Cancels scheduled invocations
528   * of the <code>synchronize()</code> method} and, when appropriate,
529   * shuts down the {@link ScheduledExecutorService} responsible for
530   * the scheduling.
531   *
532   * @see #cancelSynchronization()
533   *
534   * @see #setUpSynchronization()
535   */
536  private synchronized final void closeSynchronizationExecutorService() {
537    final String cn = this.getClass().getName();
538    final String mn = "closeSynchronizationExecutorService";
539    if (this.logger.isLoggable(Level.FINER)) {
540      this.logger.entering(cn, mn);
541    }
542
543    this.cancelSynchronization();
544
545    if (this.synchronizationExecutorService != null && this.shutdownSynchronizationExecutorServiceOnClose) {
546
547      // Stop accepting new tasks.  Not that any will be showing up
548      // anyway, but it's the right thing to do.
549      this.synchronizationExecutorService.shutdown();
550
551      try {
552        if (!this.synchronizationExecutorService.awaitTermination(60L, TimeUnit.SECONDS)) {
553          this.synchronizationExecutorService.shutdownNow();
554          if (!this.synchronizationExecutorService.awaitTermination(60L, TimeUnit.SECONDS)) {
555            if (this.logger.isLoggable(Level.WARNING)) {
556              this.logger.logp(Level.WARNING,
557                               cn, mn,
558                               "synchronizationExecutorService did not terminate cleanly after 60 seconds");
559            }
560          }
561        }
562      } catch (final InterruptedException interruptedException) {
563        this.synchronizationExecutorService.shutdownNow();
564        Thread.currentThread().interrupt();
565      }
566
567    }
568
569    if (this.logger.isLoggable(Level.FINER)) {
570      this.logger.exiting(cn, mn);
571    }
572  }
573
574  /**
575   * As the name implies, sets up <em>synchronization</em>, which is
576   * the act of the downstream event cache telling its associated
577   * event listeners that there are items remaining to be processed,
578   * and returns a {@link Future} reprsenting the scheduled, repeating
579   * task.
580   *
581   * <p>This method schedules repeated invocations of the {@link
582   * #synchronize()} method.</p>
583   *
584   * <p>This method may return {@code null}.</p>
585   *
586   * @return a {@link Future} representing the scheduled repeating
587   * synchronization task, or {@code null} if no such task was
588   * scheduled
589   *
590   * @see #synchronize()
591   *
592   * @see EventCache#synchronize()
593   */
594  private synchronized final Future<?> setUpSynchronization() {
595    final String cn = this.getClass().getName();
596    final String mn = "setUpSynchronization";
597    if (this.logger.isLoggable(Level.FINER)) {
598      this.logger.entering(cn, mn);
599    }
600
601    if (this.synchronizationIntervalInSeconds > 0L) {
602      if (this.synchronizationExecutorService == null || this.synchronizationExecutorService.isTerminated()) {
603        this.synchronizationExecutorService = Executors.newScheduledThreadPool(1);
604        if (this.synchronizationExecutorService instanceof ScheduledThreadPoolExecutor) {
605          ((ScheduledThreadPoolExecutor)this.synchronizationExecutorService).setRemoveOnCancelPolicy(true);
606        }
607      }
608      if (this.synchronizationTask == null) {
609        if (this.logger.isLoggable(Level.INFO)) {
610          this.logger.logp(Level.INFO,
611                           cn, mn,
612                           "Scheduling downstream synchronization every {0} seconds",
613                           Long.valueOf(this.synchronizationIntervalInSeconds));
614        }
615        this.synchronizationTask = this.synchronizationExecutorService.scheduleWithFixedDelay(this::synchronize, 0L, this.synchronizationIntervalInSeconds, TimeUnit.SECONDS);
616      }
617      assert this.synchronizationExecutorService != null;
618      assert this.synchronizationTask != null;
619    }
620
621    if (this.logger.isLoggable(Level.FINER)) {
622      this.logger.exiting(cn, mn, this.synchronizationTask);
623    }
624    return this.synchronizationTask;
625  }
626
627  /**
628   * Calls {@link EventCache#synchronize()} on this {@link
629   * Reflector}'s {@linkplain #eventCache affiliated
630   * <code>EventCache</code>}.
631   *
632   * <p>This method is normally invoked on a schedule by this {@link
633   * Reflector}'s {@linkplain #synchronizationExecutorService
634   * affiliated <code>ScheduledExecutorService</code>}.</p>
635   *
636   * @see #setUpSynchronization()
637   *
638   * @see #shouldSynchronize()
639   */
640  private final void synchronize() {
641    final String cn = this.getClass().getName();
642    final String mn = "synchronize";
643    if (this.logger.isLoggable(Level.FINER)) {
644      this.logger.entering(cn, mn);
645    }
646
647    if (this.shouldSynchronize()) {
648      if (this.logger.isLoggable(Level.FINE)) {
649        this.logger.logp(Level.FINE,
650                         cn, mn,
651                         "Synchronizing event cache with its downstream consumers");
652      }
653      Throwable throwable = null;
654      synchronized (this.eventCache) {
655        try {
656
657          // Tell the EventCache to run a synchronization operation.
658          // This will have the effect of adding SynchronizationEvents
659          // of type MODIFICATION to the EventCache.
660          this.eventCache.synchronize();
661          
662        } catch (final Throwable e) {
663          assert e instanceof RuntimeException || e instanceof Error;
664          throwable = e;
665        }
666      }
667      if (throwable != null && !this.synchronizationErrorHandler.apply(throwable)) {
668        if (throwable instanceof RuntimeException) {
669          throw (RuntimeException)throwable;
670        } else if (throwable instanceof Error) {
671          throw (Error)throwable;
672        } else {
673          assert !(throwable instanceof Exception) : "Signature changed for EventCache#synchronize()";
674        }
675      }
676    }
677
678    if (this.logger.isLoggable(Level.FINER)) {
679      this.logger.exiting(cn, mn);
680    }
681  }
682
683  /**
684   * Returns whether, at any given moment, this {@link Reflector}
685   * should cause its {@link EventCache} to {@linkplain
686   * EventCache#synchronize() synchronize}.
687   *
688   * <p>The default implementation of this method returns {@code true}
689   * if this {@link Reflector} was constructed with an explicit
690   * synchronization interval or {@link ScheduledExecutorService} or
691   * both.</p>
692   *
693   * <h2>Design Notes</h2>
694   *
695   * <p>This code follows the Go code in the Kubernetes {@code
696   * client-go/tools/cache} package.  One thing that becomes clear
697   * when looking at all of this through an object-oriented lens is
698   * that it is the {@link EventCache} (the {@code delta_fifo}, in the
699   * Go code) that is ultimately in charge of synchronizing.  It is
700   * not clear why in the Go code this is a function of a reflector.
701   * In an object-oriented world, perhaps the {@link EventCache}
702   * itself should be in charge of resynchronization schedules, but we
703   * choose to follow the Go code's division of responsibilities
704   * here.</p>
705   *
706   * @return {@code true} if this {@link Reflector} should cause its
707   * {@link EventCache} to {@linkplain EventCache#synchronize()
708   * synchronize}; {@code false} otherwise
709   */
710  protected boolean shouldSynchronize() {
711    final String cn = this.getClass().getName();
712    final String mn = "shouldSynchronize";
713    if (this.logger.isLoggable(Level.FINER)) {
714      this.logger.entering(cn, mn);
715    }
716    final boolean returnValue;
717    synchronized (this) {
718      returnValue = this.synchronizationExecutorService != null;
719    }
720    if (this.logger.isLoggable(Level.FINER)) {
721      this.logger.exiting(cn, mn, Boolean.valueOf(returnValue));
722    }
723    return returnValue;
724  }
725
726  // Not used; not used in the Go code either?!
727  private final Object getLastSynchronizationResourceVersion() {
728    return this.lastSynchronizationResourceVersion;
729  }
730
731  /**
732   * Records the last resource version processed by a successful watch
733   * operation.
734   *
735   * @param resourceVersion the resource version in question; may be
736   * {@code null}
737   *
738   * @see WatchHandler#eventReceived(Watcher.Action, HasMetadata)
739   */
740  private final void setLastSynchronizationResourceVersion(final Object resourceVersion) {
741    // lastSynchronizationResourceVersion is volatile; this is an
742    // atomic assignment
743    this.lastSynchronizationResourceVersion = resourceVersion;
744  }
745
746  /**
747   * Using the {@code operation} supplied at construction time,
748   * {@linkplain Listable#list() lists} appropriate Kubernetes
749   * resources, and then, on a separate {@link Thread}, {@linkplain
750   * VersionWatchable sets up a watch} on them, calling {@link
751   * EventCache#replace(Collection, Object)} and {@link
752   * EventCache#add(Object, AbstractEvent.Type, HasMetadata)} methods
753   * as appropriate.
754   *
755   * <p><strong>For convenience only</strong>, this method returns a
756   * {@link Future} representing any scheduled synchronization task
757   * created as a result of the user's having supplied a {@link
758   * Duration} at construction time.  The return value may be (and
759   * usually is) safely ignored.  Invoking {@link
760   * Future#cancel(boolean)} on the returned {@link Future} will
761   * result in the scheduled synchronization task being cancelled
762   * irrevocably.  <strong>Notably, invoking {@link
763   * Future#cancel(boolean)} on the returned {@link Future} will
764   * <em>not</em> {@linkplain #close() close} this {@link
765   * Reflector}.</strong>
766   *
767   * <p>This method never returns {@code null}.</p>
768   *
769   * <p>The calling {@link Thread} is not blocked by invocations of
770   * this method.</p>
771   *
772   * <h2>Implementation Notes</h2>
773   *
774   * <p>This method loosely models the <a
775   * href="https://github.com/kubernetes/client-go/blob/dcf16a0f3b52098c3d4c1467b6c80c3e88ff65fb/tools/cache/reflector.go#L128-L137">{@code
776   * Run} function in {@code reflector.go} together with the {@code
777   * ListAndWatch} function in the same file</a>.</p>
778   *
779   * @return a {@link Future} representing a scheduled synchronization
780   * operation; never {@code null}
781   *
782   * @exception IOException if a watch has previously been established
783   * and could not be {@linkplain Watch#close() closed}
784   *
785   * @exception KubernetesClientException if the initial attempt to
786   * {@linkplain Listable#list() list} Kubernetes resources fails
787   *
788   * @see #close()
789   */
790  @NonBlocking
791  public final Future<?> start() throws IOException {
792    final String cn = this.getClass().getName();
793    final String mn = "start";
794    if (this.logger.isLoggable(Level.FINER)) {
795      this.logger.entering(cn, mn);
796    }
797
798    Future<?> returnValue = null;
799    synchronized (this) {
800
801      try {
802
803        // If somehow we got called while a watch already exists, then
804        // close the old watch (we'll replace it).  Note that,
805        // critically, the onClose() method of our watch handler sets
806        // this reference to null, so if the watch is in the process
807        // of being closed, this little block won't be executed.
808        if (this.watch != null) {
809          final Closeable watch = this.watch;
810          this.watch = null;
811          if (logger.isLoggable(Level.FINE)) {
812            logger.logp(Level.FINE,
813                        cn, mn,
814                        "Closing pre-existing watch");
815          }
816          watch.close();
817          if (logger.isLoggable(Level.FINE)) {
818            logger.logp(Level.FINE,
819                        cn, mn,
820                        "Closed pre-existing watch");
821          }
822        }
823
824        // Run a list operation, and get the resourceVersion of that list.
825        if (logger.isLoggable(Level.FINE)) {
826          logger.logp(Level.FINE,
827                      cn, mn,
828                      "Listing Kubernetes resources using {0}", this.operation);
829        }
830        @Issue(id = "13", uri = "https://github.com/microbean/microbean-kubernetes-controller/issues/13")
831        @SuppressWarnings("unchecked")
832        final KubernetesResourceList<? extends T> list = ((Listable<? extends KubernetesResourceList<? extends T>>)this.operation).list();
833        assert list != null;
834
835        final ListMeta metadata = list.getMetadata();
836        assert metadata != null;
837
838        final String resourceVersion = metadata.getResourceVersion();
839        assert resourceVersion != null;
840
841        // Using the results of that list operation, do a full replace
842        // on the EventCache with them.
843        final Collection<? extends T> replacementItems;
844        final Collection<? extends T> items = list.getItems();
845        if (items == null || items.isEmpty()) {
846          replacementItems = Collections.emptySet();
847        } else {
848          replacementItems = Collections.unmodifiableCollection(new ArrayList<>(items));
849        }
850        
851        if (logger.isLoggable(Level.FINE)) {
852          logger.logp(Level.FINE, cn, mn, "Replacing resources in the event cache");
853        }
854        synchronized (this.eventCache) {
855          this.eventCache.replace(replacementItems, resourceVersion);
856        }
857        if (logger.isLoggable(Level.FINE)) {
858          logger.logp(Level.FINE, cn, mn, "Done replacing resources in the event cache");
859        }
860
861        // Record the resource version we captured during our list
862        // operation.
863        this.setLastSynchronizationResourceVersion(resourceVersion);
864
865        // Now that we've vetted that our list operation works (i.e. no
866        // syntax errors, no connectivity problems) we can schedule
867        // synchronizations if necessary.
868        //
869        // A synchronization is an operation where, if allowed, our
870        // eventCache goes through its set of known objects and--for
871        // any that are not enqueued for further processing
872        // already--fires a *synchronization* event of type
873        // MODIFICATION.  This happens on a schedule, not in reaction
874        // to an event.  This allows its downstream processors a
875        // chance to try to bring system state in line with desired
876        // state, even if no events have occurred (kind of like a
877        // heartbeat).  See
878        // https://engineering.bitnami.com/articles/a-deep-dive-into-kubernetes-controllers.html#resyncperiod.
879        this.setUpSynchronization();
880        returnValue = this.synchronizationTask;
881
882        // If there wasn't a synchronizationTask, then that means the
883        // user who created this Reflector didn't want any
884        // synchronization to happen.  We return a "dummy" Future that
885        // is already "completed" (isDone() returns true) to avoid
886        // having to return null.  The returned Future can be
887        // cancelled with no effect.
888        if (returnValue == null) {
889          final FutureTask<?> futureTask = new FutureTask<Void>(() -> {}, null);
890          futureTask.run(); // just sets "doneness"
891          assert futureTask.isDone();
892          assert !futureTask.isCancelled();
893          returnValue = futureTask;
894        }
895
896        assert returnValue != null;
897
898        // Now that we've taken care of our list() operation, set up our
899        // watch() operation.
900        if (logger.isLoggable(Level.FINE)) {
901          logger.logp(Level.FINE,
902                      cn, mn,
903                      "Watching Kubernetes resources with resource version {0} using {1}",
904                      new Object[] { resourceVersion, this.operation });
905        }
906        @SuppressWarnings("unchecked")
907        final Versionable<? extends Watchable<? extends Closeable, Watcher<T>>> versionableOperation =
908          (Versionable<? extends Watchable<? extends Closeable, Watcher<T>>>)this.operation;
909        this.watch = versionableOperation.withResourceVersion(resourceVersion).watch(new WatchHandler());
910        if (logger.isLoggable(Level.FINE)) {
911          logger.logp(Level.FINE,
912                      cn, mn,
913                      "Established watch: {0}", this.watch);
914        }
915
916      } catch (final IOException | RuntimeException | Error exception) {
917        this.cancelSynchronization();
918        if (this.watch != null) {
919          try {
920            // TODO: haven't seen it, but reason hard about deadlock
921            // here; see
922            // WatchHandler#onClose(KubernetesClientException) which
923            // *can* call start() (this method) with the monitor.  I
924            // *think* we're in the clear here:
925            // onClose(KubernetesClientException) will only (re-)call
926            // start() if the supplied KubernetesClientException is
927            // non-null.  In this case, it should be, because this is
928            // an ordinary close() call.
929            this.watch.close();
930          } catch (final Throwable suppressMe) {
931            exception.addSuppressed(suppressMe);
932          }
933          this.watch = null;
934        }
935        throw exception;
936      }
937    }
938    
939    if (this.logger.isLoggable(Level.FINER)) {
940      this.logger.exiting(cn, mn, returnValue);
941    }
942    return returnValue;
943  }
944
945  /**
946   * Invoked when {@link #close()} is invoked.
947   *
948   * <p>The default implementation of this method does nothing.</p>
949   *
950   * <p>Overrides of this method must consider that they will be
951   * invoked with this {@link Reflector}'s monitor held.</p>
952   *
953   * <p>Overrides of this method must not call the {@link #close()}
954   * method.</p>
955   *
956   * @see #close()
957   */
958  protected synchronized void onClose() {
959
960  }
961
962
963  /*
964   * Inner and nested classes.
965   */
966
967
968  /**
969   * A {@link Watcher} of Kubernetes resources.
970   *
971   * @author <a href="https://about.me/lairdnelson"
972   * target="_parent">Laird Nelson</a>
973   *
974   * @see Watcher
975   */
976  private final class WatchHandler implements Watcher<T> {
977
978
979    /*
980     * Constructors.
981     */
982
983
984    /**
985     * Creates a new {@link WatchHandler}.
986     */
987    private WatchHandler() {
988      super();
989      final String cn = this.getClass().getName();
990      final String mn = "<init>";
991      if (logger.isLoggable(Level.FINER)) {
992        logger.entering(cn, mn);
993        logger.exiting(cn, mn);
994      }
995    }
996
997
998    /*
999     * Instance methods.
1000     */
1001
1002
1003    /**
1004     * Calls the {@link EventCache#add(Object, AbstractEvent.Type,
1005     * HasMetadata)} method on the enclosing {@link Reflector}'s
1006     * associated {@link EventCache} with information harvested from
1007     * the supplied {@code resource}, and using an {@link
1008     * AbstractEvent.Type} selected appropriately given the supplied
1009     * {@link Watcher.Action}.
1010     *
1011     * @param action the kind of Kubernetes event that happened; must
1012     * not be {@code null}
1013     *
1014     * @param resource the {@link HasMetadata} object that was
1015     * affected; must not be {@code null}
1016     *
1017     * @exception NullPointerException if {@code action} or {@code
1018     * resource} was {@code null}
1019     *
1020     * @exception IllegalStateException if another error occurred
1021     */
1022    @Override
1023    public final void eventReceived(final Watcher.Action action, final T resource) {
1024      final String cn = this.getClass().getName();
1025      final String mn = "eventReceived";
1026      if (logger.isLoggable(Level.FINER)) {
1027        logger.entering(cn, mn, new Object[] { action, resource });
1028      }
1029      Objects.requireNonNull(action);
1030      Objects.requireNonNull(resource);
1031
1032      final ObjectMeta metadata = resource.getMetadata();
1033      assert metadata != null;
1034
1035      final Event.Type eventType;
1036      switch (action) {
1037      case ADDED:
1038        eventType = Event.Type.ADDITION;
1039        break;
1040      case MODIFIED:
1041        eventType = Event.Type.MODIFICATION;
1042        break;
1043      case DELETED:
1044        eventType = Event.Type.DELETION;
1045        break;
1046      case ERROR:
1047        // Uh...the Go code has:
1048        //
1049        //   if event.Type == watch.Error {
1050        //     return apierrs.FromObject(event.Object)
1051        //   }
1052        //
1053        // Now, apierrs.FromObject is here:
1054        // https://github.com/kubernetes/apimachinery/blob/kubernetes-1.9.2/pkg/api/errors/errors.go#L80-L88
1055        // This is looking for a Status object.  But
1056        // WatchConnectionHandler will never forward on such a thing:
1057        // https://github.com/fabric8io/kubernetes-client/blob/v3.1.8/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java#L246-L258
1058        //
1059        // So it follows that if by some chance we get here, resource
1060        // will definitely be a HasMetadata.  We go back to the Go
1061        // code again, and remember that if the type is Error, the
1062        // equivalent of this watch handler simply returns and goes home.
1063        //
1064        // Now, if we were to throw a RuntimeException here, which is
1065        // the idiomatic equivalent of returning and going home, this
1066        // would cause a watch reconnect:
1067        // https://github.com/fabric8io/kubernetes-client/blob/v3.1.8/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java#L159-L205
1068        // ...up to the reconnect limit.
1069        //
1070        // ...which is fine, but I'm not sure that in an error case a
1071        // WatchEvent will ever HAVE a HasMetadata as its payload.
1072        // Which means MAYBE we'll never get here.  But if we do, all
1073        // we can do is throw a RuntimeException...which ends up
1074        // reducing to the same case as the default case below, so we
1075        // fall through.
1076      default:
1077        eventType = null;
1078        throw new IllegalStateException();
1079      }
1080      assert eventType != null;
1081
1082      // Add an Event of the proper kind to our EventCache.  This is
1083      // the heart of this method.
1084      if (logger.isLoggable(Level.FINE)) {
1085        logger.logp(Level.FINE,
1086                    cn, mn,
1087                    "Adding event to cache: {0} {1}", new Object[] { eventType, resource });
1088      }
1089      synchronized (eventCache) {
1090        eventCache.add(Reflector.this, eventType, resource);
1091      }
1092
1093      // Record the most recent resource version we're tracking to be
1094      // that of this last successful watch() operation.  We set it
1095      // earlier during a list() operation.
1096      setLastSynchronizationResourceVersion(metadata.getResourceVersion());
1097
1098      if (logger.isLoggable(Level.FINER)) {
1099        logger.exiting(cn, mn);
1100      }
1101    }
1102
1103    /**
1104     * Invoked when the Kubernetes client connection closes.
1105     *
1106     * @param exception any {@link KubernetesClientException} that
1107     * caused this closing to happen; may be {@code null}
1108     */
1109    @Override
1110    public final void onClose(final KubernetesClientException exception) {
1111      final String cn = this.getClass().getName();
1112      final String mn = "onClose";
1113      if (logger.isLoggable(Level.FINER)) {
1114        logger.entering(cn, mn, exception);
1115      }
1116
1117      synchronized (Reflector.this) {
1118        // Don't close Reflector.this.watch before setting it to null
1119        // here; after all we're being called because it's in the
1120        // process of closing already!
1121        Reflector.this.watch = null;
1122      }
1123
1124      if (exception != null) {
1125        if (logger.isLoggable(Level.WARNING)) {
1126          logger.logp(Level.WARNING,
1127                      cn, mn,
1128                      exception.getMessage(), exception);
1129        }
1130        // See
1131        // https://github.com/kubernetes/client-go/blob/5f85fe426e7aa3c1df401a7ae6c1ba837bd76be9/tools/cache/reflector.go#L204.
1132        if (logger.isLoggable(Level.INFO)) {
1133          logger.logp(Level.INFO, cn, mn, "Restarting Reflector");
1134        }
1135        try {
1136          Reflector.this.start();
1137        } catch (final Throwable suppressMe) {
1138          if (logger.isLoggable(Level.SEVERE)) {
1139            logger.logp(Level.SEVERE,
1140                        cn, mn,
1141                        "Failed to restart Reflector", suppressMe);
1142          }
1143          exception.addSuppressed(suppressMe);
1144        }
1145      }
1146
1147      if (logger.isLoggable(Level.FINER)) {
1148        logger.exiting(cn, mn, exception);
1149      }
1150    }
1151
1152  }
1153
1154}