001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
002 *
003 * Copyright © 2017-2018 microBean.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
014 * implied.  See the License for the specific language governing
015 * permissions and limitations under the License.
016 */
017package org.microbean.kubernetes.controller;
018
019import java.io.IOException;
020
021import java.time.Duration;
022import java.time.Instant;
023
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Iterator;
027import java.util.Map;
028import java.util.Objects;
029
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.CancellationException;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.Executor;
034import java.util.concurrent.Executors;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Future;
037import java.util.concurrent.CopyOnWriteArrayList;
038import java.util.concurrent.LinkedBlockingQueue;
039import java.util.concurrent.ScheduledExecutorService;
040import java.util.concurrent.ScheduledThreadPoolExecutor;
041import java.util.concurrent.ThreadFactory;
042import java.util.concurrent.TimeUnit;
043
044import java.util.concurrent.atomic.AtomicInteger;
045
046import java.util.concurrent.locks.Lock;
047import java.util.concurrent.locks.ReadWriteLock;
048import java.util.concurrent.locks.ReentrantReadWriteLock;
049
050import java.util.function.Consumer;
051import java.util.function.Function;
052
053import java.util.logging.Level;
054import java.util.logging.Logger;
055
056import io.fabric8.kubernetes.api.model.HasMetadata;
057
058import net.jcip.annotations.Immutable;
059import net.jcip.annotations.GuardedBy;
060import net.jcip.annotations.ThreadSafe;
061
062/**
063 * A {@link ResourceTrackingEventQueueConsumer} that {@linkplain
064 * ResourceTrackingEventQueueConsumer#accept(EventQueue) consumes
065 * <tt>EventQueue</tt> instances} by feeding each {@link
066 * AbstractEvent} in the {@link EventQueue} being consumed to {@link
067 * Consumer}s of {@link AbstractEvent}s that have been {@linkplain
068 * #addConsumer(Consumer) registered}.
069 *
070 * <p>{@link EventDistributor} instances must be {@linkplain #close()
071 * closed} and discarded after use.</p>
072 *
073 * @param <T> a type of Kubernetes resource
074 *
075 * @author <a href="https://about.me/lairdnelson"
076 * target="_parent">Laird Nelson</a>
077 *
078 * @see #addConsumer(Consumer)
079 *
080 * @see #removeConsumer(Consumer)
081 *
082 * @see ResourceTrackingEventQueueConsumer#accept(AbstractEvent)
083 */
084@Immutable
085@ThreadSafe
086public final class EventDistributor<T extends HasMetadata> extends ResourceTrackingEventQueueConsumer<T> implements AutoCloseable {
087
088
089  /*
090   * Instance fields.
091   */
092
093
094  @GuardedBy("readLock && writeLock")
095  private final Collection<Pump<T>> pumps;
096
097  @GuardedBy("readLock && writeLock")
098  private final Collection<Pump<T>> synchronizingPumps;
099
100  private final Duration synchronizationInterval;
101
102  private final Lock readLock;
103
104  private final Lock writeLock;
105
106
107  /*
108   * Constructors.
109   */
110
111
112  /**
113   * Creates a new {@link EventDistributor}.
114   *
115   * @param knownObjects a mutable {@link Map} of Kubernetes resources
116   * that contains or will contain Kubernetes resources known to this
117   * {@link EventDistributor} and whatever mechanism (such as a {@link
118   * Controller}) is feeding it; may be {@code null}
119   *
120   * @see #EventDistributor(Map, Duration)
121   */
122  public EventDistributor(final Map<Object, T> knownObjects) {
123    this(knownObjects, null);
124  }
125
126  /**
127   * Creates a new {@link EventDistributor}.
128   *
129   * @param knownObjects a mutable {@link Map} of Kubernetes resources
130   * that contains or will contain Kubernetes resources known to this
131   * {@link EventDistributor} and whatever mechanism (such as a {@link
132   * Controller}) is feeding it; may be {@code null}
133   *
134   * @param synchronizationInterval a {@link Duration} representing
135   * the interval after which an attempt to synchronize might happen;
136   * may be {@code null} in which case no synchronization will occur
137   *
138   * @see
139   * ResourceTrackingEventQueueConsumer#ResourceTrackingEventQueueConsumer(Map)
140   */
141  public EventDistributor(final Map<Object, T> knownObjects, final Duration synchronizationInterval) {
142    super(knownObjects);
143    final ReadWriteLock lock = new ReentrantReadWriteLock();
144    this.readLock = lock.readLock();
145    this.writeLock = lock.writeLock();
146    this.pumps = new ArrayList<>();
147    this.synchronizingPumps = new ArrayList<>();
148    this.synchronizationInterval = synchronizationInterval;
149  }
150
151
152  /*
153   * Instance methods.
154   */
155
156
157  /**
158   * Adds the supplied {@link Consumer} to this {@link
159   * EventDistributor} as a listener that will be notified of each
160   * {@link AbstractEvent} this {@link EventDistributor} receives.
161   *
162   * <p>The supplied {@link Consumer}'s {@link
163   * Consumer#accept(Object)} method may be called later on a separate
164   * thread of execution.</p>
165   *
166   * @param consumer a {@link Consumer} of {@link AbstractEvent}s; may
167   * be {@code null} in which case no action will be taken
168   *
169   * @see #addConsumer(Consumer, Function)
170   *
171   * @see #removeConsumer(Consumer)
172   */
173  public final void addConsumer(final Consumer<? super AbstractEvent<? extends T>> consumer) {
174    this.addConsumer(consumer, null);
175  }
176
177  /**
178   * Adds the supplied {@link Consumer} to this {@link
179   * EventDistributor} as a listener that will be notified of each
180   * {@link AbstractEvent} this {@link EventDistributor} receives.
181   *
182   * <p>The supplied {@link Consumer}'s {@link
183   * Consumer#accept(Object)} method may be called later on a separate
184   * thread of execution.</p>
185   *
186   * @param consumer a {@link Consumer} of {@link AbstractEvent}s; may
187   * be {@code null} in which case no action will be taken
188   *
189   * @param errorHandler a {@link Function} to handle any {@link
190   * Throwable}s encountered; may be {@code null} in which case a
191   * default error handler will be used instead
192   *
193   * @see #removeConsumer(Consumer)
194   */
195  public final void addConsumer(final Consumer<? super AbstractEvent<? extends T>> consumer, final Function<? super Throwable, Boolean> errorHandler) {
196    if (consumer != null) {
197      this.writeLock.lock();
198      try {
199        final Pump<T> pump = new Pump<>(this.synchronizationInterval, consumer, errorHandler);
200        pump.start();
201        this.pumps.add(pump);
202        this.synchronizingPumps.add(pump);
203      } finally {
204        this.writeLock.unlock();
205      }
206    }
207  }
208
209  /**
210   * Removes any {@link Consumer} {@linkplain Object#equals(Object)
211   * equal to} a {@link Consumer} previously {@linkplain
212   * #addConsumer(Consumer) added} to this {@link EventDistributor}.
213   *
214   * @param consumer the {@link Consumer} to remove; may be {@code
215   * null} in which case no action will be taken
216   *
217   * @see #addConsumer(Consumer)
218   */
219  public final void removeConsumer(final Consumer<? super AbstractEvent<? extends T>> consumer) {
220    if (consumer != null) {
221      this.writeLock.lock();
222      try {
223        final Iterator<? extends Pump<?>> iterator = this.pumps.iterator();
224        assert iterator != null;
225        while (iterator.hasNext()) {
226          final Pump<?> pump = iterator.next();
227          if (pump != null && consumer.equals(pump.getEventConsumer())) {
228            pump.close();
229            iterator.remove();
230            break;
231          }
232        }
233      } finally {
234        this.writeLock.unlock();
235      }
236    }
237  }
238
239  /**
240   * Releases resources held by this {@link EventDistributor} during
241   * its execution.
242   */
243  @Override
244  public final void close() {
245    this.writeLock.lock();
246    try {
247      this.pumps.stream()
248        .forEach(pump -> {
249            pump.close();
250          });
251      this.synchronizingPumps.clear();
252      this.pumps.clear();
253    } finally {
254      this.writeLock.unlock();
255    }
256  }
257
258  /**
259   * Returns {@code true} if this {@link EventDistributor} should
260   * <em>synchronize</em> with its upstream source.
261   *
262   * <h2>Design Notes</h2>
263   *
264   * <p>The Kubernetes {@code tools/cache} package spreads
265   * synchronization out among the reflector, controller, event cache
266   * and event processor constructs for no seemingly good reason.
267   * They should probably be consolidated, particularly in an
268   * object-oriented environment such as Java.</p>
269   *
270   * @return {@code true} if synchronization should occur; {@code
271   * false} otherwise
272   *
273   * @see EventCache#synchronize()
274   */
275  public final boolean shouldSynchronize() {
276    boolean returnValue = false;
277    this.writeLock.lock();
278    try {
279      this.synchronizingPumps.clear();
280      final Instant now = Instant.now();
281      this.pumps.stream()
282        .filter(pump -> pump.shouldSynchronize(now))
283        .forEach(pump -> {
284            this.synchronizingPumps.add(pump);
285            pump.determineNextSynchronizationInterval(now);
286          });
287      returnValue = !this.synchronizingPumps.isEmpty();
288    } finally {
289      this.writeLock.unlock();
290    }
291    return returnValue;
292  }
293
294  /**
295   * Consumes the supplied {@link AbstractEvent} by forwarding it to
296   * the {@link Consumer#accept(Object)} method of each {@link
297   * Consumer} {@linkplain #addConsumer(Consumer) registered} with
298   * this {@link EventDistributor}.
299   *
300   * @param event the {@link AbstractEvent} to forward; may be {@code
301   * null} in which case no action is taken
302   *
303   * @see #addConsumer(Consumer)
304   *
305   * @see ResourceTrackingEventQueueConsumer#accept(AbstractEvent)
306   */
307  @Override
308  protected final void accept(final AbstractEvent<? extends T> event) {
309    if (event != null) {
310      if (event instanceof SynchronizationEvent) {
311        this.accept((SynchronizationEvent<? extends T>)event);
312      } else if (event instanceof Event) {
313        this.accept((Event<? extends T>)event);
314      } else {
315        assert false : "Unexpected event type: " + event.getClass();
316      }
317    }
318  }
319
320  private final void accept(final SynchronizationEvent<? extends T> event) {
321    this.readLock.lock();
322    try {
323      if (!this.synchronizingPumps.isEmpty()) {
324        this.synchronizingPumps.stream()
325          .forEach(pump -> pump.accept(event));
326      }
327    } finally {
328      this.readLock.unlock();
329    }
330  }
331
332  private final void accept(final Event<? extends T> event) {
333    this.readLock.lock();
334    try {
335      if (!this.pumps.isEmpty()) {
336        this.pumps.stream()
337          .forEach(pump -> pump.accept(event));
338      }
339    } finally {
340      this.readLock.unlock();
341    }
342  }
343
344
345  /*
346   * Inner and nested classes.
347   */
348
349
350  /**
351   * A {@link Consumer} of {@link AbstractEvent} instances that puts
352   * them on an internal queue and, in a separate thread, removes them
353   * from the queue and forwards them to the "real" {@link Consumer}
354   * supplied at construction time.
355   *
356   * <p>A {@link Pump} differs from a simple {@link Consumer} of
357   * {@link AbstractEvent} instances in that it has its own
358   * {@linkplain #getSynchronizationInterval() synchronization
359   * interval}, and interposes a blocking queue in between the
360   * reception of an {@link AbstractEvent} and its eventual broadcast.</p>
361   *
362   * @author <a href="https://about.me/lairdnelson"
363   * target="_parent">Laird Nelson</a>
364   */
365  private static final class Pump<T extends HasMetadata> implements Consumer<AbstractEvent<? extends T>>, AutoCloseable {
366
367    private final Logger logger;
368
369    private final Consumer<? super AbstractEvent<? extends T>> eventConsumer;
370
371    private final Function<? super Throwable, Boolean> errorHandler;
372
373    private volatile boolean closing;
374
375    private volatile Instant nextSynchronizationInstant;
376
377    private volatile Duration synchronizationInterval;
378
379    @GuardedBy("this")
380    private ScheduledExecutorService executor;
381
382    @GuardedBy("this")
383    private Future<?> task;
384
385    private volatile Future<?> errorHandlingTask;
386
387    final BlockingQueue<AbstractEvent<? extends T>> queue;
388
389    private Pump(final Duration synchronizationInterval, final Consumer<? super AbstractEvent<? extends T>> eventConsumer) {
390      this(synchronizationInterval, eventConsumer, null);
391    }
392
393    private Pump(final Duration synchronizationInterval, final Consumer<? super AbstractEvent<? extends T>> eventConsumer, final Function<? super Throwable, Boolean> errorHandler) {
394      super();
395      final String cn = this.getClass().getName();
396      this.logger = Logger.getLogger(cn);
397      assert this.logger != null;
398      final String mn = "<init>";
399      if (this.logger.isLoggable(Level.FINER)) {
400        this.logger.entering(cn, mn, new Object[] { synchronizationInterval, eventConsumer, errorHandler });
401      }
402
403      // TODO: this should be extensible
404      this.queue = new LinkedBlockingQueue<>();
405      this.eventConsumer = Objects.requireNonNull(eventConsumer);
406      if (errorHandler == null) {
407        this.errorHandler = t -> {
408          if (this.logger.isLoggable(Level.SEVERE)) {
409            this.logger.logp(Level.SEVERE, this.getClass().getName(), "<pumpTask>", t.getMessage(), t);
410          }
411          return true;
412        };
413      } else {
414        this.errorHandler = errorHandler;
415      }
416      this.setSynchronizationInterval(synchronizationInterval);
417
418      if (this.logger.isLoggable(Level.FINER)) {
419        this.logger.exiting(cn, mn);
420      }
421    }
422
423    private final void start() {
424      final String cn = this.getClass().getName();
425      final String mn = "start";
426      if (this.logger.isLoggable(Level.FINER)) {
427        this.logger.entering(cn, mn);
428      }
429
430      synchronized (this) {
431
432        if (this.executor == null) {
433          assert this.task == null;
434          assert this.errorHandlingTask == null;
435
436          this.executor = this.createScheduledThreadPoolExecutor();
437          if (this.executor == null) {
438            throw new IllegalStateException("createScheduledThreadPoolExecutor() == null");
439          }
440
441          // Schedule a hopefully never-ending task to pump events from
442          // our queue to the supplied eventConsumer.  We *schedule* this,
443          // even though it will never end, instead of simply *executing*
444          // it, so that if for any reason it exits (by definition an
445          // error case) it will get restarted.  Cancelling a scheduled
446          // task will also cancel all resubmissions of it, so this is the
447          // most robust thing to do.  The delay of one second is
448          // arbitrary.
449          this.task = this.executor.scheduleWithFixedDelay(() -> {
450              while (!Thread.currentThread().isInterrupted()) {
451                try {
452                  this.getEventConsumer().accept(this.queue.take());
453                } catch (final InterruptedException interruptedException) {
454                  Thread.currentThread().interrupt();
455                } catch (final RuntimeException runtimeException) {
456                  if (!this.errorHandler.apply(runtimeException)) {
457                    throw runtimeException;
458                  }
459                } catch (final Error error) {
460                  if (!this.errorHandler.apply(error)) {
461                    throw error;
462                  }
463                }
464              }
465            }, 0L, 1L, TimeUnit.SECONDS);
466          assert this.task != null;
467
468          this.errorHandlingTask = this.executor.submit(() -> {
469              try {
470                while (!Thread.currentThread().isInterrupted()) {
471                  // The task is basically never-ending, so this will
472                  // block too, unless there's an exception.  That's
473                  // the whole point.
474                  this.task.get();
475                }
476              } catch (final CancellationException ok) {
477                // The task was cancelled.  Possibly redundantly,
478                // cancel it for sure.  This is an expected and normal
479                // condition.
480                this.task.cancel(true);
481              } catch (final ExecutionException executionException) {
482                // The task encountered an exception while executing.
483                // Although we got an ExecutionException, the task is
484                // still in a non-cancelled state.  We need to cancel
485                // it now to (potentially) have it removed from the
486                // executor queue.
487                this.task.cancel(true);
488                final Future<?> errorHandlingTask = this.errorHandlingTask;
489                if (errorHandlingTask != null) {
490                  errorHandlingTask.cancel(true); // cancel ourselves, too!
491                }
492                // Apply the actual error-handling logic to the
493                // exception.
494                // TODO: This should have already been done by the
495                // task itself...
496                this.errorHandler.apply(executionException.getCause());
497              } catch (final InterruptedException interruptedException) {
498                Thread.currentThread().interrupt();
499              }
500              if (Thread.currentThread().isInterrupted()) {
501                // The current thread was interrupted, probably
502                // because everything is closing up shop.  Cancel
503                // everything and go home.
504                this.task.cancel(true);
505                final Future<?> errorHandlingTask = this.errorHandlingTask;
506                if (errorHandlingTask != null) {
507                  errorHandlingTask.cancel(true); // cancel ourselves, too!
508                }
509              }              
510            });
511        }
512
513      }
514
515      if (this.logger.isLoggable(Level.FINER)) {
516        this.logger.entering(cn, mn);
517      }
518    }
519
520    private final ScheduledExecutorService createScheduledThreadPoolExecutor() {
521      final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new PumpThreadFactory());
522      executor.setRemoveOnCancelPolicy(true);
523      return executor;
524    }
525
526    private final Consumer<? super AbstractEvent<? extends T>> getEventConsumer() {
527      return this.eventConsumer;
528    }
529
530    /**
531     * Adds the supplied {@link AbstractEvent} to an internal {@link
532     * BlockingQueue}.  A task will have already been scheduled to
533     * consume it.
534     *
535     * @param event the {@link AbstractEvent} to add; may be {@code
536     * null} in which case no action is taken
537     */
538    @Override
539    public final void accept(final AbstractEvent<? extends T> event) {
540      final String cn = this.getClass().getName();
541      final String mn = "accept";
542      if (this.logger.isLoggable(Level.FINER)) {
543        this.logger.entering(cn, mn, event);
544      }
545      if (this.closing) {
546        throw new IllegalStateException();
547      }
548      if (event != null) {
549        final boolean added = this.queue.add(event);
550        assert added;
551      }
552      if (this.logger.isLoggable(Level.FINER)) {
553        this.logger.exiting(cn, mn);
554      }
555    }
556
557    @Override
558    public final void close() {
559      final String cn = this.getClass().getName();
560      final String mn = "close";
561      if (this.logger.isLoggable(Level.FINER)) {
562        this.logger.entering(cn, mn);
563      }
564
565      synchronized (this) {
566        if (!this.closing) {
567          try {
568            assert this.executor != null;
569            assert this.task != null;
570            assert this.errorHandlingTask != null;
571            this.closing = true;
572            
573            // Stop accepting new tasks.
574            this.executor.shutdown();
575            
576            // Cancel our regular task.
577            this.task.cancel(true);
578            this.task = null;
579            
580            // Cancel our task that surfaces errors from the regular task.
581            this.errorHandlingTask.cancel(true);
582            this.errorHandlingTask = null;
583            
584            try {
585              // Wait for our executor to shut down normally, and shut
586              // it down forcibly if it doesn't.
587              if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
588                this.executor.shutdownNow();
589                if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
590                  if (this.logger.isLoggable(Level.WARNING)) {
591                    this.logger.logp(Level.WARNING, cn, mn, "this.executor.awaitTermination() failed");
592                  }
593                }
594              }
595            } catch (final InterruptedException interruptedException) {
596              this.executor.shutdownNow();
597              Thread.currentThread().interrupt();
598            }
599            this.executor = null;
600          } finally {
601            this.closing = false;
602          }
603        }
604      }
605
606      if (this.logger.isLoggable(Level.FINER)) {
607        this.logger.exiting(cn, mn);
608      }
609    }
610
611
612    /*
613     * Synchronization-related methods.  It seems odd that one of these
614     * listeners would need to report details about synchronization, but
615     * that's what the Go code does.  Maybe this functionality could be
616     * relocated "higher up".
617     */
618
619
620    private final boolean shouldSynchronize(final Instant now) {
621      final String cn = this.getClass().getName();
622      final String mn = "shouldSynchronize";
623      if (this.logger.isLoggable(Level.FINER)) {
624        this.logger.entering(cn, mn, now);
625      }
626      final boolean returnValue;
627      if (this.closing) {
628        returnValue = false;
629      } else {
630        final Duration interval = this.getSynchronizationInterval();
631        if (interval == null || interval.isZero()) {
632          returnValue = false;
633        } else if (now == null) {
634          returnValue = Instant.now().compareTo(this.nextSynchronizationInstant) >= 0;
635        } else {
636          returnValue = now.compareTo(this.nextSynchronizationInstant) >= 0;
637        }
638      }
639      if (this.logger.isLoggable(Level.FINER)) {
640        this.logger.exiting(cn, mn, Boolean.valueOf(returnValue));
641      }
642      return returnValue;
643    }
644
645    private final void determineNextSynchronizationInterval(final Instant now) {
646      final String cn = this.getClass().getName();
647      final String mn = "determineNextSynchronizationInterval";
648      if (this.logger.isLoggable(Level.FINER)) {
649        this.logger.entering(cn, mn, now);
650      }
651      final Duration synchronizationInterval = this.getSynchronizationInterval();
652      if (synchronizationInterval == null) {
653        if (now == null) {
654          this.nextSynchronizationInstant = Instant.now();
655        } else {
656          this.nextSynchronizationInstant = now;
657        }
658      } else if (now == null) {
659        this.nextSynchronizationInstant = Instant.now().plus(synchronizationInterval);
660      } else {
661        this.nextSynchronizationInstant = now.plus(synchronizationInterval);
662      }
663      if (this.logger.isLoggable(Level.FINER)) {
664        this.logger.entering(cn, mn);
665      }
666    }
667
668    public final void setSynchronizationInterval(final Duration synchronizationInterval) {
669      this.synchronizationInterval = synchronizationInterval;
670    }
671
672    public final Duration getSynchronizationInterval() {
673      return this.synchronizationInterval;
674    }
675
676
677    /*
678     * Inner and nested classes.
679     */
680    
681    
682    /**
683     * A {@link ThreadFactory} that {@linkplain #newThread(Runnable)
684     * produces new <code>Thread</code>s} with sane names.
685     *
686     * @author <a href="https://about.me/lairdnelson"
687     * target="_parent">Laird Nelson</a>
688     */
689    private static final class PumpThreadFactory implements ThreadFactory {
690
691      private final ThreadGroup group;
692
693      private final AtomicInteger threadNumber = new AtomicInteger(1);
694
695      private PumpThreadFactory() {
696        final SecurityManager s = System.getSecurityManager();
697        if (s == null) {
698          this.group = Thread.currentThread().getThreadGroup();
699        } else {
700          this.group = s.getThreadGroup();
701        }
702      }
703
704      @Override
705      public final Thread newThread(final Runnable runnable) {
706        final Thread returnValue = new Thread(this.group, runnable, "event-pump-thread-" + this.threadNumber.getAndIncrement(), 0);
707        if (returnValue.isDaemon()) {
708          returnValue.setDaemon(false);
709        }
710        if (returnValue.getPriority() != Thread.NORM_PRIORITY) {
711          returnValue.setPriority(Thread.NORM_PRIORITY);
712        }
713        return returnValue;
714      }
715    }
716
717  }
718
719}