/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
 *
 * Copyright © 2017-2018 microBean.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.microbean.kubernetes.controller;

import java.io.Closeable;
import java.io.IOException;

import java.time.Duration;

import java.util.Map;
import java.util.Objects;

import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import java.util.function.Consumer;
import java.util.function.Function;

import java.util.logging.Level;
import java.util.logging.Logger;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;

import io.fabric8.kubernetes.client.KubernetesClientException; // for javadoc only
import io.fabric8.kubernetes.client.Watcher;

import io.fabric8.kubernetes.client.dsl.Listable;
import io.fabric8.kubernetes.client.dsl.VersionWatchable;

import net.jcip.annotations.Immutable;
import net.jcip.annotations.ThreadSafe;

import org.microbean.development.annotation.Blocking;
import org.microbean.development.annotation.NonBlocking;

/**
 * A convenient combination of a {@link Reflector}, a {@link
 * VersionWatchable} and {@link Listable} implementation, an
 * (internal) {@link EventQueueCollection}, a {@link Map} of known
 * Kubernetes resources and an {@link EventQueue} {@link Consumer}
 * that {@linkplain Reflector#start() mirrors Kubernetes cluster
 * events} into a {@linkplain EventQueueCollection collection of
 * <code>EventQueue</code>s} and {@linkplain
 * EventQueueCollection#start(Consumer) arranges for their consumption
 * and processing}.
 *
 * <p>{@linkplain #start() Starting} a {@link Controller} {@linkplain
 * EventQueueCollection#start(Consumer) starts the
 * <code>Consumer</code>} supplied at construction time, and
 * {@linkplain Reflector#start() starts the embedded
 * <code>Reflector</code>}.  {@linkplain #close() Closing} a {@link
 * Controller} {@linkplain Reflector#close() closes its embedded
 * <code>Reflector</code>} and {@linkplain
 * EventQueueCollection#close() causes the <code>Consumer</code>
 * supplied at construction time to stop receiving
 * <code>Event</code>s}.</p>
 *
 * <p>Several {@code protected} methods in this class exist to make
 * customization easier; none require overriding and their default
 * behavior is usually just fine.</p>
 *
 * <h2>Thread Safety</h2>
 *
 * <p>Instances of this class are safe for concurrent use by multiple
 * threads.</p>
 *
 * <h2>Design Notes</h2>
 *
 * <p>This class loosely models a combination of a <a
 * href="https://github.com/kubernetes/client-go/blob/79cb21f5b3b1dd8f8b23bd3f79925b4fda4e2562/tools/cache/controller.go#L82-L86">{@code
 * Controller} type</a> and a <a
 * href="https://github.com/kubernetes/client-go/blob/79cb21f5b3b1dd8f8b23bd3f79925b4fda4e2562/tools/cache/shared_informer.go#L66-L71">{@code
 * SharedIndexInformer} type</a> as found in <a
 * href="https://github.com/kubernetes/client-go/blob/master/tools/cache/controller.go">{@code
 * controller.go}</a> and <a
 * href="https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go">{@code
 * shared_informer.go}</a> respectively.</p>
 *
 * @param <T> a Kubernetes resource type
 *
 * @author <a href="https://about.me/lairdnelson"
 * target="_parent">Laird Nelson</a>
 *
 * @see Reflector
 *
 * @see EventQueueCollection
 *
 * @see ResourceTrackingEventQueueConsumer
 *
 * @see #start()
 *
 * @see #close()
 */
@Immutable
@ThreadSafe
public class Controller<T extends HasMetadata> implements Closeable {


  /*
   * Instance fields.
   */


  /**
   * A {@link Logger} used by this {@link Controller}.
   *
   * <p>This field is never {@code null}.</p>
   *
   * @see #createLogger()
   */
  protected final Logger logger;

  /**
   * The {@link Reflector} used by this {@link Controller} to mirror
   * Kubernetes events.
   *
   * <p>This field is never {@code null}.</p>
   */
  private final Reflector<T> reflector;

