/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
 *
 * Copyright © 2017-2018 microBean.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.microbean.kubernetes.controller;

import java.io.IOException;

import java.time.Duration;
import java.time.Instant;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import java.util.function.Consumer;
import java.util.function.Function;

import java.util.logging.Level;
import java.util.logging.Logger;

import io.fabric8.kubernetes.api.model.HasMetadata;

import net.jcip.annotations.Immutable;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

/**
 * A {@link ResourceTrackingEventQueueConsumer} that {@linkplain
 * ResourceTrackingEventQueueConsumer#accept(EventQueue) consumes
 * <code>EventQueue</code> instances} by feeding each {@link
 * AbstractEvent} in the {@link EventQueue} being consumed to {@link
 * Consumer}s of {@link AbstractEvent}s that have been {@linkplain
 * #addConsumer(Consumer) registered}.
 *
 * <p>{@link EventDistributor} instances must be {@linkplain #close()
 * closed} and discarded after use.</p>
 *
 * <h2>Thread Safety</h2>
 *
 * <p>Registration, removal, closing and {@link #shouldSynchronize()}
 * take a write lock; event distribution takes the corresponding read
 * lock.  Each registered {@link Consumer} is notified on its own
 * thread of execution.</p>
 *
 * @param <T> a type of Kubernetes resource
 *
 * @author <a href="https://about.me/lairdnelson"
 * target="_parent">Laird Nelson</a>
 *
 * @see #addConsumer(Consumer)
 *
 * @see #removeConsumer(Consumer)
 *
 * @see ResourceTrackingEventQueueConsumer#accept(AbstractEvent)
 */
@Immutable
@ThreadSafe
public final class EventDistributor<T extends HasMetadata> extends ResourceTrackingEventQueueConsumer<T> implements AutoCloseable {


  /*
   * Instance fields.
   */


  /**
   * Every currently registered {@link Pump}, one per registered
   * {@link Consumer}.
   */
  @GuardedBy("readLock && writeLock")
  private final Collection<Pump<T>> pumps;

  /**
   * The subset of {@link #pumps} that should receive the next {@link
   * SynchronizationEvent}s; recomputed by {@link
   * #shouldSynchronize()}.
   */
  @GuardedBy("readLock && writeLock")
  private final Collection<Pump<T>> synchronizingPumps;

  /**
   * The synchronization interval handed to every new {@link Pump};
   * may be {@code null}.
   */
  private final Duration synchronizationInterval;

  private final Lock readLock;

  private final Lock writeLock;


  /*
   * Constructors.
   */


  /**
   * Creates a new {@link EventDistributor}.
   *
   * @param knownObjects a mutable {@link Map} of Kubernetes resources
   * that contains or will contain Kubernetes resources known to this
   * {@link EventDistributor} and whatever mechanism (such as a {@link
   * Controller}) is feeding it; may be {@code null}
   *
   * @see #EventDistributor(Map, Duration)
   */
  public EventDistributor(final Map<Object, T> knownObjects) {
    this(knownObjects, null);
  }

  /**
   * Creates a new {@link EventDistributor}.
   *
   * @param knownObjects a mutable {@link Map} of Kubernetes resources
   * that contains or will contain Kubernetes resources known to this
   * {@link EventDistributor} and whatever mechanism (such as a {@link
   * Controller}) is feeding it; may be {@code null}
   *
   * @param synchronizationInterval a {@link Duration} representing
   * the interval after which an attempt to synchronize might happen;
   * may be {@code null} in which case no synchronization will occur
   *
   * @see
   * ResourceTrackingEventQueueConsumer#ResourceTrackingEventQueueConsumer(Map)
   */
  public EventDistributor(final Map<Object, T> knownObjects, final Duration synchronizationInterval) {
    super(knownObjects);
    final ReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();
    this.pumps = new ArrayList<>();
    this.synchronizingPumps = new ArrayList<>();
    this.synchronizationInterval = synchronizationInterval;
  }


  /*
   * Instance methods.
   */


