001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*- 002 * 003 * Copyright © 2018 microBean. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 014 * implied. See the License for the specific language governing 015 * permissions and limitations under the License. 016 */ 017package org.microbean.kubernetes.controller.cdi; 018 019import java.io.Closeable; 020import java.io.IOException; 021 022import java.lang.annotation.Annotation; 023import java.lang.annotation.ElementType; 024import java.lang.annotation.Retention; 025import java.lang.annotation.RetentionPolicy; 026import java.lang.annotation.Target; 027 028import java.lang.reflect.ParameterizedType; 029import java.lang.reflect.Type; 030 031import java.time.Duration; 032 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.Collections; 037import java.util.HashMap; 038import java.util.HashSet; 039import java.util.IdentityHashMap; 040import java.util.Iterator; 041import java.util.LinkedHashSet; 042import java.util.Map; 043import java.util.Objects; 044import java.util.Optional; 045import java.util.Set; 046 047import java.util.concurrent.CompletionStage; 048import java.util.concurrent.CountDownLatch; 049 050import java.util.function.Consumer; 051import java.util.function.Function; 052 053import java.util.logging.Level; 054import java.util.logging.Logger; 055 056import javax.annotation.Priority; 057 058import javax.enterprise.context.ApplicationScoped; 059import javax.enterprise.context.BeforeDestroyed; 060import 
javax.enterprise.context.ContextNotActiveException; 061import javax.enterprise.context.Initialized; 062 063import javax.enterprise.context.spi.AlterableContext; 064import javax.enterprise.context.spi.Contextual; 065import javax.enterprise.context.spi.CreationalContext; 066 067import javax.enterprise.event.NotificationOptions; 068import javax.enterprise.event.Observes; 069 070import javax.enterprise.inject.Default; // for javadoc only 071 072import javax.enterprise.inject.spi.AfterBeanDiscovery; 073import javax.enterprise.inject.spi.BeanAttributes; 074import javax.enterprise.inject.spi.Bean; 075import javax.enterprise.inject.spi.BeanManager; 076import javax.enterprise.inject.spi.CDI; 077import javax.enterprise.inject.spi.DeploymentException; 078import javax.enterprise.inject.spi.EventContext; 079import javax.enterprise.inject.spi.ObserverMethod; 080import javax.enterprise.inject.spi.ProcessBean; 081import javax.enterprise.inject.spi.ProcessManagedBean; 082import javax.enterprise.inject.spi.ProcessObserverMethod; 083import javax.enterprise.inject.spi.ProcessProducerField; 084import javax.enterprise.inject.spi.ProcessProducerMethod; 085import javax.enterprise.inject.spi.ProcessSyntheticBean; 086import javax.enterprise.inject.spi.ProcessSyntheticObserverMethod; 087 088import javax.enterprise.inject.spi.configurator.ObserverMethodConfigurator.EventConsumer; 089 090import javax.inject.Qualifier; // for javadoc only 091import javax.inject.Scope; 092 093import io.fabric8.kubernetes.api.model.ConfigMap; // for javadoc only 094import io.fabric8.kubernetes.api.model.HasMetadata; 095import io.fabric8.kubernetes.api.model.KubernetesResourceList; 096 097import io.fabric8.kubernetes.client.KubernetesClient; // for javadoc only 098import io.fabric8.kubernetes.client.Watcher; 099 100import io.fabric8.kubernetes.client.dsl.Listable; 101import io.fabric8.kubernetes.client.dsl.VersionWatchable; 102import io.fabric8.kubernetes.client.dsl.Operation; // for javadoc only 103 104import 
org.microbean.cdi.AbstractBlockingExtension; 105import org.microbean.cdi.Annotations; 106 107import org.microbean.configuration.api.Configurations; 108 109import org.microbean.development.annotation.Issue; 110 111import org.microbean.kubernetes.controller.AbstractEvent; 112import org.microbean.kubernetes.controller.Controller; 113import org.microbean.kubernetes.controller.EventDistributor; 114import org.microbean.kubernetes.controller.SynchronizationEvent; 115 116import org.microbean.kubernetes.controller.cdi.annotation.Added; 117import org.microbean.kubernetes.controller.cdi.annotation.Deleted; 118import org.microbean.kubernetes.controller.cdi.annotation.Modified; 119import org.microbean.kubernetes.controller.cdi.annotation.KubernetesEventSelector; 120import org.microbean.kubernetes.controller.cdi.annotation.Prior; 121 122import static javax.interceptor.Interceptor.Priority.LIBRARY_AFTER; 123import static javax.interceptor.Interceptor.Priority.LIBRARY_BEFORE; 124 125/** 126 * An {@link AbstractBlockingExtension} that distributes Kubernetes 127 * events to interested listeners asynchronously. 128 * 129 * <h1>Usage</h1> 130 * 131 * <p>To use this extension, simply place it on your classpath (along 132 * with your selection from a menu of certain required runtime 133 * dependencies described below). If you have a mechanism for 134 * describing the kinds of Kubernetes resources you'd like to watch 135 * for events, and if you have observer methods created to do 136 * something with those events, both of which are described below, 137 * then this extension will take care of connecting to the Kubernetes 138 * API server and listing and watching for new events for you.</p> 139 * 140 * <h2>Dependency Choices</h2> 141 * 142 * <p>This extension relies on the presence of certain CDI beans. In 143 * some cases, those beans are not produced by this extension. For 144 * maximum flexibility, this project does not mandate how certain 145 * beans are produced. 
Below is a list of the beans that are
 * required, and suggested&mdash;but not required&mdash;ways of
 * producing them.</p>
 *
 * <h2>Maven</h2>
 *
 * <p>If you are using Maven, you may indicate that you want this
 * extension to be included on your project's runtime classpath with
 * the following dependency stanza:</p>
 *
 * <blockquote><pre>&lt;dependency&gt;
 *  &lt;groupId&gt;org.microbean&lt;/groupId&gt;
 *  &lt;artifactId&gt;microbean-kubernetes-controller-cdi&lt;/artifactId&gt;
 *  &lt;version&gt;0.2.1&lt;/version&gt;
 *  &lt;scope&gt;runtime&lt;/scope&gt;
 *&lt;/dependency&gt;</pre></blockquote>
 *
 * <h3>{@link KubernetesClient} Bean</h3>
 *
 * <p>This extension indirectly requires that a {@link
 * KubernetesClient} be available in the CDI container (qualified with
 * {@link Default @Default}).  You can use the <a
 * href="https://microbean.github.io/microbean-kubernetes-client-cdi/">microBean
 * Kubernetes Client CDI</a> project for this, or you can arrange to
 * fulfil this requirement yourself.</p>
 *
 * <p>If you are going to use the <a
 * href="https://microbean.github.io/microbean-kubernetes-client-cdi/">microBean
 * Kubernetes Client CDI</a> project to provide a {@link
 * Default}-qualified {@link KubernetesClient}, you can indicate that
 * you want it to be included on your project's runtime classpath with
 * the following dependency stanza:</p>
 *
 * <blockquote><pre>&lt;dependency&gt;
 *  &lt;groupId&gt;org.microbean&lt;/groupId&gt;
 *  &lt;artifactId&gt;microbean-kubernetes-client-cdi&lt;/artifactId&gt;
 *  &lt;version&gt;0.3.1&lt;/version&gt;
 *  &lt;scope&gt;runtime&lt;/scope&gt;
 *&lt;/dependency&gt;</pre></blockquote>
 *
 * <h3>Configuration Beans</h3>
 *
 * <p>You'll need an implementation of the <a
 * href="https://microbean.github.io/microbean-configuration-api/">microBean
 * Configuration API</a>.  Usually, the <a
 * href="https://microbean.github.io/microbean-configuration/">microBean
 * Configuration</a> project is what you want.
You can indicate that
 * you want it to be included on your project's runtime classpath with
 * the following dependency stanza:</p>
 *
 * <blockquote><pre>&lt;dependency&gt;
 *  &lt;groupId&gt;org.microbean&lt;/groupId&gt;
 *  &lt;artifactId&gt;microbean-configuration&lt;/artifactId&gt;
 *  &lt;version&gt;0.4.2&lt;/version&gt;
 *  &lt;scope&gt;runtime&lt;/scope&gt;
 *&lt;/dependency&gt;</pre></blockquote>
 *
 * <p>You'll need a means of getting that configuration implementation
 * into CDI.  Usually, you would use the <a
 * href="https://microbean.github.io/microbean-configuration-cdi/">microBean
 * Configuration CDI</a> project.  You can indicate that you want it
 * to be included on your project's runtime classpath with the
 * following dependency stanza:</p>
 *
 * <blockquote><pre>&lt;dependency&gt;
 *  &lt;groupId&gt;org.microbean&lt;/groupId&gt;
 *  &lt;artifactId&gt;microbean-configuration-cdi&lt;/artifactId&gt;
 *  &lt;version&gt;0.4.2&lt;/version&gt;
 *  &lt;scope&gt;runtime&lt;/scope&gt;
 *&lt;/dependency&gt;</pre></blockquote>
 *
 * <h2>Event Selectors</h2>
 *
 * <p>To describe the kinds of Kubernetes resources you're interested
 * in, you'll need one or more <em>event selectors</em> in your CDI
 * application.  An event selector, for the purposes of this class, is
 * a CDI bean (either a managed bean or a producer method, most
 * commonly) with certain types in its {@linkplain Bean#getTypes() set
 * of bean types}.  Specifically, the event selector type, {@code X},
 * must conform to this specification:</p>
 *
 * <blockquote>{@code <X extends Listable<? extends
 * KubernetesResourceList> & VersionWatchable<? extends Closeable,
 * Watcher<? extends HasMetadata>>>}</blockquote>
 *
 * <p>Many return types of methods belonging to {@link
 * KubernetesClient} conveniently conform to this specification.</p>
 *
 * <p>The event selector will also need to be annotated with an
 * annotation that you write describing the sort of event selection it
 * is.
This annotation does not need any elements, but must itself be
 * annotated with the {@link
 * KubernetesEventSelector @KubernetesEventSelector} annotation.  It
 * must be applicable to {@linkplain ElementType#PARAMETER parameters}
 * and your event selector beans, so if, as is most common, you are
 * writing a producer method, it must be applicable to {@linkplain
 * ElementType#METHOD methods} as well.</p>
 *
 * <p>Here is an example producer method that will cause this
 * extension to look for all ConfigMap events:</p>
 *
 * <blockquote><pre>&#64;Produces
 *&#64;{@link ApplicationScoped}
 *&#64;AllConfigMapEvents // see declaration below
 *private static final {@link Operation}&lt;{@link ConfigMap}, ConfigMapList, DoneableConfigMap, Resource&lt;ConfigMap, DoneableConfigMap&gt;&gt; selectAllConfigMaps(final {@link KubernetesClient} client) {
 *  return {@link KubernetesClient#configMaps() client.configMaps()};
 *}</pre></blockquote>
 *
 * <p>Note in particular that {@link Operation} implements both {@link
 * Listable} and {@link VersionWatchable} with the proper type
 * parameters.</p>
 *
 * <p>The {@code @AllConfigMapEvents} annotation is simply:</p>
 *
 * <blockquote><pre>&#64;Documented
 *&#64;{@link KubernetesEventSelector}
 *&#64;{@link Qualifier}
 *&#64;Retention(value = RetentionPolicy.RUNTIME)
 *&#64;Target({ ElementType.METHOD, ElementType.PARAMETER })
 *public &#64;interface AllConfigMapEvents {
 *
 *}</pre></blockquote>
 *
 * <h2>Observer Methods</h2>
 *
 * <p>Observer methods are where your CDI application actually takes
 * delivery of a Kubernetes resource as a CDI event.</p>
 *
 * <p>You will need a notional pair consisting of an event selector
 * and an observer method that conforms to certain requirements that
 * help "link" it to its associated event selector.
To realize this 276 * pair, you write a normal CDI observer method that adheres to the 277 * following additional requirements:</p> 278 * 279 * <ol> 280 * 281 * <li>Its <em>observed event type</em> is a concrete class that 282 * extends {@link HasMetadata} and is the same type with which the 283 * associated event selector is primarily concerned. For example, if 284 * your event selector is primarily concerned with {@link ConfigMap}s, 285 * then your observer method's observed event type should also be 286 * {@link ConfigMap}.</li> 287 * 288 * <li>Its observed event type is qualified with the same annotation 289 * that (a) qualifies the event selector and (b) is, in turn, 290 * qualified with {@link 291 * KubernetesEventSelector @KubernetesEventSelector}. For example, if 292 * {@code @AllConfigMapEvents} appears on your event selector producer 293 * method, then it should appear on your observer method's {@link 294 * ConfigMap} parameter that is annotated with {@link 295 * Observes @Observes}.</li> 296 * 297 * <li>Its observed event type is qualified with one of {@link 298 * Added @Added}, {@link Modified @Modified} or {@link 299 * Deleted @Deleted}.</li> 300 * 301 * <li>If you need access to the prior state of the Kubernetes 302 * resource your observer method is observing, you may add it as a 303 * standard (injected) parameter in the method's parameter list, but 304 * it must be (a) qualified with the {@link Prior @Prior} annotation 305 * and (b) of a type identical to that of the observed event type. 
* More precisely, the parameter is declared as an {@link Optional}
 * parameterized with that type: if your observed event type is
 * {@link ConfigMap}, then your prior state parameter must be of type
 * {@code Optional<ConfigMap>} and must be annotated with {@link
 * Prior @Prior}.</li>
 *
 * </ol>
 *
 * <p>Building upon the prior example, here is an example of an
 * observer method that is "paired" with the event selector above:</p>
 *
 * <blockquote><pre>private final void onConfigMapModification({@link Observes @Observes} &#64;AllConfigMapEvents {@link Modified @Modified} final {@link ConfigMap} configMap, {@link Prior @Prior} final Optional&lt;{@link ConfigMap}&gt; prior) {
 *  assert configMap != null;
 *  // do something interesting with this modified {@link ConfigMap}
 *}</pre></blockquote>
 *
 * @author <a href="https://about.me/lairdnelson"
 * target="_parent">Laird Nelson</a>
 *
 * @see AbstractBlockingExtension
 *
 * @see Controller
 *
 * @see KubernetesEventSelector
 *
 * @see Added
 *
 * @see Modified
 *
 * @see Deleted
 *
 * @see Prior
 */