  /**
   * The {@link EventQueueCollection} used by the {@link #reflector
   * Reflector} and by the {@link Consumer} supplied at construction
   * time.
   *
   * <p>This field is never {@code null}.</p>
   *
   * @see EventQueueCollection#add(Object, AbstractEvent.Type,
   * HasMetadata)
   *
   * @see EventQueueCollection#replace(java.util.Collection, Object)
   *
   * @see EventQueueCollection#synchronize()
   *
   * @see EventQueueCollection#start(Consumer)
   */
  private final EventQueueCollection<T> eventQueueCollection;

  /**
   * The listener that is registered on the {@link
   * #eventQueueCollection} at construction time and that backs the
   * {@link #awaitEventCacheSynchronization()} and {@link
   * #awaitEventCacheSynchronization(long, TimeUnit)} methods.
   *
   * <p>This field is never {@code null}.</p>
   *
   * @see EventQueueCollection.SynchronizationAwaitingPropertyChangeListener
   */
  private final EventQueueCollection.SynchronizationAwaitingPropertyChangeListener synchronizationAwaiter;

  /**
   * A {@link Consumer} of {@link EventQueue}s that processes {@link
   * Event}s produced, ultimately, by the {@link #reflector
   * Reflector}.
   *
   * <p>This field is never {@code null}.</p>
   */
  private final Consumer<? super EventQueue<? extends T>> eventQueueConsumer;


  /*
   * Constructors.
   */


  /**
   * Creates a new {@link Controller} but does not {@linkplain
   * #start() start it}.
   *
   * @param <X> a {@link Listable} and {@link VersionWatchable} that
   * will be used by the embedded {@link Reflector}; must not be
   * {@code null}
   *
   * @param operation a {@link Listable} and a {@link
   * VersionWatchable} that produces Kubernetes events; must not be
   * {@code null}
   *
   * @param eventQueueConsumer the {@link Consumer} that will process
   * each {@link EventQueue} as it becomes ready; must not be {@code
   * null}
   *
   * @exception NullPointerException if {@code operation} or {@code
   * eventQueueConsumer} is {@code null}
   *
   * @see #Controller(Listable, ScheduledExecutorService, Duration,
   * Map, Consumer)
   *
   * @see #start()
   */
  @SuppressWarnings("rawtypes")
  public <X extends Listable<? extends KubernetesResourceList>
                    & VersionWatchable<? extends Closeable,
                                       Watcher<T>>> Controller(final X operation,
                                                               final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
    this(operation, null, null, null, eventQueueConsumer);
  }

  /**
   * Creates a new {@link Controller} but does not {@linkplain
   * #start() start it}.
   *
   * @param <X> a {@link Listable} and {@link VersionWatchable} that
   * will be used by the embedded {@link Reflector}; must not be
   * {@code null}
   *
   * @param operation a {@link Listable} and a {@link
   * VersionWatchable} that produces Kubernetes events; must not be
   * {@code null}
   *
   * @param knownObjects a {@link Map} containing the last known state
   * of Kubernetes resources the embedded {@link EventQueueCollection}
   * is caching events for; may be {@code null} if this {@link
   * Controller} is not interested in tracking deletions of objects;
   * if non-{@code null} <strong>will be synchronized on by this
   * class</strong> during retrieval and traversal operations
   *
   * @param eventQueueConsumer the {@link Consumer} that will process
   * each {@link EventQueue} as it becomes ready; must not be {@code
   * null}
   *
   * @exception NullPointerException if {@code operation} or {@code
   * eventQueueConsumer} is {@code null}
   *
   * @see #Controller(Listable, ScheduledExecutorService, Duration,
   * Map, Consumer)
   *
   * @see #start()
   */
  @SuppressWarnings("rawtypes")
  public <X extends Listable<? extends KubernetesResourceList>
                    & VersionWatchable<? extends Closeable,
                                       Watcher<T>>> Controller(final X operation,
                                                               final Map<Object, T> knownObjects,
                                                               final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
    this(operation, null, null, knownObjects, eventQueueConsumer);
  }