  /**
   * Adds the supplied {@link Consumer} to this {@link
   * EventDistributor} as a listener that will be notified of each
   * {@link AbstractEvent} this {@link EventDistributor} receives.
   *
   * <p>The supplied {@link Consumer}'s {@link
   * Consumer#accept(Object)} method may be called later on a separate
   * thread of execution.</p>
   *
   * @param consumer a {@link Consumer} of {@link AbstractEvent}s; may
   * be {@code null} in which case no action will be taken
   *
   * @see #addConsumer(Consumer, Function)
   *
   * @see #removeConsumer(Consumer)
   */
  public final void addConsumer(final Consumer<? super AbstractEvent<? extends T>> consumer) {
    this.addConsumer(consumer, null);
  }

  /**
   * Adds the supplied {@link Consumer} to this {@link
   * EventDistributor} as a listener that will be notified of each
   * {@link AbstractEvent} this {@link EventDistributor} receives.
   *
   * <p>The supplied {@link Consumer}'s {@link
   * Consumer#accept(Object)} method may be called later on a separate
   * thread of execution.</p>
   *
   * @param consumer a {@link Consumer} of {@link AbstractEvent}s; may
   * be {@code null} in which case no action will be taken
   *
   * @param errorHandler a {@link Function} to handle any {@link
   * Throwable}s encountered; may be {@code null} in which case a
   * default error handler will be used instead.  The {@link Function}
   * should return {@code true} if the error was handled; any other
   * return value (including {@code null}) causes the original {@link
   * Throwable} to be rethrown on the pumping thread.
   *
   * @see #removeConsumer(Consumer)
   */
  public final void addConsumer(final Consumer<? super AbstractEvent<? extends T>> consumer, final Function<? super Throwable, Boolean> errorHandler) {
    if (consumer != null) {
      this.writeLock.lock();
      try {
        final Pump<T> pump = new Pump<>(this.synchronizationInterval, consumer, errorHandler);
        // Start first: if starting fails the pump is never registered.
        pump.start();
        this.pumps.add(pump);
        this.synchronizingPumps.add(pump);
      } finally {
        this.writeLock.unlock();
      }
    }
  }

  /**
   * Removes the first {@link Consumer} {@linkplain
   * Object#equals(Object) equal to} a {@link Consumer} previously
   * {@linkplain #addConsumer(Consumer) added} to this {@link
   * EventDistributor}, and releases the resources that were used to
   * notify it.
   *
   * @param consumer the {@link Consumer} to remove; may be {@code
   * null} in which case no action will be taken
   *
   * @see #addConsumer(Consumer)
   */
  public final void removeConsumer(final Consumer<? super AbstractEvent<? extends T>> consumer) {
    if (consumer != null) {
      this.writeLock.lock();
      try {
        final Iterator<? extends Pump<?>> iterator = this.pumps.iterator();
        assert iterator != null;
        while (iterator.hasNext()) {
          final Pump<?> pump = iterator.next();
          if (pump != null && consumer.equals(pump.getEventConsumer())) {
            pump.close();
            iterator.remove();
            // The pump may also be scheduled to receive
            // synchronization events; a closed pump would merely
            // accumulate them on a queue nobody drains, so forget it
            // there as well.
            this.synchronizingPumps.remove(pump);
            break;
          }
        }
      } finally {
        this.writeLock.unlock();
      }
    }
  }