public class KubernetesControllerExtension extends AbstractBlockingExtension {


  /*
   * Instance fields.
   */


  // Controllers that startControllers() has started; stopControllers()
  // closes each of them.  Guarded by synchronizing on the collection.
  private final Collection<Controller<?>> controllers;

  // Event selector beans keyed by qualifier set; cleared once bean
  // discovery is over (see processAfterBeanDiscovery()).
  private final Map<Set<Annotation>, Bean<?>> eventSelectorBeans;

  // Beans for which startControllers() creates and starts one
  // Controller each.
  private final Set<Bean<?>> beans;

  // Kubernetes resource types for which processAfterBeanDiscovery()
  // registers a @Prior-qualified Optional bean; cleared afterwards.
  private final Set<Class<? extends HasMetadata>> priorTypes;

  // Handed to each CDIEventDistributor created by startControllers().
  // NOTE(review): presumably records whether asynchronous observer
  // methods were discovered -- it is not assigned in this part of the
  // file; confirm.
  private boolean asyncNeeded;

  // Handed to each CDIEventDistributor created by startControllers().
  // NOTE(review): presumably records whether synchronous observer
  // methods were discovered -- it is not assigned in this part of the
  // file; confirm.
  private boolean syncNeeded;

  // Context registered with the container in
  // processAfterBeanDiscovery() and shared with every
  // CDIEventDistributor.
  private final PriorContext priorContext;

  // Context registered with the container in
  // processAfterBeanDiscovery() and shared with every
  // CDIEventDistributor.
  private final KubernetesEventContext kubernetesEventContext;


  /*
   * Constructors.
   */


  /**
   * Creates a new {@link KubernetesControllerExtension} by delegating
   * to the {@linkplain #KubernetesControllerExtension(CountDownLatch)
   * one-argument constructor} with a new {@link CountDownLatch} whose
   * count is {@code 1}.
   *
   * @see #KubernetesControllerExtension(CountDownLatch)
   */
  public KubernetesControllerExtension() {
    this(new CountDownLatch(1));
  }