  /**
   * Creates a new {@link Controller} but does not {@linkplain
   * #start() start it}.
   *
   * @param <X> a {@link Listable} and {@link VersionWatchable} that
   * will be used by the embedded {@link Reflector}; must not be
   * {@code null}
   *
   * @param operation a {@link Listable} and a {@link
   * VersionWatchable} that produces Kubernetes events; must not be
   * {@code null}
   *
   * @param synchronizationInterval a {@link Duration} representing
   * the time in between one {@linkplain EventCache#synchronize()
   * synchronization operation} and another; may be {@code null} in
   * which case no synchronization will occur
   *
   * @param eventQueueConsumer the {@link Consumer} that will process
   * each {@link EventQueue} as it becomes ready; must not be {@code
   * null}
   *
   * @exception NullPointerException if {@code operation} or {@code
   * eventQueueConsumer} is {@code null}
   *
   * @see #Controller(Listable, ScheduledExecutorService, Duration,
   * Map, Consumer)
   *
   * @see #start()
   */
  @SuppressWarnings("rawtypes")
  public <X extends Listable<? extends KubernetesResourceList>
                    & VersionWatchable<? extends Closeable,
                                       Watcher<T>>> Controller(final X operation,
                                                               final Duration synchronizationInterval,
                                                               final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
    this(operation, null, synchronizationInterval, null, eventQueueConsumer);
  }

  /**
   * Creates a new {@link Controller} but does not {@linkplain
   * #start() start it}.
   *
   * @param <X> a {@link Listable} and {@link VersionWatchable} that
   * will be used by the embedded {@link Reflector}; must not be
   * {@code null}
   *
   * @param operation a {@link Listable} and a {@link
   * VersionWatchable} that produces Kubernetes events; must not be
   * {@code null}
   *
   * @param synchronizationInterval a {@link Duration} representing
   * the time in between one {@linkplain EventCache#synchronize()
   * synchronization operation} and another; may be {@code null} in
   * which case no synchronization will occur
   *
   * @param knownObjects a {@link Map} containing the last known state
   * of Kubernetes resources the embedded {@link EventQueueCollection}
   * is caching events for; may be {@code null} if this {@link
   * Controller} is not interested in tracking deletions of objects;
   * if non-{@code null} <strong>will be synchronized on by this
   * class</strong> during retrieval and traversal operations
   *
   * @param eventQueueConsumer the {@link Consumer} that will process
   * each {@link EventQueue} as it becomes ready; must not be {@code
   * null}
   *
   * @exception NullPointerException if {@code operation} or {@code
   * eventQueueConsumer} is {@code null}
   *
   * @see #Controller(Listable, ScheduledExecutorService, Duration,
   * Map, Consumer)
   *
   * @see #start()
   */
  @SuppressWarnings("rawtypes")
  public <X extends Listable<? extends KubernetesResourceList>
                    & VersionWatchable<? extends Closeable,
                                       Watcher<T>>> Controller(final X operation,
                                                               final Duration synchronizationInterval,
                                                               final Map<Object, T> knownObjects,
                                                               final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
    this(operation, null, synchronizationInterval, knownObjects, eventQueueConsumer);
  }