  /**
   * Releases resources held by this {@link EventDistributor} during
   * its execution.
   *
   * <p>Every registered {@link Consumer} is unregistered and the
   * thread notifying it is shut down.</p>
   */
  @Override
  public final void close() {
    this.writeLock.lock();
    try {
      for (final Pump<T> pump : this.pumps) {
        pump.close();
      }
      this.synchronizingPumps.clear();
      this.pumps.clear();
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * Returns {@code true} if this {@link EventDistributor} should
   * <em>synchronize</em> with its upstream source.
   *
   * <p>As a side effect, this method recomputes the set of registered
   * {@link Consumer}s that will receive subsequent {@link
   * SynchronizationEvent}s, and schedules the next synchronization
   * instant of each of them.</p>
   *
   * <h2>Design Notes</h2>
   *
   * <p>The Kubernetes {@code tools/cache} package spreads
   * synchronization out among the reflector, controller, event cache
   * and event processor constructs for no seemingly good reason.
   * They should probably be consolidated, particularly in an
   * object-oriented environment such as Java.</p>
   *
   * @return {@code true} if synchronization should occur; {@code
   * false} otherwise
   *
   * @see EventCache#synchronize()
   */
  public final boolean shouldSynchronize() {
    this.writeLock.lock();
    try {
      this.synchronizingPumps.clear();
      // Use a single instant so all pumps are judged consistently.
      final Instant now = Instant.now();
      for (final Pump<T> pump : this.pumps) {
        if (pump.shouldSynchronize(now)) {
          this.synchronizingPumps.add(pump);
          pump.determineNextSynchronizationInterval(now);
        }
      }
      return !this.synchronizingPumps.isEmpty();
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * Consumes the supplied {@link AbstractEvent} by forwarding it to
   * the {@link Consumer#accept(Object)} method of each {@link
   * Consumer} {@linkplain #addConsumer(Consumer) registered} with
   * this {@link EventDistributor}.
   *
   * <p>{@link SynchronizationEvent}s are forwarded only to those
   * {@link Consumer}s selected by the most recent call to {@link
   * #shouldSynchronize()}; {@link Event}s are forwarded to all of
   * them.</p>
   *
   * @param event the {@link AbstractEvent} to forward; may be {@code
   * null} in which case no action is taken
   *
   * @see #addConsumer(Consumer)
   *
   * @see ResourceTrackingEventQueueConsumer#accept(AbstractEvent)
   */
  @Override
  protected final void accept(final AbstractEvent<? extends T> event) {
    if (event != null) {
      if (event instanceof SynchronizationEvent) {
        this.accept((SynchronizationEvent<? extends T>)event);
      } else if (event instanceof Event) {
        this.accept((Event<? extends T>)event);
      } else {
        assert false : "Unexpected event type: " + event.getClass();
      }
    }
  }

  /**
   * Forwards the supplied {@link SynchronizationEvent} to the pumps
   * currently due for synchronization.
   *
   * @param event the event to forward; must not be {@code null}
   */
  private final void accept(final SynchronizationEvent<? extends T> event) {
    this.readLock.lock();
    try {
      for (final Pump<T> pump : this.synchronizingPumps) {
        pump.accept(event);
      }
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Forwards the supplied {@link Event} to every registered pump.
   *
   * @param event the event to forward; must not be {@code null}
   */
  private final void accept(final Event<? extends T> event) {
    this.readLock.lock();
    try {
      for (final Pump<T> pump : this.pumps) {
        pump.accept(event);
      }
    } finally {
      this.readLock.unlock();
    }
  }


  /*
   * Inner and nested classes.
   */


  /**
   * A {@link Consumer} of {@link AbstractEvent} instances that puts
   * them on an internal queue and, in a separate thread, removes them
   * from the queue and forwards them to the "real" {@link Consumer}
   * supplied at construction time.
   *
   * <p>A {@link Pump} differs from a simple {@link Consumer} of
   * {@link AbstractEvent} instances in that it has its own
   * {@linkplain #getSynchronizationInterval() synchronization
   * interval}, and interposes a blocking queue in between the
   * reception of an {@link AbstractEvent} and its eventual broadcast.</p>
   *
   * @author <a href="https://about.me/lairdnelson"
   * target="_parent">Laird Nelson</a>
   */
  private static final class Pump<T extends HasMetadata> implements Consumer<AbstractEvent<? extends T>>, AutoCloseable {

    private final Logger logger;

    private final Consumer<? super AbstractEvent<? extends T>> eventConsumer;

    private final Function<? super Throwable, Boolean> errorHandler;

    /**
     * {@code true} only while {@link #close()} is in progress.
     */
    private volatile boolean closing;

    /**
     * The earliest instant at which this {@link Pump} next wants to
     * be synchronized.
     */
    private volatile Instant nextSynchronizationInstant;

    private volatile Duration synchronizationInterval;

    @GuardedBy("this")
    private ScheduledExecutorService executor;

    /**
     * The (conceptually never-ending) task moving events from the
     * queue to the event consumer.
     */
    @GuardedBy("this")
    private Future<?> task;

    /**
     * The task that blocks on {@link #task} in order to surface any
     * exception that terminated it.
     */
    private volatile Future<?> errorHandlingTask;

    private final BlockingQueue<AbstractEvent<? extends T>> queue;

    private Pump(final Duration synchronizationInterval, final Consumer<? super AbstractEvent<? extends T>> eventConsumer) {
      this(synchronizationInterval, eventConsumer, null);
    }

    /**
     * Creates a new, unstarted {@link Pump}.
     *
     * @param synchronizationInterval the interval between
     * synchronizations; may be {@code null} in which case this {@link
     * Pump} never asks to be synchronized
     *
     * @param eventConsumer the {@link Consumer} to notify; must not
     * be {@code null}
     *
     * @param errorHandler a {@link Function} applied to any {@link
     * Throwable} thrown by the {@code eventConsumer}; may be {@code
     * null} in which case the {@link Throwable} is logged and
     * considered handled
     *
     * @exception NullPointerException if {@code eventConsumer} is
     * {@code null}
     */
    private Pump(final Duration synchronizationInterval, final Consumer<? super AbstractEvent<? extends T>> eventConsumer, final Function<? super Throwable, Boolean> errorHandler) {
      super();
      final String cn = this.getClass().getName();
      this.logger = Logger.getLogger(cn);
      assert this.logger != null;
      final String mn = "<init>";
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.entering(cn, mn, new Object[] { synchronizationInterval, eventConsumer, errorHandler });
      }

      // TODO: this should be extensible
      this.queue = new LinkedBlockingQueue<>();
      this.eventConsumer = Objects.requireNonNull(eventConsumer);
      if (errorHandler == null) {
        this.errorHandler = t -> {
          if (this.logger.isLoggable(Level.SEVERE)) {
            this.logger.logp(Level.SEVERE, this.getClass().getName(), "<pumpTask>", t.getMessage(), t);
          }
          return true;
        };
      } else {
        this.errorHandler = errorHandler;
      }
      this.setSynchronizationInterval(synchronizationInterval);
      // Establish the first synchronization deadline now; otherwise
      // the first shouldSynchronize(Instant) call would compare
      // against a null instant.
      this.determineNextSynchronizationInterval(Instant.now());

      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.exiting(cn, mn);
      }
    }

    /**
     * Starts this {@link Pump} if it is not already started: creates
     * its executor, schedules the event-pumping task and submits the
     * task that surfaces the pumping task's failures.
     *
     * @exception IllegalStateException if no executor could be
     * created
     */
    private final void start() {
      final String cn = this.getClass().getName();
      final String mn = "start";
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.entering(cn, mn);
      }

      synchronized (this) {

        if (this.executor == null) {
          assert this.task == null;
          assert this.errorHandlingTask == null;

          this.executor = this.createScheduledThreadPoolExecutor();
          if (this.executor == null) {
            throw new IllegalStateException("createScheduledThreadPoolExecutor() == null");
          }

          // Schedule a hopefully never-ending task to pump events from
          // our queue to the supplied eventConsumer.  We *schedule* this,
          // even though it will never end, instead of simply *executing*
          // it, so that if for any reason it exits normally (by
          // definition an error case) it will get restarted.  Cancelling
          // a scheduled task will also cancel all resubmissions of it, so
          // this is the most robust thing to do.  The delay of one second
          // is arbitrary.
          final Future<?> pumpingTask = this.executor.scheduleWithFixedDelay(this::pumpEvents, 0L, 1L, TimeUnit.SECONDS);
          assert pumpingTask != null;
          this.task = pumpingTask;

          // The supervisor works on the captured reference rather than
          // on this.task: close() nulls the field while holding the
          // monitor, and the supervisor runs on another thread.
          final Runnable supervisor = () -> this.superviseTask(pumpingTask);
          this.errorHandlingTask = this.executor.submit(supervisor);
        }

      }

      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.exiting(cn, mn);
      }
    }

    /**
     * Takes events off the queue and hands them to the event consumer
     * until the current thread is interrupted or an unhandled {@link
     * RuntimeException} or {@link Error} is thrown.
     */
    private final void pumpEvents() {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          this.getEventConsumer().accept(this.queue.take());
        } catch (final InterruptedException interruptedException) {
          Thread.currentThread().interrupt();
        } catch (final RuntimeException runtimeException) {
          if (!this.handle(runtimeException)) {
            throw runtimeException;
          }
        } catch (final Error error) {
          if (!this.handle(error)) {
            throw error;
          }
        }
      }
    }

    /**
     * Applies the error handler to the supplied {@link Throwable}.
     *
     * @param throwable the {@link Throwable} to handle
     *
     * @return {@code true} only if the error handler returned {@code
     * true}; a {@code null} result is treated as "not handled"
     */
    private final boolean handle(final Throwable throwable) {
      return Boolean.TRUE.equals(this.errorHandler.apply(throwable));
    }

    /**
     * Blocks on the supplied pumping task so that an exception that
     * terminates it is noticed, cancelled out of the executor and
     * passed to the error handler.
     *
     * @param pumpingTask the task to watch; must not be {@code null}
     */
    private final void superviseTask(final Future<?> pumpingTask) {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          // The task is basically never-ending, so this will block
          // too, unless there's an exception.  That's the whole
          // point.
          pumpingTask.get();
        }
      } catch (final CancellationException ok) {
        // The task was cancelled.  Possibly redundantly, cancel it
        // for sure.  This is an expected and normal condition.
        pumpingTask.cancel(true);
      } catch (final ExecutionException executionException) {
        // The task encountered an exception while executing.
        // Although we got an ExecutionException, the task is still in
        // a non-cancelled state.  We need to cancel it now to
        // (potentially) have it removed from the executor queue.
        pumpingTask.cancel(true);
        this.cancelErrorHandlingTask(); // cancel ourselves, too!
        // Apply the actual error-handling logic to the exception.
        // TODO: This should have already been done by the task
        // itself...
        this.errorHandler.apply(executionException.getCause());
      } catch (final InterruptedException interruptedException) {
        Thread.currentThread().interrupt();
      }
      if (Thread.currentThread().isInterrupted()) {
        // The current thread was interrupted, probably because
        // everything is closing up shop.  Cancel everything and go
        // home.
        pumpingTask.cancel(true);
        this.cancelErrorHandlingTask(); // cancel ourselves, too!
      }
    }

    /**
     * Cancels the error-handling task if there currently is one.
     */
    private final void cancelErrorHandlingTask() {
      final Future<?> errorHandlingTask = this.errorHandlingTask;
      if (errorHandlingTask != null) {
        errorHandlingTask.cancel(true);
      }
    }

    /**
     * Creates the executor used by this {@link Pump}: two threads
     * (one to pump, one to supervise), with cancelled tasks removed
     * from the work queue immediately.
     *
     * @return a new {@link ScheduledExecutorService}; never {@code
     * null}
     */
    private final ScheduledExecutorService createScheduledThreadPoolExecutor() {
      final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new PumpThreadFactory());
      executor.setRemoveOnCancelPolicy(true);
      return executor;
    }