  /**
   * Creates a new {@link KubernetesControllerExtension}.
378 * 379 * <p>Most users should prefer the {@linkplain 380 * #KubernetesControllerExtension() zero-argument constructor} 381 * instead.</p> 382 * 383 * @param latch a {@link CountDownLatch} passed to the {@link 384 * AbstractBlockingExtension#AbstractBlockingExtension(CountDownLatch)} 385 * constructor; must not be {@code null} 386 * 387 * @see #KubernetesControllerExtension() 388 * 389 * @see 390 * AbstractBlockingExtension#AbstractBlockingExtension(CountDownLatch) 391 */ 392 protected KubernetesControllerExtension(final CountDownLatch latch) { 393 super(latch); 394 if (this.logger == null) { 395 throw new IllegalStateException("createLogger() == null"); 396 } 397 final String cn = this.getClass().getName(); 398 final String mn = "<init>"; 399 if (this.logger.isLoggable(Level.FINER)) { 400 this.logger.entering(cn, mn, latch); 401 } 402 403 this.eventSelectorBeans = new HashMap<>(); 404 this.beans = new HashSet<>(); 405 this.priorTypes = new HashSet<>(); 406 this.controllers = new ArrayList<>(); 407 this.priorContext = new PriorContext(); 408 this.kubernetesEventContext = new KubernetesEventContext(); 409 410 if (this.logger.isLoggable(Level.FINER)) { 411 this.logger.exiting(cn, mn); 412 } 413 } 414 415 416 /* 417 * Instance methods. 418 */ 419 420 421 /** 422 * {@linkplain Observes Observes} the supplied {@link 423 * ProcessProducerMethod} event and calls the {@link 424 * #processPotentialEventSelectorBean(Bean, BeanManager)} method 425 * with the return value of the event's {@link 426 * ProcessProducerMethod#getBean()} method and the supplied {@link 427 * BeanManager}. 
428 * 429 * @param <X> a type that is both {@link Listable} and {@link 430 * VersionWatchable} 431 * 432 * @param event the container lifecycle event being observed; may be 433 * {@code null} in which case no action will be performed 434 * 435 * @param beanManager the {@link BeanManager} for the current CDI 436 * container; may be {@code null} 437 * 438 * @see #processPotentialEventSelectorBean(Bean, BeanManager) 439 */ 440 // Ideally, we could do this all in a ProcessBean observer method. 441 // See https://issues.jboss.org/browse/WELD-2461. 442 @SuppressWarnings("rawtypes") 443 private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processProducerMethod(@Observes final ProcessProducerMethod<X, ?> event, 444 final BeanManager beanManager) { 445 final String cn = this.getClass().getName(); 446 final String mn = "processProducerMethod"; 447 if (this.logger.isLoggable(Level.FINER)) { 448 this.logger.entering(cn, mn, new Object[] { event, beanManager }); 449 } 450 451 if (event != null) { 452 this.processPotentialEventSelectorBean(event.getBean(), beanManager); 453 } 454 455 if (this.logger.isLoggable(Level.FINER)) { 456 this.logger.exiting(cn, mn); 457 } 458 } 459 460 /** 461 * {@linkplain Observes Observes} the supplied {@link 462 * ProcessProducerField} event and calls the {@link 463 * #processPotentialEventSelectorBean(Bean, BeanManager)} method 464 * with the return value of the event's {@link 465 * ProcessProducerField#getBean()} method and the supplied {@link 466 * BeanManager}. 
467 * 468 * @param <X> a type that is both {@link Listable} and {@link 469 * VersionWatchable} 470 * 471 * @param event the container lifecycle event being observed; may be 472 * {@code null} in which case no action will be performed 473 * 474 * @param beanManager the {@link BeanManager} for the current CDI 475 * container; may be {@code null} 476 * 477 * @see #processPotentialEventSelectorBean(Bean, BeanManager) 478 */ 479 // Ideally, we could do this all in a ProcessBean observer method. 480 // See https://issues.jboss.org/browse/WELD-2461. 481 @SuppressWarnings("rawtypes") 482 private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processProducerField(@Observes final ProcessProducerField<X, ?> event, 483 final BeanManager beanManager) { 484 final String cn = this.getClass().getName(); 485 final String mn = "processProducerField"; 486 if (this.logger.isLoggable(Level.FINER)) { 487 this.logger.entering(cn, mn, new Object[] { event, beanManager }); 488 } 489 490 if (event != null) { 491 this.processPotentialEventSelectorBean(event.getBean(), beanManager); 492 } 493 494 if (this.logger.isLoggable(Level.FINER)) { 495 this.logger.exiting(cn, mn); 496 } 497 } 498 499 /** 500 * {@linkplain Observes Observes} the supplied {@link 501 * ProcessManagedBean} event and calls the {@link 502 * #processPotentialEventSelectorBean(Bean, BeanManager)} method 503 * with the return value of the event's {@link 504 * ProcessManagedBean#getBean()} method and the supplied {@link 505 * BeanManager}. 
506 * 507 * @param <X> a type that is both {@link Listable} and {@link 508 * VersionWatchable} 509 * 510 * @param event the container lifecycle event being observed; may be 511 * {@code null} in which case no action will be performed 512 * 513 * @param beanManager the {@link BeanManager} for the current CDI 514 * container; may be {@code null} 515 * 516 * @see #processPotentialEventSelectorBean(Bean, BeanManager) 517 */ 518 // Ideally, we could do this all in a ProcessBean observer method. 519 // See https://issues.jboss.org/browse/WELD-2461. 520 @SuppressWarnings("rawtypes") 521 private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processManagedBean(@Observes final ProcessManagedBean<X> event, 522 final BeanManager beanManager) { 523 final String cn = this.getClass().getName(); 524 final String mn = "processManagedBean"; 525 if (this.logger.isLoggable(Level.FINER)) { 526 this.logger.entering(cn, mn, new Object[] { event, beanManager }); 527 } 528 529 if (event != null) { 530 this.processPotentialEventSelectorBean(event.getBean(), beanManager); 531 } 532 533 if (this.logger.isLoggable(Level.FINER)) { 534 this.logger.exiting(cn, mn); 535 } 536 } 537 538 /** 539 * {@linkplain Observes Observes} the supplied {@link 540 * ProcessSyntheticBean} event and calls the {@link 541 * #processPotentialEventSelectorBean(Bean, BeanManager)} method 542 * with the return value of the event's {@link 543 * ProcessSyntheticBean#getBean()} method and the supplied {@link 544 * BeanManager}. 
545 * 546 * @param <X> a type that is both {@link Listable} and {@link 547 * VersionWatchable} 548 * 549 * @param event the container lifecycle event being observed; may be 550 * {@code null} in which case no action will be performed 551 * 552 * @param beanManager the {@link BeanManager} for the current CDI 553 * container; may be {@code null} 554 * 555 * @see #processPotentialEventSelectorBean(Bean, BeanManager) 556 */ 557 @SuppressWarnings("rawtypes") 558 private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processSyntheticBean(@Observes final ProcessSyntheticBean<X> event, 559 final BeanManager beanManager) { 560 final String cn = this.getClass().getName(); 561 final String mn = "processSyntheticBean"; 562 if (this.logger.isLoggable(Level.FINER)) { 563 this.logger.entering(cn, mn, new Object[] { event, beanManager }); 564 } 565 566 if (event != null) { 567 this.processPotentialEventSelectorBean(event.getBean(), beanManager); 568 } 569 570 if (this.logger.isLoggable(Level.FINER)) { 571 this.logger.exiting(cn, mn); 572 } 573 } 574 575 private final <T extends HasMetadata> void validateScopeOfCacheBean(@Observes final ProcessBean<Map<Object, T>> event) { 576 final String cn = this.getClass().getName(); 577 final String mn = "validateScopeOfCacheBean"; 578 if (this.logger.isLoggable(Level.FINER)) { 579 this.logger.entering(cn, mn, new Object[] { event }); 580 } 581 if (event != null) { 582 final Bean<?> bean = event.getBean(); 583 if (bean != null && !ApplicationScoped.class.equals(bean.getScope()) && this.logger.isLoggable(Level.WARNING)) { 584 this.logger.logp(Level.WARNING, cn, mn, "{0} is not in application scope.", bean); 585 } 586 } 587 if (this.logger.isLoggable(Level.FINER)) { 588 this.logger.exiting(cn, mn); 589 } 590 } 591 592 private final <T extends HasMetadata> void validateScopeOfCacheBean(@Observes final ProcessProducerField<Map<Object, T>, ?> event) { 593 
final String cn = this.getClass().getName(); 594 final String mn = "validateScopeOfCacheBean"; 595 if (this.logger.isLoggable(Level.FINER)) { 596 this.logger.entering(cn, mn, new Object[] { event }); 597 } 598 if (event != null) { 599 final Bean<?> bean = event.getBean(); 600 if (bean != null && !ApplicationScoped.class.equals(bean.getScope()) && this.logger.isLoggable(Level.WARNING)) { 601 this.logger.logp(Level.WARNING, cn, mn, "{0} is not in application scope.", bean); 602 } 603 } 604 if (this.logger.isLoggable(Level.FINER)) { 605 this.logger.exiting(cn, mn); 606 } 607 } 608 609 private final <T extends HasMetadata> void validateScopeOfCacheBean(@Observes final ProcessProducerMethod<Map<Object, T>, ?> event) { 610 final String cn = this.getClass().getName(); 611 final String mn = "validateScopeOfCacheBean"; 612 if (this.logger.isLoggable(Level.FINER)) { 613 this.logger.entering(cn, mn, new Object[] { event }); 614 } 615 if (event != null) { 616 final Bean<?> bean = event.getBean(); 617 if (bean != null && !ApplicationScoped.class.equals(bean.getScope()) && this.logger.isLoggable(Level.WARNING)) { 618 this.logger.logp(Level.WARNING, cn, mn, "{0} is not in application scope.", bean); 619 } 620 } 621 if (this.logger.isLoggable(Level.FINER)) { 622 this.logger.exiting(cn, mn); 623 } 624 } 625 626 /** 627 * {@linkplain Observes Observes} the supplied {@link 628 * ProcessObserverMethod} event and calls the {@link 629 * #processPotentialEventSelectorObserverMethod(ProcessObserverMethod, 630 * BeanManager)} method with the return value of the event's {@link 631 * ProcessObserverMethod#getObserverMethod()} method and the 632 * supplied {@link BeanManager}. 
633 * 634 * @param <X> a type that extends {@link HasMetadata} and therefore 635 * represents a persistent Kubernetes resource 636 * 637 * @param event the container lifecycle event being observed; may be 638 * {@code null} in which case no action will be performed 639 * 640 * @param beanManager the {@link BeanManager} for the current CDI 641 * container; may be {@code null} 642 * 643 * @see 644 * #processPotentialEventSelectorObserverMethod(ProcessObserverMethod, 645 * BeanManager) 646 */ 647 // Observer method processors are guaranteed by the specification to 648 // be invoked after ProcessBean events. 649 private final <X extends HasMetadata> void processObserverMethod(@Observes final ProcessObserverMethod<X, ?> event, 650 final BeanManager beanManager) { 651 final String cn = this.getClass().getName(); 652 final String mn = "processObserverMethod"; 653 if (this.logger.isLoggable(Level.FINER)) { 654 this.logger.entering(cn, mn, new Object[] { event, beanManager }); 655 } 656 657 if (event != null) { 658 this.processPotentialEventSelectorObserverMethod(event, beanManager); 659 } 660 661 if (this.logger.isLoggable(Level.FINER)) { 662 this.logger.exiting(cn, mn); 663 } 664 } 665 666 /** 667 * {@linkplain Observes Observes} the supplied {@link 668 * ProcessSyntheticObserverMethod} event and calls the {@link 669 * #processPotentialEventSelectorObserverMethod(ProcessObserverMethod, 670 * BeanManager)} method with the return value of the event's {@link 671 * ProcessSyntheticObserverMethod#getObserverMethod()} method and 672 * the supplied {@link BeanManager}. 
673 * 674 * @param <X> a type that extends {@link HasMetadata} and therefore 675 * represents a persistent Kubernetes resource 676 * 677 * @param event the container lifecycle event being observed; may be 678 * {@code null} in which case no action will be performed 679 * 680 * @param beanManager the {@link BeanManager} for the current CDI 681 * container; may be {@code null} 682 * 683 * @see 684 * #processPotentialEventSelectorObserverMethod(ProcessObserverMethod, 685 * BeanManager) 686 */ 687 // Observer method processors are guaranteed by the specification to 688 // be invoked after ProcessBean events. 689 private final <X extends HasMetadata> void processSyntheticObserverMethod(@Observes final ProcessSyntheticObserverMethod<X, ?> event, 690 final BeanManager beanManager) { 691 final String cn = this.getClass().getName(); 692 final String mn = "processSyntheticObserverMethod"; 693 if (this.logger.isLoggable(Level.FINER)) { 694 this.logger.entering(cn, mn, new Object[] { event, beanManager }); 695 } 696 697 if (event != null) { 698 this.processPotentialEventSelectorObserverMethod(event, beanManager); 699 } 700 701 if (this.logger.isLoggable(Level.FINER)) { 702 this.logger.exiting(cn, mn); 703 } 704 } 705 706 /** 707 * {@linkplain Observes Observes} the supplied {@link 708 * AfterBeanDiscovery} event and, since all bean discovery is done, 709 * clears out the contents of the {@link #eventSelectorBeans} field. 
710 * 711 * @param event the container lifecycle event being observed; may be 712 * {@code null} in which case no action will be performed 713 * 714 * @see #eventSelectorBeans 715 */ 716 private final void processAfterBeanDiscovery(@Observes final AfterBeanDiscovery event) { 717 final String cn = this.getClass().getName(); 718 final String mn = "processAfterBeanDiscovery"; 719 if (this.logger.isLoggable(Level.FINER)) { 720 this.logger.entering(cn, mn, event); 721 } 722 723 if (event != null) { 724 event.addContext(this.priorContext); 725 event.addContext(this.kubernetesEventContext); 726 727 this.eventSelectorBeans.clear(); 728 // TODO: consider: we have the ability to create Controller 729 // beans here out of other bean raw materials 730 // (e.g. appropriately-qualified knownObjects etc.). 731 732 synchronized (this.priorTypes) { 733 if (!this.priorTypes.isEmpty()) { 734 for (final Type priorType : this.priorTypes) { 735 assert priorType != null; 736 737 event.addBean() 738 // This Bean is never created via this (required by CDI) 739 // callback; it is always supplied by 740 // PriorContext#get(Bean), so the container will think 741 // that it is eternal. 
742 .createWith(cc -> { throw new UnsupportedOperationException(); }) 743 .qualifiers(Prior.Literal.INSTANCE) 744 .scope(PriorScoped.class) 745 .types(new ParameterizedTypeImpl(null, Optional.class, new Type[] { priorType })); 746 747 } 748 this.priorTypes.clear(); 749 } 750 } 751 752 } 753 754 if (this.logger.isLoggable(Level.FINER)) { 755 this.logger.exiting(cn, mn); 756 } 757 } 758 759 /** 760 * {@linkplain Observes Observes} the {@linkplain Initialized 761 * initialization} of the {@linkplain ApplicationScoped application 762 * scope} at {@link 763 * javax.interceptor.Interceptor.Priority#LIBRARY_AFTER 764 * LIBRARY_AFTER} {@linkplain Priority priority} and, now that this 765 * extension is in a position to know all event observers that might 766 * be interested in Kubernetes events, arranges to funnel such 767 * events through a resilient pipeline into their hands 768 * asynchronously. 769 * 770 * @param <T> a type that extends {@link HasMetadata}; e.g. a 771 * Kubernetes resource type 772 * 773 * @param <X> a type that is both a {@link Listable} and a {@link 774 * VersionWatchable} 775 * 776 * @param ignored the event itself; ignored by this implementation; 777 * may be {@code null} 778 * 779 * @param beanManager the {@link BeanManager} in effect for this CDI 780 * container; may be {@code null} in which case no action will be 781 * taken 782 * 783 * @see #stopControllers(Object) 784 */ 785 @SuppressWarnings("rawtypes") 786 private final <T extends HasMetadata, 787 X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? 
extends Closeable, Watcher<T>>> 788 void startControllers(@Observes 789 @Initialized(ApplicationScoped.class) 790 @Priority(LIBRARY_AFTER) 791 final Object ignored, 792 final BeanManager beanManager) { 793 final String cn = this.getClass().getName(); 794 final String mn = "startControllers"; 795 if (this.logger.isLoggable(Level.FINER)) { 796 this.logger.entering(cn, mn, new Object[] { ignored, beanManager }); 797 } 798 799 if (beanManager != null && !this.beans.isEmpty()) { 800 801 // Use the microbean-configuration-cdi library to abstract away 802 // configuration details. But we can't just put a 803 // Configurations object in our incoming method parameters, 804 // because according to the specification that will result in 805 // non-portable behavior. So we look it up "by hand". 806 final Bean<?> configurationsBean = beanManager.resolve(beanManager.getBeans(Configurations.class)); 807 assert configurationsBean != null; 808 final Configurations configurations = 809 (Configurations)beanManager.getReference(configurationsBean, 810 Configurations.class, 811 beanManager.createCreationalContext(configurationsBean)); 812 assert configurations != null; 813 814 final Duration synchronizationInterval = configurations.getValue("synchronizationInterval", Duration.class); 815 816 for (final Bean<?> bean : this.beans) { 817 assert bean != null; 818 819 final Set<Annotation> qualifiers = bean.getQualifiers(); 820 final Annotation[] qualifiersArray; 821 if (qualifiers == null) { 822 qualifiersArray = null; 823 } else { 824 qualifiersArray = qualifiers.toArray(new Annotation[qualifiers.size()]); 825 } 826 827 @Issue(id = "6", uri = "https://github.com/microbean/microbean-kubernetes-controller-cdi/issues/6") 828 final Type cacheType = new ParameterizedTypeImpl(Map.class, new Type[] { Object.class, extractConcreteKubernetesResourceClass(bean) }); 829 830 final Map<Object, T> cache; 831 final Set<Bean<?>> cacheBeans = beanManager.getBeans(cacheType, qualifiersArray); 832 if 
(cacheBeans == null || cacheBeans.isEmpty()) { 833 cache = null; 834 } else { 835 final Bean<?> cacheBean = beanManager.resolve(cacheBeans); 836 if (cacheBean == null) { 837 cache = null; 838 } else { 839 @SuppressWarnings("unchecked") 840 final Map<Object, T> temp = 841 (Map<Object, T>)beanManager.getReference(cacheBean, 842 cacheType, 843 beanManager.createCreationalContext(cacheBean)); 844 cache = temp; 845 } 846 } 847 if (cache == null && this.logger.isLoggable(Level.INFO)) { 848 this.logger.logp(Level.INFO, cn, mn, 849 "No Kubernetes resource cache found for qualifiers: {0}", 850 qualifiers); 851 } 852 853 final NotificationOptions notificationOptions; 854 final Bean<?> notificationOptionsBean = 855 beanManager.resolve(beanManager.getBeans(NotificationOptions.class, qualifiersArray)); 856 if (notificationOptionsBean == null) { 857 notificationOptions = null; 858 } else { 859 notificationOptions = 860 (NotificationOptions)beanManager.getReference(notificationOptionsBean, 861 NotificationOptions.class, 862 beanManager.createCreationalContext(notificationOptionsBean)); 863 } 864 865 @SuppressWarnings("unchecked") 866 final X contextualReference = 867 (X)beanManager.getReference(bean, 868 getListableVersionWatchableType(bean), 869 beanManager.createCreationalContext(bean)); 870 871 final Controller<T> controller = 872 new CDIController<>(contextualReference, 873 synchronizationInterval, 874 cache, 875 new CDIEventDistributor<>(this.priorContext, 876 this.kubernetesEventContext, 877 qualifiers, 878 notificationOptions, 879 this.syncNeeded, 880 this.asyncNeeded), 881 t -> { 882 if (this.logger.isLoggable(Level.SEVERE)) { 883 this.logger.logp(Level.SEVERE, cn, mn, t.getMessage(), t); 884 } 885 return true; 886 }); 887 888 if (this.logger.isLoggable(Level.INFO)) { 889 this.logger.logp(Level.INFO, cn, mn, "Starting {0}", controller); 890 } 891 try { 892 controller.start(); 893 } catch (final IOException ioException) { 894 throw new 
DeploymentException(ioException.getMessage(), ioException); 895 } 896 897 synchronized (this.controllers) { 898 this.controllers.add(controller); 899 } 900 } 901 902 } 903 904 if (this.logger.isLoggable(Level.FINER)) { 905 this.logger.exiting(cn, mn); 906 } 907 } 908 909 /** 910 * {@linkplain Observes Observes} the {@linkplain BeforeDestroyed 911 * imminent destruction} of the {@linkplain ApplicationScoped 912 * application scope} at {@link 913 * javax.interceptor.Interceptor.Priority#LIBRARY_BEFORE 914 * LIBRARY_BEFORE} {@linkplain Priority priority} and stops any 915 * {@linkplain #controllers <tt>Controller</tt> instances that were 916 * started} by {@linkplain Controller#close() closing} them. 917 * 918 * @param event the actual {@link BeforeDestroyed} event; ignored; 919 * may be {@code null} 920 * 921 * @exception IOException if {@link Controller#close()} throws an 922 * {@link IOException} 923 * 924 * @see #startControllers(Object, BeanManager) 925 */ 926 private final void stopControllers(@Observes 927 @BeforeDestroyed(ApplicationScoped.class) 928 @Priority(LIBRARY_BEFORE) 929 final Object event) 930 throws IOException { 931 final String cn = this.getClass().getName(); 932 final String mn = "stopControllers"; 933 if (this.logger.isLoggable(Level.FINER)) { 934 this.logger.entering(cn, mn, event); 935 } 936 937 Exception exception = null; 938 synchronized (this.controllers) { 939 for (final Controller<?> controller : this.controllers) { 940 assert controller != null; 941 try { 942 controller.close(); 943 } catch (final IOException | RuntimeException closeException) { 944 if (exception == null) { 945 exception = closeException; 946 } else { 947 exception.addSuppressed(closeException); 948 } 949 } 950 } 951 } 952 953 if (exception instanceof IOException) { 954 throw (IOException)exception; 955 } else if (exception instanceof RuntimeException) { 956 throw (RuntimeException)exception; 957 } else if (exception != null) { 958 throw new 
IllegalStateException(exception.getMessage(), exception); 959 } 960 961 if (this.logger.isLoggable(Level.FINER)) { 962 this.logger.exiting(cn, mn); 963 } 964 } 965 966 967 /* 968 * Non-observer methods. 969 */ 970 971 972 /** 973 * Given a {@link Bean}, checks to see if it is annotated with at 974 * least one annotation that is, in turn, annotated with {@link 975 * KubernetesEventSelector}, and, if so, adds it to a list of 976 * candidate sources of objects that are both {@link Listable} and 977 * {@link VersionWatchable}. 978 * 979 * @param bean the {@link Bean} to inspect; may be {@code null} in 980 * which case no action will be taken 981 * 982 * @param beanManager the {@link BeanManager} in effect for the 983 * current CDI container; may be {@code null} 984 * 985 * @see Annotations#retainAnnotationsQualifiedWith(Collection, 986 * Class, BeanManager) 987 * 988 * @see KubernetesEventSelector 989 */ 990 private final void processPotentialEventSelectorBean(final Bean<?> bean, final BeanManager beanManager) { 991 final String cn = this.getClass().getName(); 992 final String mn = "processPotentialEventSelectorBean"; 993 if (this.logger.isLoggable(Level.FINER)) { 994 this.logger.entering(cn, mn, new Object[] { bean, beanManager }); 995 } 996 997 if (bean != null) { 998 final Type listableVersionWatchableType = getListableVersionWatchableType(bean); 999 if (listableVersionWatchableType != null) { 1000 final Set<Annotation> kubernetesEventSelectors = Annotations.retainAnnotationsQualifiedWith(bean.getQualifiers(), KubernetesEventSelector.class, beanManager); 1001 if (kubernetesEventSelectors != null && !kubernetesEventSelectors.isEmpty()) { 1002 synchronized (this.eventSelectorBeans) { 1003 this.eventSelectorBeans.put(kubernetesEventSelectors, bean); 1004 } 1005 } 1006 } 1007 } 1008 1009 if (this.logger.isLoggable(Level.FINER)) { 1010 this.logger.exiting(cn, mn); 1011 } 1012 } 1013 1014 /** 1015 * Given an {@link ObserverMethod}, checks to see if its event 1016 * 
parameter is annotated with at least one annotation that is, in 1017 * turn, annotated with {@link KubernetesEventSelector}, and, if so, 1018 * makes sure that any {@link Bean}s whose {@linkplain 1019 * Bean#getQualifiers() qualifiers} line up are retained by this 1020 * extension as sources of {@link Listable} and {@link 1021 * VersionWatchable} instances. 1022 * 1023 * @param <X> a type that extends {@link HasMetadata} and therefore 1024 * represents a persistent Kubernetes resource 1025 * 1026 * @param event the {@link ProcessObserverMethod} event to inspect; 1027 * may be {@code null} in which case no action will be taken 1028 * 1029 * @param beanManager the {@link BeanManager} in effect for the 1030 * current CDI container; may be {@code null} 1031 * 1032 * @see Annotations#retainAnnotationsQualifiedWith(Collection, 1033 * Class, BeanManager) 1034 * 1035 * @see KubernetesEventSelector 1036 */ 1037 private final <X extends HasMetadata> void processPotentialEventSelectorObserverMethod(final ProcessObserverMethod<X, ?> event, final BeanManager beanManager) { 1038 final String cn = this.getClass().getName(); 1039 final String mn = "processPotentialEventSelectorObserverMethod"; 1040 if (this.logger.isLoggable(Level.FINER)) { 1041 this.logger.entering(cn, mn, new Object[] { event, beanManager }); 1042 } 1043 1044 if (event != null) { 1045 final ObserverMethod<X> observerMethod = event.getObserverMethod(); 1046 if (observerMethod != null) { 1047 final Set<Annotation> kubernetesEventSelectors = Annotations.retainAnnotationsQualifiedWith(observerMethod.getObservedQualifiers(), KubernetesEventSelector.class, beanManager); 1048 if (kubernetesEventSelectors != null && !kubernetesEventSelectors.isEmpty()) { 1049 event.configureObserverMethod() 1050 .notifyWith(new Notifier<>(this.priorContext, this.kubernetesEventContext, observerMethod)); 1051 if (observerMethod.isAsync()) { 1052 if (!this.asyncNeeded) { 1053 this.asyncNeeded = true; 1054 } 1055 } else if 
(!this.syncNeeded) { 1056 this.syncNeeded = true; 1057 } 1058 final Bean<?> bean; 1059 synchronized (this.eventSelectorBeans) { 1060 bean = this.eventSelectorBeans.remove(kubernetesEventSelectors); 1061 } 1062 if (bean != null) { 1063 boolean added; 1064 synchronized (this.beans) { 1065 added = this.beans.add(bean); 1066 } 1067 if (added) { 1068 final Class<? extends HasMetadata> concreteKubernetesResourceClass = extractConcreteKubernetesResourceClass(bean); 1069 assert concreteKubernetesResourceClass != null; 1070 synchronized (this.priorTypes) { 1071 this.priorTypes.add(concreteKubernetesResourceClass); 1072 } 1073 } 1074 } 1075 } 1076 } 1077 } 1078 1079 if (this.logger.isLoggable(Level.FINER)) { 1080 this.logger.exiting(cn, mn); 1081 } 1082 } 1083 1084 1085 /* 1086 * Static methods. 1087 */ 1088 1089 1090 /** 1091 * A bit of a hack to return the {@link Type} that is the "right 1092 * kind" of {@link Listable} and {@link VersionWatchable} 1093 * implementation from the supplied {@link Bean}'s {@linkplain 1094 * Bean#getTypes() types}, if it is present among them, and to 1095 * return {@code null} otherwise. 1096 * 1097 * <p>This method may return {@code null}.</p> 1098 * 1099 * <p>{@link Operation Operation.class} is the most general 1100 * interface that implements both {@link Listable} and {@link 1101 * VersionWatchable}, which is a common constraint for {@link 1102 * Controller} operations, and is often what this method returns in 1103 * {@link ParameterizedType} form. 
1104 * 1105 * @param bean the {@link Bean} to inspect; may be {@code null} in 1106 * which case {@code null} will be returned 1107 * 1108 * @return a {@link Type} that is both {@link Listable} and {@link 1109 * VersionWatchable}, or {@code null} 1110 * 1111 * @see Operation 1112 * 1113 * @see Listable 1114 * 1115 * @see VersionWatchable 1116 */ 1117 private static final Type getListableVersionWatchableType(final Bean<?> bean) { 1118 final String cn = KubernetesControllerExtension.class.getName(); 1119 final Logger logger = Logger.getLogger(cn); 1120 assert logger != null; 1121 final String mn = "getListableVersionWatchableType"; 1122 if (logger.isLoggable(Level.FINER)) { 1123 logger.entering(cn, mn, bean); 1124 } 1125 1126 final Type returnValue; 1127 if (bean == null) { 1128 returnValue = null; 1129 } else { 1130 returnValue = getListableVersionWatchableType(bean.getTypes()); 1131 } 1132 1133 if (logger.isLoggable(Level.FINER)) { 1134 logger.exiting(cn, mn, returnValue); 1135 } 1136 return returnValue; 1137 } 1138 1139 private static final Type getListableVersionWatchableType(final Collection<? 
extends Type> beanTypes) { 1140 final String cn = KubernetesControllerExtension.class.getName(); 1141 final Logger logger = Logger.getLogger(cn); 1142 assert logger != null; 1143 final String mn = "getListableVersionWatchableType"; 1144 if (logger.isLoggable(Level.FINER)) { 1145 logger.entering(cn, mn, beanTypes); 1146 } 1147 1148 final Type returnValue; 1149 if (beanTypes == null || beanTypes.isEmpty()) { 1150 returnValue = null; 1151 } else { 1152 Type candidate = null; 1153 for (final Type beanType : beanTypes) { 1154 if (beanType instanceof ParameterizedType) { 1155 candidate = getListableVersionWatchableType((ParameterizedType)beanType); 1156 if (candidate != null) { 1157 break; 1158 } 1159 } 1160 } 1161 returnValue = candidate; 1162 } 1163 1164 if (logger.isLoggable(Level.FINER)) { 1165 logger.exiting(cn, mn, returnValue); 1166 } 1167 return returnValue; 1168 } 1169 1170 private static final Type getListableVersionWatchableType(final ParameterizedType type) { 1171 final String cn = KubernetesControllerExtension.class.getName(); 1172 final Logger logger = Logger.getLogger(cn); 1173 assert logger != null; 1174 final String mn = "getListableVersionWatchableType"; 1175 if (logger.isLoggable(Level.FINER)) { 1176 logger.entering(cn, mn, type); 1177 } 1178 1179 Type candidate = null; 1180 final Type rawType = type.getRawType(); 1181 if (rawType instanceof Class) { 1182 // This should always be the case; see e.g. https://stackoverflow.com/a/5767681/208288 1183 final Class<?> rawClass = (Class<?>)rawType; 1184 if (Listable.class.isAssignableFrom(rawClass) && VersionWatchable.class.isAssignableFrom(rawClass)) { 1185 // TODO: check type parameters 1186 candidate = type; 1187 } 1188 } 1189 1190 final Type returnValue = candidate; 1191 1192 if (logger.isLoggable(Level.FINER)) { 1193 logger.exiting(cn, mn, returnValue); 1194 } 1195 return returnValue; 1196 } 1197 1198 private static final Class<? 
extends HasMetadata> extractConcreteKubernetesResourceClass(final BeanAttributes<?> beanAttributes) { 1199 Class<? extends HasMetadata> returnValue = null; 1200 if (beanAttributes != null) { 1201 returnValue = extractConcreteKubernetesResourceClass(beanAttributes.getTypes()); 1202 } 1203 return returnValue; 1204 } 1205 1206 private static final Class<? extends HasMetadata> extractConcreteKubernetesResourceClass(final Set<? extends Type> types) { 1207 Class<? extends HasMetadata> returnValue = null; 1208 if (types != null && !types.isEmpty()) { 1209 final Set<Type> typesToProcess = new LinkedHashSet<>(types); 1210 while (!typesToProcess.isEmpty()) { 1211 final Iterator<Type> iterator = typesToProcess.iterator(); 1212 assert iterator != null; 1213 assert iterator.hasNext(); 1214 final Type type = iterator.next(); 1215 iterator.remove(); 1216 if (type != null) { 1217 if (type instanceof Class<?>) { 1218 final Class<?> concreteClass = (Class<?>)type; 1219 if (HasMetadata.class.isAssignableFrom(concreteClass)) { 1220 @SuppressWarnings("unchecked") 1221 final Class<? extends HasMetadata> temp = (Class<? extends HasMetadata>)concreteClass; 1222 returnValue = temp; 1223 break; 1224 } 1225 } else if (type instanceof ParameterizedType) { 1226 final ParameterizedType pType = (ParameterizedType)type; 1227 final Type[] actualTypeArguments = pType.getActualTypeArguments(); 1228 if (actualTypeArguments != null && actualTypeArguments.length > 0) { 1229 for (final Type actualTypeArgument : actualTypeArguments) { 1230 if (actualTypeArgument != null) { 1231 typesToProcess.add(actualTypeArgument); 1232 } 1233 } 1234 } 1235 } 1236 } 1237 } 1238 } 1239 return returnValue; 1240 } 1241 1242 1243 /* 1244 * Inner and nested classes. 
*/


  /**
   * A simple immutable {@link ParameterizedType} implementation used
   * by this extension to describe parameterized bean types.
   *
   * <p>{@link #equals(Object)} compares against any {@link
   * ParameterizedType}, so {@link #hashCode()} uses the formula the
   * JDK's built-in implementation is believed to use, so that equal
   * instances hash alike.</p>
   */
  private static final class ParameterizedTypeImpl implements ParameterizedType {

    private final Type ownerType;

    private final Type rawType;

    // Private copy; never handed out directly.
    private final Type[] actualTypeArguments;

    // Cached; safe because all state is immutable.
    private final int hashCode;

    /**
     * Creates a {@link ParameterizedTypeImpl} with a {@code null}
     * owner type.
     *
     * @param rawType the raw type; must not be {@code null}
     *
     * @param actualTypeArguments the type arguments; may be {@code
     * null}; copied if non-{@code null}
     *
     * @exception NullPointerException if {@code rawType} is {@code
     * null}
     */
    private ParameterizedTypeImpl(final Class<?> rawType, final Type[] actualTypeArguments) {
      this(null, rawType, actualTypeArguments);
    }

    /**
     * Creates a {@link ParameterizedTypeImpl}.
     *
     * @param ownerType the owner type; may be {@code null}
     *
     * @param rawType the raw type; must not be {@code null}
     *
     * @param actualTypeArguments the type arguments; may be {@code
     * null}; copied if non-{@code null}
     *
     * @exception NullPointerException if {@code rawType} is {@code
     * null}
     */
    private ParameterizedTypeImpl(final Type ownerType, final Class<?> rawType, final Type[] actualTypeArguments) {
      super();
      this.ownerType = ownerType;
      this.rawType = Objects.requireNonNull(rawType);
      // Defensive copy: the cached hash code must not be invalidated
      // by later changes to the caller's array.
      this.actualTypeArguments = actualTypeArguments == null ? null : actualTypeArguments.clone();
      this.hashCode = this.computeHashCode();
    }

    @Override
    public final Type getOwnerType() {
      return this.ownerType;
    }

    @Override
    public final Type getRawType() {
      return this.rawType;
    }

    /**
     * Returns a copy of the type arguments supplied at construction
     * time, or {@code null} if {@code null} was supplied.
     *
     * @return a fresh array of type arguments, or {@code null}
     */
    @Override
    public final Type[] getActualTypeArguments() {
      return this.actualTypeArguments == null ? null : this.actualTypeArguments.clone();
    }

    @Override
    public final int hashCode() {
      return this.hashCode;
    }

    /**
     * Computes the hash code as the exclusive-or of the hash codes of
     * the type arguments, the owner type and the raw type; the JDK's
     * own {@link ParameterizedType} implementation is believed to use
     * the same formula (not verified here).
     *
     * @return the hash code
     */
    private final int computeHashCode() {
      return Arrays.hashCode(this.actualTypeArguments)
        ^ Objects.hashCode(this.ownerType)
        ^ Objects.hashCode(this.rawType);
    }

    /**
     * Returns {@code true} if the supplied object is a {@link
     * ParameterizedType} with an equal owner type, raw type and type
     * arguments.
     *
     * @param other the object to test; may be {@code null}
     *
     * @return {@code true} if the supplied object is equal to this
     * one
     */
    @Override
    public final boolean equals(final Object other) {
      if (other == this) {
        return true;
      } else if (other instanceof ParameterizedType) {
        final ParameterizedType her = (ParameterizedType)other;
        return Objects.equals(this.ownerType, her.getOwnerType())
          && Objects.equals(this.rawType, her.getRawType())
          && Arrays.equals(this.actualTypeArguments, her.getActualTypeArguments());
      } else {
        return false;
      }
    }

  }

  /**
   * A {@link Controller} whose events are siphoned through an {@link
   * EventDistributor} to which a {@link CDIEventDistributor} may be
   * attached as a consumer.
   *
   * @param <T> the type of Kubernetes resource being controlled
   */
  private static final class CDIController<T extends HasMetadata> extends Controller<T> {

    private final EventDistributor<T> eventDistributor;

    // Whether onClose() should also close the eventDistributor.
    private final boolean close;

    // This @SuppressWarnings("rawtypes") is here because the
    // kubernetes-model project uses raw types throughout.  This class
    // does not.
    @SuppressWarnings("rawtypes")
    private
    <X extends Listable<? extends KubernetesResourceList>
               & VersionWatchable<? extends Closeable, Watcher<T>>>
    CDIController(final X operation,
                  final Duration synchronizationInterval,
                  final Map<Object, T> knownObjects,
                  final CDIEventDistributor<T> eventDistributor,
                  final Function<? super Throwable, Boolean> errorHandler) {
      // Creates its own EventDistributor, which it therefore also
      // closes (close == true).
      this(operation, synchronizationInterval, errorHandler, knownObjects, new EventDistributor<>(knownObjects, synchronizationInterval), true);
      assert this.eventDistributor != null;
      if (eventDistributor != null) {
        this.eventDistributor.addConsumer(eventDistributor, errorHandler);
      }
    }

    // This @SuppressWarnings("rawtypes") is here because the
    // kubernetes-model project uses raw types throughout.  This class
    // does not.
    @SuppressWarnings("rawtypes")
    private
    <X extends Listable<? extends KubernetesResourceList>
               & VersionWatchable<? extends Closeable, Watcher<T>>>
    CDIController(final X operation,
                  final Duration synchronizationInterval,
                  final Function<? super Throwable, Boolean> errorHandler,
                  final Map<Object, T> knownObjects,
                  final EventDistributor<T> siphon,
                  final boolean close) {
      super(operation, null, synchronizationInterval, errorHandler, knownObjects, siphon);
      this.eventDistributor = Objects.requireNonNull(siphon);
      this.close = close;
    }

    @Override
    protected final boolean shouldSynchronize() {
      return this.eventDistributor.shouldSynchronize();
    }

    @Override
    protected final void onClose() {
      if (this.close) {
        this.eventDistributor.close();
      }
    }

  }

  /**
   * A {@link Consumer} of {@link AbstractEvent}s that fires the
   * Kubernetes resource carried by each one as a CDI event,
   * asynchronously and/or synchronously, qualified with the
   * qualifiers supplied at construction time plus one qualifier
   * derived from the kind of event.
   *
   * @param <T> the type of Kubernetes resource being distributed
   */
  private static final class CDIEventDistributor<T extends HasMetadata> implements Consumer<AbstractEvent<? extends T>> {

    private static final Annotation[] EMPTY_ANNOTATION_ARRAY = new Annotation[0];

    private final PriorContext priorContext;

    private final KubernetesEventContext kubernetesEventContext;

    private final Annotation[] qualifiers;

    private final NotificationOptions notificationOptions;

    private final boolean syncNeeded;

    private final boolean asyncNeeded;

    private final Logger logger;

    /**
     * Creates a {@link CDIEventDistributor}.
     *
     * @param priorContext the {@link PriorContext} in which prior
     * resource states are registered; must not be {@code null}
     *
     * @param kubernetesEventContext the {@link
     * KubernetesEventContext} to destroy after each event has been
     * delivered; must not be {@code null}
     *
     * @param qualifiers the qualifiers with which events are fired;
     * may be {@code null}, which is treated as no qualifiers
     *
     * @param notificationOptions the {@link NotificationOptions} to
     * use for asynchronous firing; may be {@code null}
     *
     * @param syncNeeded whether events should be fired synchronously
     *
     * @param asyncNeeded whether events should be fired
     * asynchronously
     *
     * @exception NullPointerException if {@code priorContext} or
     * {@code kubernetesEventContext} is {@code null}
     */
    private CDIEventDistributor(final PriorContext priorContext,
                                final KubernetesEventContext kubernetesEventContext,
                                final Set<Annotation> qualifiers,
                                final NotificationOptions notificationOptions,
                                final boolean syncNeeded,
                                final boolean asyncNeeded) {
      super();
      final String cn = this.getClass().getName();
      this.logger = Logger.getLogger(cn);
      assert this.logger != null;
      final String mn = "<init>";
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.entering(cn, mn,
                             new Object[] { priorContext,
                                            kubernetesEventContext,
                                            qualifiers,
                                            notificationOptions,
                                            Boolean.valueOf(syncNeeded),
                                            Boolean.valueOf(asyncNeeded)
                             });
      }

      this.priorContext = Objects.requireNonNull(priorContext);
      this.kubernetesEventContext = Objects.requireNonNull(kubernetesEventContext);
      if (qualifiers == null) {
        this.qualifiers = EMPTY_ANNOTATION_ARRAY;
      } else {
        this.qualifiers = qualifiers.toArray(new Annotation[qualifiers.size()]);
      }
      this.notificationOptions = notificationOptions;
      this.syncNeeded = syncNeeded;
      this.asyncNeeded = asyncNeeded;

      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.exiting(cn, mn);
      }
    }

    /**
     * Fires the resource carried by the supplied {@link
     * AbstractEvent} as a CDI event.
     *
     * <p>If asynchronous delivery is needed, the resource is fired
     * asynchronously first; once that completes normally it is also
     * fired synchronously if synchronous delivery is needed.
     * Otherwise it is fired synchronously only.  In every case the
     * prior state registered for the resource is removed from the
     * {@link PriorContext} and the {@link KubernetesEventContext} is
     * destroyed once delivery is over.</p>
     *
     * @param controllerEvent the event to distribute; may be {@code
     * null} in which case no action is taken
     *
     * @exception IllegalStateException if the event's type is not
     * one of the handled kinds
     */
    @Override
    public final void accept(final AbstractEvent<? extends T> controllerEvent) {
      final String cn = this.getClass().getName();
      final String mn = "accept";
      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.entering(cn, mn, controllerEvent);
      }

      if (controllerEvent != null && (this.syncNeeded || this.asyncNeeded)) {

        final BeanManager beanManager = CDI.current().getBeanManager();
        assert beanManager != null;

        final javax.enterprise.event.Event<Object> cdiEventMachinery = beanManager.getEvent();
        assert cdiEventMachinery != null;

        // Copy the qualifiers we were supplied with into an array big
        // enough to hold one more qualifier.  That qualifier will be
        // based on the event type, which of course we didn't know at
        // construction time.
        final Annotation[] qualifiers = Arrays.copyOf(this.qualifiers, this.qualifiers.length + 1);
        assert qualifiers != null;

        final AbstractEvent.Type eventType = controllerEvent.getType();
        assert eventType != null;

        switch (eventType) {

        case ADDITION:
          if (controllerEvent instanceof SynchronizationEvent) {
            qualifiers[qualifiers.length - 1] = Added.Literal.withSynchronization();
          } else {
            qualifiers[qualifiers.length - 1] = Added.Literal.withoutSynchronization();
          }
          break;

        case MODIFICATION:
          if (controllerEvent instanceof SynchronizationEvent) {
            qualifiers[qualifiers.length - 1] = Modified.Literal.withSynchronization();
          } else {
            qualifiers[qualifiers.length - 1] = Modified.Literal.withoutSynchronization();
          }
          break;

        case DELETION:
          assert !(controllerEvent instanceof SynchronizationEvent);
          qualifiers[qualifiers.length - 1] = Deleted.Literal.INSTANCE;
          break;

        default:
          throw new IllegalStateException();

        }

        // This resource will be the actual "event" we end up firing.
        final T resource = controllerEvent.getResource();
        assert resource != null;

        // Select the typed, qualified Event before registering any
        // prior state so that a failure here leaves nothing behind in
        // the PriorContext.
        @SuppressWarnings("unchecked")
        final javax.enterprise.event.Event<T> broadcaster = cdiEventMachinery.select((Class<T>)resource.getClass(), qualifiers);

        // The "prior resource" represents the prior state (if any)
        // and can be null.  We'll arrange for this to be "created" by
        // our PriorContext CDI Context when observer methods contain
        // a parameter qualified with @Prior.
        this.priorContext.put(resource, Optional.ofNullable(controllerEvent.getPriorResource()));

        if (this.asyncNeeded) {

          // Set up the machinery to fire the event asynchronously,
          // possibly in parallel.

          final CompletionStage<T> stage;
          try {
            if (this.notificationOptions == null) {
              stage = broadcaster.fireAsync(resource);
            } else {
              stage = broadcaster.fireAsync(resource, this.notificationOptions);
            }
          } catch (final RuntimeException | Error firingProblem) {
            // Asynchronous firing could not even be set up, so the
            // completion callback below will never run; clean up
            // exactly as the synchronous branch does.
            this.kubernetesEventContext.destroy();
            this.priorContext.remove(resource);
            throw firingProblem;
          }
          assert stage != null;

          // When all asynchronous observers have been notified, then
          // fire synchronous events (if needed).  Ensure that the
          // PriorContext that is responsible for supplying injected
          // observer method parameters annotated with @Prior is
          // deactivated in all cases.

          // TODO: should we make it configurable whether to fire
          // synchronous events before asynchronous events or the
          // other way around?

          stage.whenComplete((event, throwable) -> {
              if (throwable != null && this.logger.isLoggable(Level.SEVERE)) {
                this.logger.logp(Level.SEVERE, cn, mn, throwable.getMessage(), throwable);
              }
              // TODO: should the presence of a non-null throwable
              // cause us to not perform synchronous firing?  For now
              // it does: the completion value is null whenever
              // throwable is non-null, so there is nothing delivered
              // by the stage to fire.
              try {
                if (throwable == null && this.syncNeeded) {
                  broadcaster.fire(resource);
                }
              } finally {
                this.kubernetesEventContext.destroy();
                // Always use the captured resource as the key: the
                // completion value is null on exceptional completion,
                // which would otherwise leave the entry registered.
                this.priorContext.remove(resource);
              }
            });

        } else {
          assert this.syncNeeded;

          try {
            broadcaster.fire(resource);
          } finally {
            this.kubernetesEventContext.destroy();
            this.priorContext.remove(resource);
          }
        }

      }

      if (this.logger.isLoggable(Level.FINER)) {
        this.logger.exiting(cn, mn);
      }
    }

  }

  /**
   * An {@link AlterableContext} for the {@link PriorScoped} scope
   * that supplies the prior state of the Kubernetes resource
   * currently being delivered as a CDI event.
   */
  private static final class PriorContext implements AlterableContext {

    // NOTE(review): an InheritableThreadLocal hands a child thread the
    // parent's value by default, so a thread created after its parent
    // initialized this variable starts out sharing the parent's
    // CurrentEventContext instance -- confirm that this is intended.
    private static final InheritableThreadLocal<CurrentEventContext> currentEventContext = new InheritableThreadLocal<CurrentEventContext>() {
        @Override
        protected final CurrentEventContext initialValue() {
          return new CurrentEventContext();
        }
      };

    /**
     * A {@linkplain Collections#synchronizedMap(Map) synchronized}
     * {@link IdentityHashMap} that maps a "current" {@link
     * HasMetadata} to its prior representation.
     *
     * @see #put(HasMetadata, Optional)
     */
    private final Map<HasMetadata, Optional<? extends HasMetadata>> instances;

    private PriorContext() {
      super();
      // This needs to be an IdentityHashMap under the covers because
      // it turns out that all kubernetes-model classes use Lombok's
      // indiscriminate equals()-and-hashCode() generation.  We need
      // to track Kubernetes resources in this Context implementation
      // by their actual JVM identity.
      this.instances = Collections.synchronizedMap(new IdentityHashMap<>());
    }

    /**
     * Activates this {@link PriorContext} <strong>for the {@linkplain
     * Thread#currentThread() current <code>Thread</code>}</strong>.
     *
     * @param currentEvent the {@link HasMetadata} that is currently
     * being fired as a CDI event; must not be {@code null}
     *
     * @exception NullPointerException if {@code currentEvent} is
     * {@code null}
     */
    private final void activate(final HasMetadata currentEvent) {
      Objects.requireNonNull(currentEvent);
      final CurrentEventContext c = currentEventContext.get();
      assert c != null;
      c.currentEvent = currentEvent;
      c.active = true;
    }

    /**
     * Deactivates this {@link PriorContext} <strong>for the {@linkplain
     * Thread#currentThread() current <code>Thread</code>}</strong>.
     */
    private final void deactivate() {
      final CurrentEventContext c = currentEventContext.get();
      assert c != null;
      c.active = false;
      c.currentEvent = null;
      // Note: do NOT be tempted to call this.remove() here.
    }

    /**
     * Associates the supplied {@code priorEvent} with the supplied
     * {@code currentEvent} and returns any previously associated
     * event.
     *
     * <p>This method <strong>may return {@code null}</strong>.</p>
     *
     * @param <X> a type that extends {@link HasMetadata} and therefore
     * represents a persistent Kubernetes resource
     *
     * @param currentEvent a Kubernetes resource about to be fired as
     * a CDI event; must not be {@code null}
     *
     * @param priorEvent an {@link Optional} Kubernetes resource that
     * represents the last known state of the {@code currentEvent}
     * Kubernetes resource; must not be {@code null}
     *
     * @return any previously associated Kubernetes resource as an
     * {@link Optional}, <strong>or, somewhat unusually, {@code null}
     * if there was no such {@link Optional}</strong>
     *
     * @exception NullPointerException if {@code currentEvent} or
     * {@code priorEvent} is {@code null}
     */
    private final <X extends HasMetadata> Optional<X> put(final X currentEvent, final Optional<X> priorEvent) {
      @SuppressWarnings("unchecked")
      final Optional<X> returnValue = (Optional<X>)this.instances.put(Objects.requireNonNull(currentEvent),
                                                                      Objects.requireNonNull(priorEvent));
      return returnValue;
    }

    /**
     * Removes the supplied {@link HasMetadata} from this {@link
     * PriorContext}'s registry of such objects and returns any {@link
     * Optional} indexed under it.
     *
     * <p>This method <strong>may return {@code null}</strong>.</p>
     *
     * @param currentEvent the {@link HasMetadata} to remove; must not
     * be {@code null}
     *
     * @return an {@link Optional} representing the prior state
     * indexed under the supplied {@link HasMetadata}, <strong>or
     * {@code null}</strong>
     *
     * @exception NullPointerException if {@code currentEvent} is
     * {@code null}
     */
    private final Optional<? extends HasMetadata> remove(final HasMetadata currentEvent) {
      return this.instances.remove(Objects.requireNonNull(currentEvent));
    }

    /**
     * Returns the prior state registered for the event that is
     * current on the calling thread.
     *
     * @return the registered {@link Optional}, possibly {@code null}
     *
     * @exception ContextNotActiveException if this {@link
     * PriorContext} is not {@linkplain #isActive() active}
     */
    private final Optional<? extends HasMetadata> get() {
      if (!this.isActive()) {
        throw new ContextNotActiveException();
      }
      final CurrentEventContext c = currentEventContext.get();
      assert c != null;
      assert c.active;
      assert c.currentEvent != null;
      // Yes, this can return null, and yes, our return type is
      // Optional.  Do NOT be tempted to return an empty Optional
      // here!
      return this.instances.get(c.currentEvent);
    }

    @Override
    public final <T> T get(final Contextual<T> bean) {
      // The supplied bean is not consulted; the "instance" is always
      // the prior state of the current event.
      @SuppressWarnings("unchecked")
      final T returnValue = (T)this.get();
      return returnValue;
    }

    @Override
    public final <T> T get(final Contextual<T> bean, final CreationalContext<T> cc) {
      // Nothing is ever created here; see get(Contextual).
      @SuppressWarnings("unchecked")
      final T returnValue = (T)this.get();
      return returnValue;
    }

    @Override
    public final void destroy(final Contextual<?> bean) {
      if (!this.isActive()) {
        throw new ContextNotActiveException();
      }
      final CurrentEventContext c = currentEventContext.get();
      assert c != null;
      assert c.active;
      assert c.currentEvent != null;
      this.remove(c.currentEvent);
    }

    @Override
    public final Class<? extends Annotation> getScope() {
      return PriorScoped.class;
    }

    /**
     * Returns {@code true} if this {@link PriorContext} has been
     * {@linkplain #activate(HasMetadata) activated} on the calling
     * thread and a prior state is registered for its current event.
     *
     * @return {@code true} if this {@link PriorContext} is active
     */
    @Override
    public final boolean isActive() {
      final CurrentEventContext c = currentEventContext.get();
      assert c != null;
      return c.active && c.currentEvent != null && this.instances.containsKey(c.currentEvent);
    }

    /**
     * Mutable holder of the event currently being delivered and of
     * the activation flag.
     */
    private static final class CurrentEventContext {

      private volatile HasMetadata currentEvent;

      private volatile boolean active;

      private CurrentEventContext() {
        super();
      }

    }

  }

  /**
   * An {@link EventConsumer} that activates the {@link
   * KubernetesEventContext} and the {@link PriorContext} around the
   * notification of a wrapped {@link ObserverMethod}.
   *
   * @param <T> the type of Kubernetes resource being observed
   */
  private static final class Notifier<T extends HasMetadata> implements EventConsumer<T> {

    private final PriorContext priorContext;

    private final KubernetesEventContext kubernetesEventContext;

    private final ObserverMethod<T> observerMethod;

    private Notifier(final PriorContext priorContext,
                     final KubernetesEventContext kubernetesEventContext,
                     final ObserverMethod<T> observerMethod) {
      super();
      this.priorContext = Objects.requireNonNull(priorContext);
      this.kubernetesEventContext = Objects.requireNonNull(kubernetesEventContext);
      this.observerMethod = Objects.requireNonNull(observerMethod);
    }

    @Override
    public final void accept(final EventContext<T> eventContext) {
      try {
        this.kubernetesEventContext.setActive(true);
        this.priorContext.activate(Objects.requireNonNull(eventContext).getEvent()); // thread-specific
        this.observerMethod.notify(eventContext);
      } finally {
        // Deactivate in reverse order, whatever happened above.
        this.priorContext.deactivate(); // thread-specific
        this.kubernetesEventContext.setActive(false);
      }
    }

  }

  /**
   * The scope annotation served by {@link PriorContext}.
   */
  @Retention(value = RetentionPolicy.RUNTIME)
  @Scope // deliberately NOT NormalScope
  @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD })
  private static @interface PriorScoped {

  }

}