  /**
   * Creates a new {@link Controller} but does not {@linkplain
   * #start() start it}.
   *
   * @param <X> a {@link Listable} and {@link VersionWatchable} that
   * will be used by the embedded {@link Reflector}; must not be
   * {@code null}
   *
   * @param operation a {@link Listable} and a {@link
   * VersionWatchable} that produces Kubernetes events; must not be
   * {@code null}
   *
   * @param synchronizationExecutorService the {@link
   * ScheduledExecutorService} that will be passed to the {@link
   * Reflector} constructor; may be {@code null} in which case a
   * default {@link ScheduledExecutorService} may be used instead
   *
   * @param synchronizationInterval a {@link Duration} representing
   * the time in between one {@linkplain EventCache#synchronize()
   * synchronization operation} and another; may be {@code null} in
   * which case no synchronization will occur
   *
   * @param knownObjects a {@link Map} containing the last known state
   * of Kubernetes resources the embedded {@link EventQueueCollection}
   * is caching events for; may be {@code null} if this {@link
   * Controller} is not interested in tracking deletions of objects;
   * if non-{@code null} <strong>will be synchronized on by this
   * class</strong> during retrieval and traversal operations
   *
   * @param eventQueueConsumer the {@link Consumer} that will process
   * each {@link EventQueue} as it becomes ready; must not be {@code
   * null}
   *
   * @exception NullPointerException if {@code operation} or {@code
   * eventQueueConsumer} is {@code null}
   *
   * @see #start()
   */
  @SuppressWarnings("rawtypes")
  public <X extends Listable<? extends KubernetesResourceList>
                    & VersionWatchable<? extends Closeable,
                                       Watcher<T>>> Controller(final X operation,
                                                               final ScheduledExecutorService synchronizationExecutorService,
                                                               final Duration synchronizationInterval,
                                                               final Map<Object, T> knownObjects,
                                                               final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
    this(operation, synchronizationExecutorService, synchronizationInterval, null, knownObjects, eventQueueConsumer);
  }

