001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*- 002 * 003 * Copyright © 2017-2018 microBean. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 014 * implied. See the License for the specific language governing 015 * permissions and limitations under the License. 016 */ 017package org.microbean.kubernetes.controller; 018 019import java.io.Closeable; 020import java.io.IOException; 021 022import java.lang.reflect.Field; 023import java.lang.reflect.Method; 024 025import java.time.Duration; 026 027import java.time.temporal.ChronoUnit; 028 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.Collections; 032import java.util.Objects; 033import java.util.Map; 034 035import java.util.concurrent.ExecutionException; 036import java.util.concurrent.Executors; 037import java.util.concurrent.Future; 038import java.util.concurrent.FutureTask; 039import java.util.concurrent.ScheduledExecutorService; 040import java.util.concurrent.ScheduledFuture; 041import java.util.concurrent.ScheduledThreadPoolExecutor; 042import java.util.concurrent.TimeUnit; 043 044import java.util.function.Function; 045 046import java.util.logging.Level; 047import java.util.logging.Logger; 048 049import io.fabric8.kubernetes.client.DefaultKubernetesClient; // for javadoc only 050import io.fabric8.kubernetes.client.KubernetesClientException; 051import io.fabric8.kubernetes.client.Watch; // for javadoc only 052import io.fabric8.kubernetes.client.Watcher; 053 054import io.fabric8.kubernetes.client.dsl.base.BaseOperation; 055import io.fabric8.kubernetes.client.dsl.base.OperationSupport; 056 057import io.fabric8.kubernetes.client.dsl.Listable; 058import io.fabric8.kubernetes.client.dsl.Versionable; 059import io.fabric8.kubernetes.client.dsl.VersionWatchable; 060import io.fabric8.kubernetes.client.dsl.Watchable; 061 062import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl; 063 064import io.fabric8.kubernetes.api.model.HasMetadata; 065import io.fabric8.kubernetes.api.model.ObjectMeta; 066import io.fabric8.kubernetes.api.model.KubernetesResourceList; 067import io.fabric8.kubernetes.api.model.ListMeta; 068 069import net.jcip.annotations.GuardedBy; 070import net.jcip.annotations.ThreadSafe; 071 072import okhttp3.OkHttpClient; 073 074import org.microbean.development.annotation.Hack; 075import org.microbean.development.annotation.Issue; 076import org.microbean.development.annotation.NonBlocking; 077 078/** 079 * A pump of sorts that continuously "pulls" logical events out of 080 * Kubernetes and {@linkplain EventCache#add(Object, AbstractEvent.Type, 081 * HasMetadata) adds them} to an {@link EventCache} so as to logically 082 * "reflect" the contents of Kubernetes into the cache. 083 * 084 * <h2>Thread Safety</h2> 085 * 086 * <p>Instances of this class are safe for concurrent use by multiple 087 * {@link Thread}s.</p> 088 * 089 * <h2>Design Notes</h2> 090 * 091 * <p>This class loosely models the <a 092 * href="https://github.com/kubernetes/client-go/blob/9b03088ac34f23d8ac912f623f2ae73274c38ce8/tools/cache/reflector.go#L47">{@code 093 * Reflector} type in the {@code tools/cache} package of the {@code 094 * client-go} subproject of Kubernetes</a>.</p> 095 * 096 * @param <T> a type of Kubernetes resource 097 * 098 * @author <a href="https://about.me/lairdnelson" 099 * target="_parent">Laird Nelson</a> 100 * 101 * @see EventCache 102 */ 103@ThreadSafe 104public class Reflector<T extends HasMetadata> implements Closeable { 105 106 107 /* 108 * Instance fields. 109 */ 110 111 112 /** 113 * The operation that was supplied at construction time. 114 * 115 * <p>This field is never {@code null}.</p> 116 * 117 * <p>It is guaranteed that the value of this field may be 118 * assignable to a reference of type {@link Listable Listable<? 119 * extends KubernetesResourceList>} or to a reference of type 120 * {@link VersionWatchable VersionWatchable<? extends Closeable, 121 * Watcher<T>>}.</p> 122 * 123 * @see Listable 124 * 125 * @see VersionWatchable 126 */ 127 private final Object operation; 128 129 /** 130 * The resource version that a successful watch operation processed. 131 * 132 * @see #setLastSynchronizationResourceVersion(Object) 133 * 134 * @see WatchHandler#eventReceived(Watcher.Action, HasMetadata) 135 */ 136 private volatile Object lastSynchronizationResourceVersion; 137 138 /** 139 * The {@link ScheduledExecutorService} in charge of scheduling 140 * repeated invocations of the {@link #synchronize()} method. 141 * 142 * <p>This field may be {@code null}.</p> 143 * 144 * <h2>Thread Safety</h2> 145 * 146 * <p>This field is not safe for concurrent use by multiple threads 147 * without explicit synchronization on it.</p> 148 * 149 * @see #synchronize() 150 */ 151 @GuardedBy("this") 152 private ScheduledExecutorService synchronizationExecutorService; 153 154 /** 155 * A {@link Function} that consumes a {@link Throwable} and returns 156 * {@code true} if the error represented by that {@link Throwable} 157 * was handled in some way. 158 * 159 * <p>This field may be {@code null}.</p> 160 */ 161 private final Function<? super Throwable, Boolean> synchronizationErrorHandler; 162 163 /** 164 * A {@link ScheduledFuture} representing the task that is scheduled 165 * to repeatedly invoke the {@link #synchronize()} method. 166 * 167 * <p>This field may be {@code null}.</p> 168 * 169 * <h2>Thread Safety</h2> 170 * 171 * <p>This field is not safe for concurrent use by multiple threads 172 * without explicit synchronization on it.</p> 173 * 174 * @see #synchronize() 175 */ 176 @GuardedBy("this") 177 private ScheduledFuture<?> synchronizationTask; 178 179 /** 180 * A flag tracking whether the {@link 181 * #synchronizationExecutorService} should be shut down when this 182 * {@link Reflector} is {@linkplain #close() closed}. If the 183 * creator of this {@link Reflector} supplied an explicit {@link 184 * ScheduledExecutorService} at construction time, then it will not 185 * be shut down. 186 */ 187 private final boolean shutdownSynchronizationExecutorServiceOnClose; 188 189 /** 190 * How many seconds to wait in between scheduled invocations of the 191 * {@link #synchronize()} method. If the value of this field is 192 * less than or equal to zero then no synchronization will take 193 * place. 194 */ 195 private final long synchronizationIntervalInSeconds; 196 197 /** 198 * The watch operation currently in effect. 199 * 200 * <p>This field may be {@code null} at any point.</p> 201 * 202 * <h2>Thread Safety</h2> 203 * 204 * <p>This field is not safe for concurrent use by multiple threads 205 * without explicit synchronization on it.</p> 206 */ 207 @GuardedBy("this") 208 private Closeable watch; 209 210 /** 211 * An {@link EventCache} (often an {@link EventQueueCollection}) 212 * whose contents will be added to to reflect the current state of 213 * Kubernetes. 214 * 215 * <p>This field is never {@code null}.</p> 216 */ 217 @GuardedBy("itself") 218 private final EventCache<T> eventCache; 219 220 /** 221 * A {@link Logger} for use by this {@link Reflector}. 222 * 223 * <p>This field is never {@code null}.</p> 224 * 225 * @see #createLogger() 226 */ 227 protected final Logger logger; 228 229 230 /* 231 * Constructors. 232 */ 233 234 235 /** 236 * Creates a new {@link Reflector}. 237 * 238 * @param <X> a type that is both an appropriate kind of {@link 239 * Listable} and {@link VersionWatchable}, such as the kind of 240 * operation returned by {@link 241 * DefaultKubernetesClient#configMaps()} and the like 242 * 243 * @param operation a {@link Listable} and a {@link 244 * VersionWatchable} that can report information from a Kubernetes 245 * cluster; must not be {@code null} 246 * 247 * @param eventCache an {@link EventCache} <strong>that will be 248 * synchronized on</strong> and into which {@link Event}s will be 249 * logically "reflected"; must not be {@code null} 250 * 251 * @exception NullPointerException if {@code operation} or {@code 252 * eventCache} is {@code null} 253 * 254 * @exception IllegalStateException if the {@link #createLogger()} 255 * method returns {@code null} 256 * 257 * @see #Reflector(Listable, EventCache, ScheduledExecutorService, 258 * Duration, Function) 259 * 260 * @see #start() 261 */ 262 @SuppressWarnings("rawtypes") // kubernetes-client's implementations of KubernetesResourceList use raw types 263 public <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> Reflector(final X operation, 264 final EventCache<T> eventCache) { 265 this(operation, eventCache, null, null, null); 266 } 267 268 /** 269 * Creates a new {@link Reflector}. 270 * 271 * @param <X> a type that is both an appropriate kind of {@link 272 * Listable} and {@link VersionWatchable}, such as the kind of 273 * operation returned by {@link 274 * DefaultKubernetesClient#configMaps()} and the like 275 * 276 * @param operation a {@link Listable} and a {@link 277 * VersionWatchable} that can report information from a Kubernetes 278 * cluster; must not be {@code null} 279 * 280 * @param eventCache an {@link EventCache} <strong>that will be 281 * synchronized on</strong> and into which {@link Event}s will be 282 * logically "reflected"; must not be {@code null} 283 * 284 * @param synchronizationInterval a {@link Duration} representing 285 * the time in between one {@linkplain EventCache#synchronize() 286 * synchronization operation} and another; interpreted with a 287 * granularity of seconds; may be {@code null} or semantically equal 288 * to {@code 0} seconds in which case no synchronization will occur 289 * 290 * @exception NullPointerException if {@code operation} or {@code 291 * eventCache} is {@code null} 292 * 293 * @exception IllegalStateException if the {@link #createLogger()} 294 * method returns {@code null} 295 * 296 * @see #Reflector(Listable, EventCache, ScheduledExecutorService, 297 * Duration, Function) 298 * 299 * @see #start() 300 */ 301 @SuppressWarnings("rawtypes") // kubernetes-client's implementations of KubernetesResourceList use raw types 302 public <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> Reflector(final X operation, 303 final EventCache<T> eventCache, 304 final Duration synchronizationInterval) { 305 this(operation, eventCache, null, synchronizationInterval, null); 306 } 307 308 /** 309 * Creates a new {@link Reflector}. 310 * 311 * @param <X> a type that is both an appropriate kind of {@link 312 * Listable} and {@link VersionWatchable}, such as the kind of 313 * operation returned by {@link 314 * DefaultKubernetesClient#configMaps()} and the like 315 * 316 * @param operation a {@link Listable} and a {@link 317 * VersionWatchable} that can report information from a Kubernetes 318 * cluster; must not be {@code null} 319 * 320 * @param eventCache an {@link EventCache} <strong>that will be 321 * synchronized on</strong> and into which {@link Event}s will be 322 * logically "reflected"; must not be {@code null} 323 * 324 * @param synchronizationExecutorService a {@link 325 * ScheduledExecutorService} to be used to tell the supplied {@link 326 * EventCache} to {@linkplain EventCache#synchronize() synchronize} 327 * on a schedule; may be {@code null} in which case no 328 * synchronization will occur 329 * 330 * @param synchronizationInterval a {@link Duration} representing 331 * the time in between one {@linkplain EventCache#synchronize() 332 * synchronization operation} and another; may be {@code null} in 333 * which case no synchronization will occur 334 * 335 * @exception NullPointerException if {@code operation} or {@code 336 * eventCache} is {@code null} 337 * 338 * @exception IllegalStateException if the {@link #createLogger()} 339 * method returns {@code null} 340 * 341 * @see #Reflector(Listable, EventCache, ScheduledExecutorService, 342 * Duration, Function) 343 * 344 * @see #start() 345 */ 346 @SuppressWarnings("rawtypes") // kubernetes-client's implementations of KubernetesResourceList use raw types 347 public <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> Reflector(final X operation, 348 final EventCache<T> eventCache, 349 final ScheduledExecutorService synchronizationExecutorService, 350 final Duration synchronizationInterval) { 351 this(operation, eventCache, synchronizationExecutorService, synchronizationInterval, null); 352 } 353 354 /** 355 * Creates a new {@link Reflector}. 356 * 357 * @param <X> a type that is both an appropriate kind of {@link 358 * Listable} and {@link VersionWatchable}, such as the kind of 359 * operation returned by {@link 360 * DefaultKubernetesClient#configMaps()} and the like 361 * 362 * @param operation a {@link Listable} and a {@link 363 * VersionWatchable} that can report information from a Kubernetes 364 * cluster; must not be {@code null} 365 * 366 * @param eventCache an {@link EventCache} <strong>that will be 367 * synchronized on</strong> and into which {@link Event}s will be 368 * logically "reflected"; must not be {@code null} 369 * 370 * @param synchronizationExecutorService a {@link 371 * ScheduledExecutorService} to be used to tell the supplied {@link 372 * EventCache} to {@linkplain EventCache#synchronize() synchronize} 373 * on a schedule; may be {@code null} in which case no 374 * synchronization will occur 375 * 376 * @param synchronizationInterval a {@link Duration} representing 377 * the time in between one {@linkplain EventCache#synchronize() 378 * synchronization operation} and another; may be {@code null} in 379 * which case no synchronization will occur 380 * 381 * @param synchronizationErrorHandler a {@link Function} that 382 * consumes a {@link Throwable} and returns a {@link Boolean} 383 * indicating whether the error represented by the {@link Throwable} 384 * in question was handled or not; may be {@code null} 385 * 386 * @exception NullPointerException if {@code operation} or {@code 387 * eventCache} is {@code null} 388 * 389 * @exception IllegalStateException if the {@link #createLogger()} 390 * method returns {@code null} 391 * 392 * @see #start() 393 */ 394 @SuppressWarnings("rawtypes") // kubernetes-client's implementations of KubernetesResourceList use raw types 395 public <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>> Reflector(final X operation, 396 final EventCache<T> eventCache, 397 final ScheduledExecutorService synchronizationExecutorService, 398 final Duration synchronizationInterval, 399 final Function<? super Throwable, Boolean> synchronizationErrorHandler) { 400 super(); 401 this.logger = this.createLogger(); 402 if (this.logger == null) { 403 throw new IllegalStateException("createLogger() == null"); 404 } 405 final String cn = this.getClass().getName(); 406 final String mn = "<init>"; 407 if (this.logger.isLoggable(Level.FINER)) { 408 this.logger.entering(cn, mn, new Object[] { operation, eventCache, synchronizationExecutorService, synchronizationInterval }); 409 } 410 Objects.requireNonNull(operation); 411 this.eventCache = Objects.requireNonNull(eventCache); 412 // TODO: research: maybe: operation.withField("metadata.resourceVersion", "0")? 413 this.operation = operation.withResourceVersion("0"); 414 415 if (synchronizationInterval == null) { 416 this.synchronizationIntervalInSeconds = 0L; 417 } else { 418 this.synchronizationIntervalInSeconds = synchronizationInterval.get(ChronoUnit.SECONDS); 419 } 420 if (this.synchronizationIntervalInSeconds <= 0L) { 421 this.synchronizationExecutorService = null; 422 this.shutdownSynchronizationExecutorServiceOnClose = false; 423 this.synchronizationErrorHandler = null; 424 } else { 425 this.synchronizationExecutorService = synchronizationExecutorService; 426 this.shutdownSynchronizationExecutorServiceOnClose = synchronizationExecutorService == null; 427 if (synchronizationErrorHandler == null) { 428 this.synchronizationErrorHandler = t -> { 429 if (this.logger.isLoggable(Level.SEVERE)) { 430 this.logger.logp(Level.SEVERE, 431 this.getClass().getName(), "<synchronizationTask>", 432 t.getMessage(), t); 433 } 434 return true; 435 }; 436 } else { 437 this.synchronizationErrorHandler = synchronizationErrorHandler; 438 } 439 } 440 441 if (this.logger.isLoggable(Level.FINER)) { 442 this.logger.exiting(cn, mn); 443 } 444 } 445 446 447 /* 448 * Instance methods. 449 */ 450 451 452 /** 453 * Returns a {@link Logger} that will be used for this {@link 454 * Reflector}. 455 * 456 * <p>This method never returns {@code null}.</p> 457 * 458 * <p>Overrides of this method must not return {@code null}.</p> 459 * 460 * @return a non-{@code null} {@link Logger} 461 */ 462 protected Logger createLogger() { 463 return Logger.getLogger(this.getClass().getName()); 464 } 465 466 /** 467 * Notionally closes this {@link Reflector} by terminating any 468 * {@link Thread}s that it has started and invoking the {@link 469 * #onClose()} method while holding this {@link Reflector}'s 470 * monitor. 471 * 472 * @exception IOException if an error occurs 473 * 474 * @see #onClose() 475 */ 476 @Override 477 public synchronized final void close() throws IOException { 478 final String cn = this.getClass().getName(); 479 final String mn = "close"; 480 if (this.logger.isLoggable(Level.FINER)) { 481 this.logger.entering(cn, mn); 482 } 483 484 try { 485 this.closeSynchronizationExecutorService(); 486 if (this.watch != null) { 487 this.watch.close(); 488 } 489 } finally { 490 this.onClose(); 491 } 492 493 if (this.logger.isLoggable(Level.FINER)) { 494 this.logger.exiting(cn, mn); 495 } 496 } 497 498 /** 499 * {@linkplain Future#cancel(boolean) Cancels} scheduled invocations 500 * of the {@link #synchronize()} method. 501 * 502 * <p>This method is invoked by the {@link 503 * #closeSynchronizationExecutorService()} method.</p> 504 * 505 * @see #setUpSynchronization() 506 * 507 * @see #closeSynchronizationExecutorService() 508 */ 509 private synchronized final void cancelSynchronization() { 510 final String cn = this.getClass().getName(); 511 final String mn = "cancelSynchronization"; 512 if (this.logger.isLoggable(Level.FINER)) { 513 this.logger.entering(cn, mn); 514 } 515 516 if (this.synchronizationTask != null) { 517 this.synchronizationTask.cancel(true /* interrupt the task */); 518 this.synchronizationTask = null; // very important; see setUpSynchronization() 519 } 520 521 if (this.logger.isLoggable(Level.FINER)) { 522 this.logger.exiting(cn, mn); 523 } 524 } 525 526 /** 527 * {@linkplain #cancelSynchronization Cancels scheduled invocations 528 * of the <code>synchronize()</code> method} and, when appropriate, 529 * shuts down the {@link ScheduledExecutorService} responsible for 530 * the scheduling. 531 * 532 * @see #cancelSynchronization() 533 * 534 * @see #setUpSynchronization() 535 */ 536 private synchronized final void closeSynchronizationExecutorService() { 537 final String cn = this.getClass().getName(); 538 final String mn = "closeSynchronizationExecutorService"; 539 if (this.logger.isLoggable(Level.FINER)) { 540 this.logger.entering(cn, mn); 541 } 542 543 this.cancelSynchronization(); 544 545 if (this.synchronizationExecutorService != null && this.shutdownSynchronizationExecutorServiceOnClose) { 546 547 // Stop accepting new tasks. Not that any will be showing up 548 // anyway, but it's the right thing to do. 549 this.synchronizationExecutorService.shutdown(); 550 551 try { 552 if (!this.synchronizationExecutorService.awaitTermination(60L, TimeUnit.SECONDS)) { 553 this.synchronizationExecutorService.shutdownNow(); 554 if (!this.synchronizationExecutorService.awaitTermination(60L, TimeUnit.SECONDS)) { 555 if (this.logger.isLoggable(Level.WARNING)) { 556 this.logger.logp(Level.WARNING, 557 cn, mn, 558 "synchronizationExecutorService did not terminate cleanly after 60 seconds"); 559 } 560 } 561 } 562 } catch (final InterruptedException interruptedException) { 563 this.synchronizationExecutorService.shutdownNow(); 564 Thread.currentThread().interrupt(); 565 } 566 567 } 568 569 if (this.logger.isLoggable(Level.FINER)) { 570 this.logger.exiting(cn, mn); 571 } 572 } 573 574 /** 575 * As the name implies, sets up <em>synchronization</em>, which is 576 * the act of the downstream event cache telling its associated 577 * event listeners that there are items remaining to be processed, 578 * and returns a {@link Future} reprsenting the scheduled, repeating 579 * task. 580 * 581 * <p>This method schedules repeated invocations of the {@link 582 * #synchronize()} method.</p> 583 * 584 * <p>This method may return {@code null}.</p> 585 * 586 * @return a {@link Future} representing the scheduled repeating 587 * synchronization task, or {@code null} if no such task was 588 * scheduled 589 * 590 * @see #synchronize() 591 * 592 * @see EventCache#synchronize() 593 */ 594 private synchronized final Future<?> setUpSynchronization() { 595 final String cn = this.getClass().getName(); 596 final String mn = "setUpSynchronization"; 597 if (this.logger.isLoggable(Level.FINER)) { 598 this.logger.entering(cn, mn); 599 } 600 601 if (this.synchronizationIntervalInSeconds > 0L) { 602 if (this.synchronizationExecutorService == null || this.synchronizationExecutorService.isTerminated()) { 603 this.synchronizationExecutorService = Executors.newScheduledThreadPool(1); 604 if (this.synchronizationExecutorService instanceof ScheduledThreadPoolExecutor) { 605 ((ScheduledThreadPoolExecutor)this.synchronizationExecutorService).setRemoveOnCancelPolicy(true); 606 } 607 } 608 if (this.synchronizationTask == null) { 609 if (this.logger.isLoggable(Level.INFO)) { 610 this.logger.logp(Level.INFO, 611 cn, mn, 612 "Scheduling downstream synchronization every {0} seconds", 613 Long.valueOf(this.synchronizationIntervalInSeconds)); 614 } 615 this.synchronizationTask = this.synchronizationExecutorService.scheduleWithFixedDelay(this::synchronize, 0L, this.synchronizationIntervalInSeconds, TimeUnit.SECONDS); 616 } 617 assert this.synchronizationExecutorService != null; 618 assert this.synchronizationTask != null; 619 } 620 621 if (this.logger.isLoggable(Level.FINER)) { 622 this.logger.exiting(cn, mn, this.synchronizationTask); 623 } 624 return this.synchronizationTask; 625 } 626 627 /** 628 * Calls {@link EventCache#synchronize()} on this {@link 629 * Reflector}'s {@linkplain #eventCache affiliated 630 * <code>EventCache</code>}. 631 * 632 * <p>This method is normally invoked on a schedule by this {@link 633 * Reflector}'s {@linkplain #synchronizationExecutorService 634 * affiliated <code>ScheduledExecutorService</code>}.</p> 635 * 636 * @see #setUpSynchronization() 637 * 638 * @see #shouldSynchronize() 639 */ 640 private final void synchronize() { 641 final String cn = this.getClass().getName(); 642 final String mn = "synchronize"; 643 if (this.logger.isLoggable(Level.FINER)) { 644 this.logger.entering(cn, mn); 645 } 646 647 if (this.shouldSynchronize()) { 648 if (this.logger.isLoggable(Level.FINE)) { 649 this.logger.logp(Level.FINE, 650 cn, mn, 651 "Synchronizing event cache with its downstream consumers"); 652 } 653 Throwable throwable = null; 654 synchronized (this.eventCache) { 655 try { 656 657 // Tell the EventCache to run a synchronization operation. 658 // This will have the effect of adding SynchronizationEvents 659 // of type MODIFICATION to the EventCache. 660 this.eventCache.synchronize(); 661 662 } catch (final Throwable e) { 663 assert e instanceof RuntimeException || e instanceof Error; 664 throwable = e; 665 } 666 } 667 if (throwable != null && !this.synchronizationErrorHandler.apply(throwable)) { 668 if (throwable instanceof RuntimeException) { 669 throw (RuntimeException)throwable; 670 } else if (throwable instanceof Error) { 671 throw (Error)throwable; 672 } else { 673 assert !(throwable instanceof Exception) : "Signature changed for EventCache#synchronize()"; 674 } 675 } 676 } 677 678 if (this.logger.isLoggable(Level.FINER)) { 679 this.logger.exiting(cn, mn); 680 } 681 } 682 683 /** 684 * Returns whether, at any given moment, this {@link Reflector} 685 * should cause its {@link EventCache} to {@linkplain 686 * EventCache#synchronize() synchronize}. 687 * 688 * <p>The default implementation of this method returns {@code true} 689 * if this {@link Reflector} was constructed with an explicit 690 * synchronization interval or {@link ScheduledExecutorService} or 691 * both.</p> 692 * 693 * <h2>Design Notes</h2> 694 * 695 * <p>This code follows the Go code in the Kubernetes {@code 696 * client-go/tools/cache} package. One thing that becomes clear 697 * when looking at all of this through an object-oriented lens is 698 * that it is the {@link EventCache} (the {@code delta_fifo}, in the 699 * Go code) that is ultimately in charge of synchronizing. It is 700 * not clear why in the Go code this is a function of a reflector. 701 * In an object-oriented world, perhaps the {@link EventCache} 702 * itself should be in charge of resynchronization schedules, but we 703 * choose to follow the Go code's division of responsibilities 704 * here.</p> 705 * 706 * @return {@code true} if this {@link Reflector} should cause its 707 * {@link EventCache} to {@linkplain EventCache#synchronize() 708 * synchronize}; {@code false} otherwise 709 */ 710 protected boolean shouldSynchronize() { 711 final String cn = this.getClass().getName(); 712 final String mn = "shouldSynchronize"; 713 if (this.logger.isLoggable(Level.FINER)) { 714 this.logger.entering(cn, mn); 715 } 716 final boolean returnValue; 717 synchronized (this) { 718 returnValue = this.synchronizationExecutorService != null; 719 } 720 if (this.logger.isLoggable(Level.FINER)) { 721 this.logger.exiting(cn, mn, Boolean.valueOf(returnValue)); 722 } 723 return returnValue; 724 } 725 726 // Not used; not used in the Go code either?! 727 private final Object getLastSynchronizationResourceVersion() { 728 return this.lastSynchronizationResourceVersion; 729 } 730 731 /** 732 * Records the last resource version processed by a successful watch 733 * operation. 734 * 735 * @param resourceVersion the resource version in question; may be 736 * {@code null} 737 * 738 * @see WatchHandler#eventReceived(Watcher.Action, HasMetadata) 739 */ 740 private final void setLastSynchronizationResourceVersion(final Object resourceVersion) { 741 // lastSynchronizationResourceVersion is volatile; this is an 742 // atomic assignment 743 this.lastSynchronizationResourceVersion = resourceVersion; 744 } 745 746 /** 747 * Using the {@code operation} supplied at construction time, 748 * {@linkplain Listable#list() lists} appropriate Kubernetes 749 * resources, and then, on a separate {@link Thread}, {@linkplain 750 * VersionWatchable sets up a watch} on them, calling {@link 751 * EventCache#replace(Collection, Object)} and {@link 752 * EventCache#add(Object, AbstractEvent.Type, HasMetadata)} methods 753 * as appropriate. 754 * 755 * <p><strong>For convenience only</strong>, this method returns a 756 * {@link Future} representing any scheduled synchronization task 757 * created as a result of the user's having supplied a {@link 758 * Duration} at construction time. The return value may be (and 759 * usually is) safely ignored. Invoking {@link 760 * Future#cancel(boolean)} on the returned {@link Future} will 761 * result in the scheduled synchronization task being cancelled 762 * irrevocably. <strong>Notably, invoking {@link 763 * Future#cancel(boolean)} on the returned {@link Future} will 764 * <em>not</em> {@linkplain #close() close} this {@link 765 * Reflector}.</strong> 766 * 767 * <p>This method never returns {@code null}.</p> 768 * 769 * <p>The calling {@link Thread} is not blocked by invocations of 770 * this method.</p> 771 * 772 * <h2>Implementation Notes</h2> 773 * 774 * <p>This method loosely models the <a 775 * href="https://github.com/kubernetes/client-go/blob/dcf16a0f3b52098c3d4c1467b6c80c3e88ff65fb/tools/cache/reflector.go#L128-L137">{@code 776 * Run} function in {@code reflector.go} together with the {@code 777 * ListAndWatch} function in the same file</a>.</p> 778 * 779 * @return a {@link Future} representing a scheduled synchronization 780 * operation; never {@code null} 781 * 782 * @exception IOException if a watch has previously been established 783 * and could not be {@linkplain Watch#close() closed} 784 * 785 * @exception KubernetesClientException if the initial attempt to 786 * {@linkplain Listable#list() list} Kubernetes resources fails 787 * 788 * @see #close() 789 */ 790 @NonBlocking 791 public final Future<?> start() throws IOException { 792 final String cn = this.getClass().getName(); 793 final String mn = "start"; 794 if (this.logger.isLoggable(Level.FINER)) { 795 this.logger.entering(cn, mn); 796 } 797 798 Future<?> returnValue = null; 799 synchronized (this) { 800 801 try { 802 803 // If somehow we got called while a watch already exists, then 804 // close the old watch (we'll replace it). Note that, 805 // critically, the onClose() method of our watch handler sets 806 // this reference to null, so if the watch is in the process 807 // of being closed, this little block won't be executed. 808 if (this.watch != null) { 809 final Closeable watch = this.watch; 810 this.watch = null; 811 if (logger.isLoggable(Level.FINE)) { 812 logger.logp(Level.FINE, 813 cn, mn, 814 "Closing pre-existing watch"); 815 } 816 watch.close(); 817 if (logger.isLoggable(Level.FINE)) { 818 logger.logp(Level.FINE, 819 cn, mn, 820 "Closed pre-existing watch"); 821 } 822 } 823 824 // Run a list operation, and get the resourceVersion of that list. 825 if (logger.isLoggable(Level.FINE)) { 826 logger.logp(Level.FINE, 827 cn, mn, 828 "Listing Kubernetes resources using {0}", this.operation); 829 } 830 @Issue(id = "13", uri = "https://github.com/microbean/microbean-kubernetes-controller/issues/13") 831 @SuppressWarnings("unchecked") 832 final KubernetesResourceList<? extends T> list = ((Listable<? extends KubernetesResourceList<? extends T>>)this.operation).list(); 833 assert list != null; 834 835 final ListMeta metadata = list.getMetadata(); 836 assert metadata != null; 837 838 final String resourceVersion = metadata.getResourceVersion(); 839 assert resourceVersion != null; 840 841 // Using the results of that list operation, do a full replace 842 // on the EventCache with them. 843 final Collection<? extends T> replacementItems; 844 final Collection<? extends T> items = list.getItems(); 845 if (items == null || items.isEmpty()) { 846 replacementItems = Collections.emptySet(); 847 } else { 848 replacementItems = Collections.unmodifiableCollection(new ArrayList<>(items)); 849 } 850 851 if (logger.isLoggable(Level.FINE)) { 852 logger.logp(Level.FINE, cn, mn, "Replacing resources in the event cache"); 853 } 854 synchronized (this.eventCache) { 855 this.eventCache.replace(replacementItems, resourceVersion); 856 } 857 if (logger.isLoggable(Level.FINE)) { 858 logger.logp(Level.FINE, cn, mn, "Done replacing resources in the event cache"); 859 } 860 861 // Record the resource version we captured during our list 862 // operation. 863 this.setLastSynchronizationResourceVersion(resourceVersion); 864 865 // Now that we've vetted that our list operation works (i.e. no 866 // syntax errors, no connectivity problems) we can schedule 867 // synchronizations if necessary. 868 // 869 // A synchronization is an operation where, if allowed, our 870 // eventCache goes through its set of known objects and--for 871 // any that are not enqueued for further processing 872 // already--fires a *synchronization* event of type 873 // MODIFICATION. This happens on a schedule, not in reaction 874 // to an event. This allows its downstream processors a 875 // chance to try to bring system state in line with desired 876 // state, even if no events have occurred (kind of like a 877 // heartbeat). See 878 // https://engineering.bitnami.com/articles/a-deep-dive-into-kubernetes-controllers.html#resyncperiod. 879 this.setUpSynchronization(); 880 returnValue = this.synchronizationTask; 881 882 // If there wasn't a synchronizationTask, then that means the 883 // user who created this Reflector didn't want any 884 // synchronization to happen. We return a "dummy" Future that 885 // is already "completed" (isDone() returns true) to avoid 886 // having to return null. The returned Future can be 887 // cancelled with no effect. 888 if (returnValue == null) { 889 final FutureTask<?> futureTask = new FutureTask<Void>(() -> {}, null); 890 futureTask.run(); // just sets "doneness" 891 assert futureTask.isDone(); 892 assert !futureTask.isCancelled(); 893 returnValue = futureTask; 894 } 895 896 assert returnValue != null; 897 898 // Now that we've taken care of our list() operation, set up our 899 // watch() operation. 900 if (logger.isLoggable(Level.FINE)) { 901 logger.logp(Level.FINE, 902 cn, mn, 903 "Watching Kubernetes resources with resource version {0} using {1}", 904 new Object[] { resourceVersion, this.operation }); 905 } 906 @SuppressWarnings("unchecked") 907 final Versionable<? extends Watchable<? extends Closeable, Watcher<T>>> versionableOperation = 908 (Versionable<? extends Watchable<? extends Closeable, Watcher<T>>>)this.operation; 909 this.watch = versionableOperation.withResourceVersion(resourceVersion).watch(new WatchHandler()); 910 if (logger.isLoggable(Level.FINE)) { 911 logger.logp(Level.FINE, 912 cn, mn, 913 "Established watch: {0}", this.watch); 914 } 915 916 } catch (final IOException | RuntimeException | Error exception) { 917 this.cancelSynchronization(); 918 if (this.watch != null) { 919 try { 920 // TODO: haven't seen it, but reason hard about deadlock 921 // here; see 922 // WatchHandler#onClose(KubernetesClientException) which 923 // *can* call start() (this method) with the monitor. I 924 // *think* we're in the clear here: 925 // onClose(KubernetesClientException) will only (re-)call 926 // start() if the supplied KubernetesClientException is 927 // non-null. In this case, it should be, because this is 928 // an ordinary close() call. 929 this.watch.close(); 930 } catch (final Throwable suppressMe) { 931 exception.addSuppressed(suppressMe); 932 } 933 this.watch = null; 934 } 935 throw exception; 936 } 937 } 938 939 if (this.logger.isLoggable(Level.FINER)) { 940 this.logger.exiting(cn, mn, returnValue); 941 } 942 return returnValue; 943 } 944 945 /** 946 * Invoked when {@link #close()} is invoked. 947 * 948 * <p>The default implementation of this method does nothing.</p> 949 * 950 * <p>Overrides of this method must consider that they will be 951 * invoked with this {@link Reflector}'s monitor held.</p> 952 * 953 * <p>Overrides of this method must not call the {@link #close()} 954 * method.</p> 955 * 956 * @see #close() 957 */ 958 protected synchronized void onClose() { 959 960 } 961 962 963 /* 964 * Inner and nested classes. 965 */ 966 967 968 /** 969 * A {@link Watcher} of Kubernetes resources. 970 * 971 * @author <a href="https://about.me/lairdnelson" 972 * target="_parent">Laird Nelson</a> 973 * 974 * @see Watcher 975 */ 976 private final class WatchHandler implements Watcher<T> { 977 978 979 /* 980 * Constructors. 981 */ 982 983 984 /** 985 * Creates a new {@link WatchHandler}. 986 */ 987 private WatchHandler() { 988 super(); 989 final String cn = this.getClass().getName(); 990 final String mn = "<init>"; 991 if (logger.isLoggable(Level.FINER)) { 992 logger.entering(cn, mn); 993 logger.exiting(cn, mn); 994 } 995 } 996 997 998 /* 999 * Instance methods. 1000 */ 1001 1002 1003 /** 1004 * Calls the {@link EventCache#add(Object, AbstractEvent.Type, 1005 * HasMetadata)} method on the enclosing {@link Reflector}'s 1006 * associated {@link EventCache} with information harvested from 1007 * the supplied {@code resource}, and using an {@link 1008 * AbstractEvent.Type} selected appropriately given the supplied 1009 * {@link Watcher.Action}. 1010 * 1011 * @param action the kind of Kubernetes event that happened; must 1012 * not be {@code null} 1013 * 1014 * @param resource the {@link HasMetadata} object that was 1015 * affected; must not be {@code null} 1016 * 1017 * @exception NullPointerException if {@code action} or {@code 1018 * resource} was {@code null} 1019 * 1020 * @exception IllegalStateException if another error occurred 1021 */ 1022 @Override 1023 public final void eventReceived(final Watcher.Action action, final T resource) { 1024 final String cn = this.getClass().getName(); 1025 final String mn = "eventReceived"; 1026 if (logger.isLoggable(Level.FINER)) { 1027 logger.entering(cn, mn, new Object[] { action, resource }); 1028 } 1029 Objects.requireNonNull(action); 1030 Objects.requireNonNull(resource); 1031 1032 final ObjectMeta metadata = resource.getMetadata(); 1033 assert metadata != null; 1034 1035 final Event.Type eventType; 1036 switch (action) { 1037 case ADDED: 1038 eventType = Event.Type.ADDITION; 1039 break; 1040 case MODIFIED: 1041 eventType = Event.Type.MODIFICATION; 1042 break; 1043 case DELETED: 1044 eventType = Event.Type.DELETION; 1045 break; 1046 case ERROR: 1047 // Uh...the Go code has: 1048 // 1049 // if event.Type == watch.Error { 1050 // return apierrs.FromObject(event.Object) 1051 // } 1052 // 1053 // Now, apierrs.FromObject is here: 1054 // https://github.com/kubernetes/apimachinery/blob/kubernetes-1.9.2/pkg/api/errors/errors.go#L80-L88 1055 // This is looking for a Status object. But 1056 // WatchConnectionHandler will never forward on such a thing: 1057 // https://github.com/fabric8io/kubernetes-client/blob/v3.1.8/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java#L246-L258 1058 // 1059 // So it follows that if by some chance we get here, resource 1060 // will definitely be a HasMetadata. We go back to the Go 1061 // code again, and remember that if the type is Error, the 1062 // equivalent of this watch handler simply returns and goes home. 1063 // 1064 // Now, if we were to throw a RuntimeException here, which is 1065 // the idiomatic equivalent of returning and going home, this 1066 // would cause a watch reconnect: 1067 // https://github.com/fabric8io/kubernetes-client/blob/v3.1.8/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java#L159-L205 1068 // ...up to the reconnect limit. 1069 // 1070 // ...which is fine, but I'm not sure that in an error case a 1071 // WatchEvent will ever HAVE a HasMetadata as its payload. 1072 // Which means MAYBE we'll never get here. But if we do, all 1073 // we can do is throw a RuntimeException...which ends up 1074 // reducing to the same case as the default case below, so we 1075 // fall through. 1076 default: 1077 eventType = null; 1078 throw new IllegalStateException(); 1079 } 1080 assert eventType != null; 1081 1082 // Add an Event of the proper kind to our EventCache. This is 1083 // the heart of this method. 1084 if (logger.isLoggable(Level.FINE)) { 1085 logger.logp(Level.FINE, 1086 cn, mn, 1087 "Adding event to cache: {0} {1}", new Object[] { eventType, resource }); 1088 } 1089 synchronized (eventCache) { 1090 eventCache.add(Reflector.this, eventType, resource); 1091 } 1092 1093 // Record the most recent resource version we're tracking to be 1094 // that of this last successful watch() operation. We set it 1095 // earlier during a list() operation. 1096 setLastSynchronizationResourceVersion(metadata.getResourceVersion()); 1097 1098 if (logger.isLoggable(Level.FINER)) { 1099 logger.exiting(cn, mn); 1100 } 1101 } 1102 1103 /** 1104 * Invoked when the Kubernetes client connection closes. 1105 * 1106 * @param exception any {@link KubernetesClientException} that 1107 * caused this closing to happen; may be {@code null} 1108 */ 1109 @Override 1110 public final void onClose(final KubernetesClientException exception) { 1111 final String cn = this.getClass().getName(); 1112 final String mn = "onClose"; 1113 if (logger.isLoggable(Level.FINER)) { 1114 logger.entering(cn, mn, exception); 1115 } 1116 1117 synchronized (Reflector.this) { 1118 // Don't close Reflector.this.watch before setting it to null 1119 // here; after all we're being called because it's in the 1120 // process of closing already! 1121 Reflector.this.watch = null; 1122 } 1123 1124 if (exception != null) { 1125 if (logger.isLoggable(Level.WARNING)) { 1126 logger.logp(Level.WARNING, 1127 cn, mn, 1128 exception.getMessage(), exception); 1129 } 1130 // See 1131 // https://github.com/kubernetes/client-go/blob/5f85fe426e7aa3c1df401a7ae6c1ba837bd76be9/tools/cache/reflector.go#L204. 1132 if (logger.isLoggable(Level.INFO)) { 1133 logger.logp(Level.INFO, cn, mn, "Restarting Reflector"); 1134 } 1135 try { 1136 Reflector.this.start(); 1137 } catch (final Throwable suppressMe) { 1138 if (logger.isLoggable(Level.SEVERE)) { 1139 logger.logp(Level.SEVERE, 1140 cn, mn, 1141 "Failed to restart Reflector", suppressMe); 1142 } 1143 exception.addSuppressed(suppressMe); 1144 } 1145 } 1146 1147 if (logger.isLoggable(Level.FINER)) { 1148 logger.exiting(cn, mn, exception); 1149 } 1150 } 1151 1152 } 1153 1154}