    private final Consumer<? super AbstractEvent<? extends T>> getEventConsumer() {
      return this.eventConsumer;
    }

    /**
     * Adds the supplied {@link AbstractEvent} to an internal {@link
     * BlockingQueue}.  A task will have already been scheduled to
     * consume it.
     *
     * @param event the {@link AbstractEvent} to add; may be {@code
     * null} in which case no action is taken
     *
     * @exception IllegalStateException if this {@link Pump} is in the
     * process of being {@linkplain #close() closed}
     */
    @Override
    public final void accept(final AbstractEvent<? extends T> event) {
      final String cn = this.getClass().getName();
      final String mn = "accept";
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.entering(cn, mn, event);
      }
      if (this.closing) {
        throw new IllegalStateException();
      }
      if (event != null) {
        final boolean added = this.queue.add(event);
        assert added;
      }
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.exiting(cn, mn);
      }
    }

    /**
     * Cancels this {@link Pump}'s tasks and shuts down its executor,
     * waiting up to sixty seconds for an orderly shutdown and up to
     * sixty more after a forced one.
     *
     * <p>Calling this method on a {@link Pump} that was never
     * {@linkplain #start() started}, or that has already been closed,
     * has no effect.</p>
     */
    @Override
    public final void close() {
      final String cn = this.getClass().getName();
      final String mn = "close";
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.entering(cn, mn);
      }

      synchronized (this) {
        // A null executor means "never started" or "already closed".
        if (!this.closing && this.executor != null) {
          try {
            this.closing = true;

            // Stop accepting new tasks.
            this.executor.shutdown();

            // Cancel our regular task.
            if (this.task != null) {
              this.task.cancel(true);
              this.task = null;
            }

            // Cancel our task that surfaces errors from the regular task.
            this.cancelErrorHandlingTask();
            this.errorHandlingTask = null;

            try {
              // Wait for our executor to shut down normally, and shut
              // it down forcibly if it doesn't.
              if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
                this.executor.shutdownNow();
                if (!this.executor.awaitTermination(60, TimeUnit.SECONDS)) {
                  if (this.logger.isLoggable(Level.WARNING)) {
                    this.logger.logp(Level.WARNING, cn, mn, "this.executor.awaitTermination() failed");
                  }
                }
              }
            } catch (final InterruptedException interruptedException) {
              this.executor.shutdownNow();
              Thread.currentThread().interrupt();
            }
            this.executor = null;
          } finally {
            this.closing = false;
          }
        }
      }

      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.exiting(cn, mn);
      }
    }


    /*
     * Synchronization-related methods.  It seems odd that one of these
     * listeners would need to report details about synchronization, but
     * that's what the Go code does.  Maybe this functionality could be
     * relocated "higher up".
     */


    /**
     * Returns {@code true} if this {@link Pump} is due for
     * synchronization at the supplied instant.
     *
     * @param now the instant to test; may be {@code null} in which
     * case {@link Instant#now()} is used instead
     *
     * @return {@code false} if this {@link Pump} is closing, if its
     * synchronization interval is {@code null} or zero, or if its
     * next synchronization instant has not been reached or
     * determined; {@code true} otherwise
     */
    private final boolean shouldSynchronize(final Instant now) {
      final String cn = this.getClass().getName();
      final String mn = "shouldSynchronize";
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.entering(cn, mn, now);
      }
      final boolean returnValue;
      if (this.closing) {
        returnValue = false;
      } else {
        final Duration interval = this.getSynchronizationInterval();
        // Read the volatile field once so the null check and the
        // comparison see the same value.
        final Instant deadline = this.nextSynchronizationInstant;
        if (interval == null || interval.isZero() || deadline == null) {
          returnValue = false;
        } else {
          final Instant instant = now == null ? Instant.now() : now;
          returnValue = instant.compareTo(deadline) >= 0;
        }
      }
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.exiting(cn, mn, Boolean.valueOf(returnValue));
      }
      return returnValue;
    }

    /**
     * Sets the next synchronization instant to the supplied instant
     * plus the current synchronization interval (or to the supplied
     * instant itself when there is no interval).
     *
     * @param now the instant to start from; may be {@code null} in
     * which case {@link Instant#now()} is used instead
     */
    private final void determineNextSynchronizationInterval(final Instant now) {
      final String cn = this.getClass().getName();
      final String mn = "determineNextSynchronizationInterval";
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.entering(cn, mn, now);
      }
      final Instant base = now == null ? Instant.now() : now;
      final Duration synchronizationInterval = this.getSynchronizationInterval();
      if (synchronizationInterval == null) {
        this.nextSynchronizationInstant = base;
      } else {
        this.nextSynchronizationInstant = base.plus(synchronizationInterval);
      }
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.exiting(cn, mn);
      }
    }

    public final void setSynchronizationInterval(final Duration synchronizationInterval) {
      this.synchronizationInterval = synchronizationInterval;
    }

    public final Duration getSynchronizationInterval() {
      return this.synchronizationInterval;
    }


    /*
     * Inner and nested classes.
     */


    /**
     * A {@link ThreadFactory} that {@linkplain #newThread(Runnable)
     * produces new <code>Thread</code>s} with sane names.
     *
     * @author <a href="https://about.me/lairdnelson"
     * target="_parent">Laird Nelson</a>
     */
    private static final class PumpThreadFactory implements ThreadFactory {

      private final ThreadGroup group;

      private final AtomicInteger threadNumber = new AtomicInteger(1);

      private PumpThreadFactory() {
        final SecurityManager s = System.getSecurityManager();
        if (s == null) {
          this.group = Thread.currentThread().getThreadGroup();
        } else {
          this.group = s.getThreadGroup();
        }
      }

      /**
       * Creates a new non-daemon, normal-priority {@link Thread}
       * named {@code event-pump-thread-}<em>n</em>.
       *
       * @param runnable the {@link Runnable} the new {@link Thread}
       * will run
       *
       * @return a new {@link Thread}; never {@code null}
       */
      @Override
      public final Thread newThread(final Runnable runnable) {
        final Thread returnValue = new Thread(this.group, runnable, "event-pump-thread-" + this.threadNumber.getAndIncrement(), 0);
        if (returnValue.isDaemon()) {
          returnValue.setDaemon(false);
        }
        if (returnValue.getPriority() != Thread.NORM_PRIORITY) {
          returnValue.setPriority(Thread.NORM_PRIORITY);
        }
        return returnValue;
      }
    }

  }

}