  /**
   * Creates a new {@link Controller} but does not {@linkplain
   * #start() start it}.
   *
   * <p>This is the canonical constructor; all other constructors
   * delegate to it.</p>
   *
   * @param <X> a {@link Listable} and {@link VersionWatchable} that
   * will be used by the embedded {@link Reflector}; must not be
   * {@code null}
   *
   * @param operation a {@link Listable} and a {@link
   * VersionWatchable} that produces Kubernetes events; must not be
   * {@code null}
   *
   * @param synchronizationExecutorService the {@link
   * ScheduledExecutorService} that will be passed to the {@link
   * Reflector} constructor; may be {@code null} in which case a
   * default {@link ScheduledExecutorService} may be used instead
   *
   * @param synchronizationInterval a {@link Duration} representing
   * the time in between one {@linkplain EventCache#synchronize()
   * synchronization operation} and another; may be {@code null} in
   * which case no synchronization will occur
   *
   * @param errorHandler a {@link Function} that accepts a {@link
   * Throwable} and returns a {@link Boolean} indicating whether the
   * error was handled or not; used to handle truly unanticipated
   * errors from within a {@link ScheduledExecutorService} used
   * during {@linkplain EventCache#synchronize() synchronization} and
   * event consumption activities; may be {@code null}
   *
   * @param knownObjects a {@link Map} containing the last known state
   * of Kubernetes resources the embedded {@link EventQueueCollection}
   * is caching events for; may be {@code null} if this {@link
   * Controller} is not interested in tracking deletions of objects;
   * if non-{@code null} <strong>will be synchronized on by this
   * class</strong> during retrieval and traversal operations
   *
   * @param eventQueueConsumer the {@link Consumer} that will process
   * each {@link EventQueue} as it becomes ready; must not be {@code
   * null}
   *
   * @exception NullPointerException if {@code operation} or {@code
   * eventQueueConsumer} is {@code null}
   *
   * @exception IllegalStateException if the {@link #createLogger()}
   * method returns {@code null}
   *
   * @see #start()
   */
  @SuppressWarnings("rawtypes")
  public <X extends Listable<? extends KubernetesResourceList>
                    & VersionWatchable<? extends Closeable,
                                       Watcher<T>>> Controller(final X operation,
                                                               final ScheduledExecutorService synchronizationExecutorService,
                                                               final Duration synchronizationInterval,
                                                               final Function<? super Throwable, Boolean> errorHandler,
                                                               final Map<Object, T> knownObjects,
                                                               final Consumer<? super EventQueue<? extends T>> eventQueueConsumer) {
    super();
    this.logger = this.createLogger();
    if (this.logger == null) {
      throw new IllegalStateException("createLogger() == null");
    }
    final String cn = this.getClass().getName();
    final String mn = "<init>";
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.entering(cn, mn, new Object[] { operation, synchronizationExecutorService, synchronizationInterval, errorHandler, knownObjects, eventQueueConsumer });
    }
    // Fail fast on both documented preconditions before any
    // collaborator is built, rather than relying on a collaborator's
    // constructor to notice a null operation.
    Objects.requireNonNull(operation, "operation");
    this.eventQueueConsumer = Objects.requireNonNull(eventQueueConsumer, "eventQueueConsumer");
    this.eventQueueCollection = new ControllerEventQueueCollection(knownObjects, errorHandler, 16, 0.75f);
    this.synchronizationAwaiter = new EventQueueCollection.SynchronizationAwaitingPropertyChangeListener();
    this.eventQueueCollection.addPropertyChangeListener(this.synchronizationAwaiter);
    // The ControllerReflector constructor reads
    // Controller.this.eventQueueCollection, so it must be created
    // only after that field has been assigned.
    this.reflector = new ControllerReflector(operation, synchronizationExecutorService, synchronizationInterval, errorHandler);
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.exiting(cn, mn);
    }
  }


  /*
   * Instance methods.
   */


  /**
   * Returns a {@link Logger} for use by this {@link Controller}.
   *
   * <p>This method never returns {@code null}.</p>
   *
   * <p>Overrides of this method must not return {@code null}.</p>
   *
   * <p>This method is invoked from the constructor, so overrides
   * must not rely on subclass state.</p>
   *
   * @return a non-{@code null} {@link Logger}
   */
  protected Logger createLogger() {
    return Logger.getLogger(this.getClass().getName());
  }

  /**
   * Blocks until the {@link EventQueueCollection} affiliated with
   * this {@link Controller} {@linkplain
   * EventQueueCollection#isSynchronized() has synchronized}.
   *
   * @exception InterruptedException if the current {@link Thread} was
   * interrupted
   */
  @Blocking
  public final void awaitEventCacheSynchronization() throws InterruptedException {
    this.synchronizationAwaiter.await();
  }

  /**
   * Blocks for the desired amount of time until the {@link
   * EventQueueCollection} affiliated with this {@link Controller}
   * {@linkplain EventQueueCollection#isSynchronized() has
   * synchronized} or the amount of time has elapsed.
   *
   * @param timeout the amount of time to wait
   *
   * @param timeUnit the {@link TimeUnit} designating the amount of
   * time to wait; must not be {@code null}
   *
   * @return {@code false} if the waiting time elapsed before the
   * event cache synchronized; {@code true} otherwise
   *
   * @exception InterruptedException if the current {@link Thread} was
   * interrupted
   *
   * @exception NullPointerException if {@code timeUnit} is {@code
   * null}
   *
   * @see EventQueueCollection.SynchronizationAwaitingPropertyChangeListener
   */
  @Blocking
  public final boolean awaitEventCacheSynchronization(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
    return this.synchronizationAwaiter.await(timeout, timeUnit);
  }

  /**
   * {@linkplain EventQueueCollection#start(Consumer) Starts the
   * embedded <code>EventQueueCollection</code> consumption machinery}
   * and then {@linkplain Reflector#start() starts the embedded
   * <code>Reflector</code>}.
   *
   * <p>If the {@link Reflector} fails to start, this method closes
   * the {@link Reflector}, cancels the consumption task, closes the
   * {@link EventQueueCollection} and then rethrows the original
   * failure.  Any failures encountered during that cleanup are
   * {@linkplain Throwable#addSuppressed(Throwable) added as
   * suppressed} to the original failure.</p>
   *
   * @exception IOException if {@link Reflector#start()} throws an
   * {@link IOException}
   *
   * @exception KubernetesClientException if the {@linkplain Reflector
   * embedded <code>Reflector</code>} could not be started
   *
   * @see EventQueueCollection#start(Consumer)
   *
   * @see Reflector#start()
   */
  @NonBlocking
  public final void start() throws IOException {
    final String cn = this.getClass().getName();
    final String mn = "start";
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.entering(cn, mn);
    }

    // Start the consumer that is going to drain our associated
    // EventQueueCollection.
    if (this.logger.isLoggable(Level.INFO)) {
      this.logger.logp(Level.INFO, cn, mn, "Starting {0}", this.eventQueueConsumer);
    }
    final Future<?> eventQueueConsumerTask = this.eventQueueCollection.start(this.eventQueueConsumer);
    assert eventQueueConsumerTask != null;

    // Start the Reflector--the machinery that is going to connect to
    // Kubernetes and "reflect" its (relevant) contents into the
    // EventQueueCollection.
    if (this.logger.isLoggable(Level.INFO)) {
      this.logger.logp(Level.INFO, cn, mn, "Starting {0}", this.reflector);
    }
    try {
      this.reflector.start();
    } catch (final IOException | RuntimeException | Error reflectorStartFailure) {
      try {
        // TODO: this is problematic, I think; reflector.close() means
        // that (potentially) it will never be able to restart it.
        // The Go code appears to make some feints in the direction of
        // restartability, and then just basically gives up.  I think
        // we can do better here.
        this.reflector.close();
      } catch (final Throwable suppressMe) {
        reflectorStartFailure.addSuppressed(suppressMe);
      }
      eventQueueConsumerTask.cancel(true);
      assert eventQueueConsumerTask.isDone();
      try {
        this.eventQueueCollection.close();
      } catch (final Throwable suppressMe) {
        reflectorStartFailure.addSuppressed(suppressMe);
      }
      throw reflectorStartFailure;
    }

    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.exiting(cn, mn);
    }
  }

  /**
   * {@linkplain Reflector#close() Closes the embedded
   * <code>Reflector</code>} and then {@linkplain
   * EventQueueCollection#close() closes the embedded
   * <code>EventQueueCollection</code>}, handling exceptions
   * appropriately.
   *
   * <p>The {@link EventQueueCollection} is closed even when closing
   * the {@link Reflector} fails, including when it fails with an
   * {@link Error}.  In that case the {@link Reflector}'s failure is
   * the one that is thrown, and any failure from closing the {@link
   * EventQueueCollection} is {@linkplain
   * Throwable#addSuppressed(Throwable) added to it as
   * suppressed}.</p>
   *
   * @exception IOException if the {@link Reflector} could not
   * {@linkplain Reflector#close() close} properly
   *
   * @exception IllegalStateException if closing the {@link
   * Reflector} failed with a checked exception other than an {@link
   * IOException}; the original exception is its {@linkplain
   * Throwable#getCause() cause}
   *
   * @see Reflector#close()
   *
   * @see EventQueueCollection#close()
   */
  @Override
  public final void close() throws IOException {
    final String cn = this.getClass().getName();
    final String mn = "close";
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.entering(cn, mn);
    }

    // Remember (rather than immediately propagate) any failure from
    // closing the Reflector, Errors included, so that the
    // EventQueueCollection is always given a chance to close too.
    Throwable throwMe = null;
    try {
      if (this.logger.isLoggable(Level.INFO)) {
        this.logger.logp(Level.INFO, cn, mn, "Closing {0}", this.reflector);
      }
      this.reflector.close();
    } catch (final Exception | Error reflectorCloseFailure) {
      throwMe = reflectorCloseFailure;
    }

    try {
      if (this.logger.isLoggable(Level.INFO)) {
        this.logger.logp(Level.INFO, cn, mn, "Closing {0}", this.eventQueueCollection);
      }
      this.eventQueueCollection.close();
    } catch (final RuntimeException | Error collectionCloseFailure) {
      if (throwMe == null) {
        throw collectionCloseFailure;
      }
      // The Reflector's failure happened first and stays primary.
      throwMe.addSuppressed(collectionCloseFailure);
    }

    if (throwMe instanceof IOException) {
      throw (IOException)throwMe;
    } else if (throwMe instanceof RuntimeException) {
      throw (RuntimeException)throwMe;
    } else if (throwMe instanceof Error) {
      throw (Error)throwMe;
    } else if (throwMe != null) {
      throw new IllegalStateException(throwMe.getMessage(), throwMe);
    }

    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.exiting(cn, mn);
    }
  }

  /**
   * Returns if the embedded {@link Reflector} should {@linkplain
   * Reflector#shouldSynchronize() synchronize}.
   *
   * <p>This implementation returns {@code true}.</p>
   *
   * @return {@code true} if the embedded {@link Reflector} should
   * {@linkplain Reflector#shouldSynchronize() synchronize}; {@code
   * false} otherwise
   */
  protected boolean shouldSynchronize() {
    final String cn = this.getClass().getName();
    final String mn = "shouldSynchronize";
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.entering(cn, mn);
    }
    final boolean returnValue = true;
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.exiting(cn, mn, Boolean.valueOf(returnValue));
    }
    return returnValue;
  }

  /**
   * Invoked after the embedded {@link Reflector} {@linkplain
   * Reflector#onClose() closes}.
   *
   * <p>This implementation does nothing.</p>
   *
   * @see Reflector#close()
   *
   * @see Reflector#onClose()
   */
  protected void onClose() {

  }

  /**
   * Returns a key that can be used to identify the supplied {@link
   * HasMetadata}.
   *
   * <p>This method never returns {@code null}.</p>
   *
   * <p>Overrides of this method must not return {@code null}.</p>
   *
   * <p>The default implementation of this method returns the return
   * value of invoking the {@link HasMetadatas#getKey(HasMetadata)}
   * method.</p>
   *
   * @param resource the Kubernetes resource for which a key is
   * desired; must not be {@code null}
   *
   * @return a non-{@code null} key for the supplied {@link
   * HasMetadata}
   *
   * @exception NullPointerException if {@code resource} is {@code
   * null}
   */
  protected Object getKey(final T resource) {
    final String cn = this.getClass().getName();
    final String mn = "getKey";
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.entering(cn, mn, resource);
    }
    final Object returnValue = HasMetadatas.getKey(Objects.requireNonNull(resource));
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.exiting(cn, mn, returnValue);
    }
    return returnValue;
  }

  /**
   * Creates a new {@link Event} when invoked.
   *
   * <p>This method never returns {@code null}.</p>
   *
   * <p>Overrides of this method must not return {@code null}.</p>
   *
   * <p>Overrides of this method must return a new {@link Event} or
   * subclass with each invocation.</p>
   *
   * @param source the source of the new {@link Event}; must not be
   * {@code null}
   *
   * @param eventType the {@link Event.Type} for the new {@link
   * Event}; must not be {@code null}
   *
   * @param resource the {@link HasMetadata} that the new {@link
   * Event} concerns; must not be {@code null}
   *
   * @return a new, non-{@code null} {@link Event}
   *
   * @exception NullPointerException if any of the parameters is
   * {@code null}
   */
  protected Event<T> createEvent(final Object source, final Event.Type eventType, final T resource) {
    final String cn = this.getClass().getName();
    final String mn = "createEvent";
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.entering(cn, mn, new Object[] { source, eventType, resource });
    }
    final Event<T> returnValue = new Event<>(Objects.requireNonNull(source), Objects.requireNonNull(eventType), null, Objects.requireNonNull(resource));
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.exiting(cn, mn, returnValue);
    }
    return returnValue;
  }

  /**
   * Creates a new {@link EventQueue} when invoked.
   *
   * <p>This method never returns {@code null}.</p>
   *
   * <p>Overrides of this method must not return {@code null}.</p>
   *
   * <p>Overrides of this method must return a new {@link EventQueue}
   * or subclass with each invocation.</p>
   *
   * @param key the key to create the new {@link EventQueue} with;
   * must not be {@code null}
   *
   * @return a new, non-{@code null} {@link EventQueue}
   *
   * @exception NullPointerException if {@code key} is {@code null}
   */
  protected EventQueue<T> createEventQueue(final Object key) {
    final String cn = this.getClass().getName();
    final String mn = "createEventQueue";
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.entering(cn, mn, key);
    }
    // Enforce the documented null check here, as getKey() and
    // createEvent() do, instead of relying on the EventQueue
    // constructor to do so.
    final EventQueue<T> returnValue = new EventQueue<>(Objects.requireNonNull(key));
    if (this.logger.isLoggable(Level.FINER)) {
      this.logger.exiting(cn, mn, returnValue);
    }
    return returnValue;
  }


  /*
   * Inner and nested classes.
   */


  /**
   * An {@link EventQueueCollection} that delegates its overridable
   * methods to their equivalents in the {@link Controller} class.
   *
   * @author <a href="https://about.me/lairdnelson"
   * target="_parent">Laird Nelson</a>
   *
   * @see EventQueueCollection
   *
   * @see EventCache
   */
  private final class ControllerEventQueueCollection extends EventQueueCollection<T> {


    /*
     * Constructors.
     */


    private ControllerEventQueueCollection(final Map<?, ? extends T> knownObjects,
                                           final Function<? super Throwable, Boolean> errorHandler,
                                           final int initialCapacity,
                                           final float loadFactor) {
      super(knownObjects, errorHandler, initialCapacity, loadFactor);
    }


    /*
     * Instance methods.
     */


    @Override
    protected final Event<T> createEvent(final Object source, final Event.Type eventType, final T resource) {
      return Controller.this.createEvent(source, eventType, resource);
    }

    @Override
    protected final EventQueue<T> createEventQueue(final Object key) {
      return Controller.this.createEventQueue(key);
    }

    @Override
    protected final Object getKey(final T resource) {
      return Controller.this.getKey(resource);
    }

  }


  /**
   * A {@link Reflector} that delegates its overridable
   * methods to their equivalents in the {@link Controller} class.
   *
   * @author <a href="https://about.me/lairdnelson"
   * target="_parent">Laird Nelson</a>
   *
   * @see Reflector
   */
  private final class ControllerReflector extends Reflector<T> {


    /*
     * Constructors.
     */


    @SuppressWarnings("rawtypes")
    private <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> ControllerReflector(final X operation,
                                                                                                                                           final ScheduledExecutorService synchronizationExecutorService,
                                                                                                                                           final Duration synchronizationInterval,
                                                                                                                                           final Function<? super Throwable, Boolean> synchronizationErrorHandler) {
      super(operation, Controller.this.eventQueueCollection, synchronizationExecutorService, synchronizationInterval, synchronizationErrorHandler);
    }


    /*
     * Instance methods.
     */


    /**
     * Invokes the {@link Controller#shouldSynchronize()} method and
     * returns its result.
     *
     * @return the result of invoking the {@link
     * Controller#shouldSynchronize()} method
     *
     * @see Controller#shouldSynchronize()
     */
    @Override
    protected final boolean shouldSynchronize() {
      return Controller.this.shouldSynchronize();
    }

    /**
     * Invokes the {@link Controller#onClose()} method.
     *
     * @see Controller#onClose()
     */
    @Override
    protected final void onClose() {
      Controller.this.onClose();
    }
  }

}