001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
002 *
003 * Copyright © 2018 microBean.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
014 * implied.  See the License for the specific language governing
015 * permissions and limitations under the License.
016 */
017package org.microbean.kubernetes.controller.cdi;
018
019import java.io.Closeable;
020import java.io.IOException;
021
022import java.lang.annotation.Annotation;
023import java.lang.annotation.ElementType;
024import java.lang.annotation.Retention;
025import java.lang.annotation.RetentionPolicy;
026import java.lang.annotation.Target;
027
028import java.lang.reflect.ParameterizedType;
029import java.lang.reflect.Type;
030
031import java.time.Duration;
032
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.HashMap;
038import java.util.HashSet;
039import java.util.IdentityHashMap;
040import java.util.Iterator;
041import java.util.LinkedHashSet;
042import java.util.Map;
043import java.util.Objects;
044import java.util.Optional;
045import java.util.Set;
046
047import java.util.concurrent.CompletionStage;
048import java.util.concurrent.CountDownLatch;
049
050import java.util.function.Consumer;
051import java.util.function.Function;
052
053import java.util.logging.Level;
054import java.util.logging.Logger;
055
056import javax.annotation.Priority;
057
058import javax.enterprise.context.ApplicationScoped;
059import javax.enterprise.context.BeforeDestroyed;
060import javax.enterprise.context.ContextNotActiveException;
061import javax.enterprise.context.Initialized;
062
063import javax.enterprise.context.spi.AlterableContext;
064import javax.enterprise.context.spi.Contextual;
065import javax.enterprise.context.spi.CreationalContext;
066
067import javax.enterprise.event.NotificationOptions;
068import javax.enterprise.event.Observes;
069
070import javax.enterprise.inject.Default; // for javadoc only
071
072import javax.enterprise.inject.spi.AfterBeanDiscovery;
073import javax.enterprise.inject.spi.BeanAttributes;
074import javax.enterprise.inject.spi.Bean;
075import javax.enterprise.inject.spi.BeanManager;
076import javax.enterprise.inject.spi.CDI;
077import javax.enterprise.inject.spi.DeploymentException;
078import javax.enterprise.inject.spi.EventContext;
079import javax.enterprise.inject.spi.ObserverMethod;
080import javax.enterprise.inject.spi.ProcessBean;
081import javax.enterprise.inject.spi.ProcessManagedBean;
082import javax.enterprise.inject.spi.ProcessObserverMethod;
083import javax.enterprise.inject.spi.ProcessProducerField;
084import javax.enterprise.inject.spi.ProcessProducerMethod;
085import javax.enterprise.inject.spi.ProcessSyntheticBean;
086import javax.enterprise.inject.spi.ProcessSyntheticObserverMethod;
087
088import javax.enterprise.inject.spi.configurator.ObserverMethodConfigurator.EventConsumer;
089
090import javax.inject.Qualifier; // for javadoc only
091import javax.inject.Scope;
092
093import io.fabric8.kubernetes.api.model.ConfigMap; // for javadoc only
094import io.fabric8.kubernetes.api.model.HasMetadata;
095import io.fabric8.kubernetes.api.model.KubernetesResourceList;
096
097import io.fabric8.kubernetes.client.KubernetesClient; // for javadoc only
098import io.fabric8.kubernetes.client.Watcher;
099
100import io.fabric8.kubernetes.client.dsl.Listable;
101import io.fabric8.kubernetes.client.dsl.VersionWatchable;
102import io.fabric8.kubernetes.client.dsl.Operation; // for javadoc only
103
104import org.microbean.cdi.AbstractBlockingExtension;
105import org.microbean.cdi.Annotations;
106
107import org.microbean.configuration.api.Configurations;
108
109import org.microbean.development.annotation.Issue;
110
111import org.microbean.kubernetes.controller.AbstractEvent;
112import org.microbean.kubernetes.controller.Controller;
113import org.microbean.kubernetes.controller.EventDistributor;
114import org.microbean.kubernetes.controller.SynchronizationEvent;
115
116import org.microbean.kubernetes.controller.cdi.annotation.Added;
117import org.microbean.kubernetes.controller.cdi.annotation.Deleted;
118import org.microbean.kubernetes.controller.cdi.annotation.Modified;
119import org.microbean.kubernetes.controller.cdi.annotation.KubernetesEventSelector;
120import org.microbean.kubernetes.controller.cdi.annotation.Prior;
121
122import static javax.interceptor.Interceptor.Priority.LIBRARY_AFTER;
123import static javax.interceptor.Interceptor.Priority.LIBRARY_BEFORE;
124
125/**
126 * An {@link AbstractBlockingExtension} that distributes Kubernetes
127 * events to interested listeners asynchronously.
128 *
129 * <h1>Usage</h1>
130 * 
131 * <p>To use this extension, simply place it on your classpath (along
132 * with your selection from a menu of certain required runtime
133 * dependencies described below).  If you have a mechanism for
134 * describing the kinds of Kubernetes resources you'd like to watch
135 * for events, and if you have observer methods created to do
136 * something with those events, both of which are described below,
137 * then this extension will take care of connecting to the Kubernetes
138 * API server and listing and watching for new events for you.</p>
139 *
140 * <h2>Dependency Choices</h2>
141 *
142 * <p>This extension relies on the presence of certain CDI beans.  In
143 * some cases, those beans are not produced by this extension.  For
144 * maximum flexibility, this project does not mandate how certain
145 * beans are produced.  Below is a list of the beans that are
146 * required, and suggested&mdash;but not required&mdash;ways of
147 * producing them.</p>
148 *
149 * <h2>Maven</h2>
150 *
151 * <p>If you are using Maven, you may indicate that you want this
152 * extension to be included on your project's runtime classpath with
153 * the following dependency stanza:</p>
154 *
155 * <blockquote><pre>&lt;dependency&gt;
156 *  &lt;groupId&gt;org.microbean&lt;/groupId&gt;
157 *  &lt;artifactId&gt;microbean-kubernetes-controller-cdi&lt;/artifactId&gt;
158 *  &lt;version&gt;0.2.1&lt;/version&gt;
159 *  &lt;scope&gt;runtime&lt;/scope&gt;
160 *&lt;/dependency&gt;</pre></blockquote>
161 *
162 * <h3>{@link KubernetesClient} Bean</h3>
163 *
164 * <p>This extension indirectly requires that a {@link
165 * KubernetesClient} be available in the CDI container (qualified with
166 * {@link Default @Default}).  You can use the <a
167 * href="https://microbean.github.io/microbean-kubernetes-client-cdi/">microBean
168 * Kubernetes Client CDI</a> project for this, or you can arrange to
169 * fulfil this requirement yourself.</p>
170 *
171 * <p>If you are going to use the <a
172 * href="https://microbean.github.io/microbean-kubernetes-client-cdi/">microBean
173 * Kubernetes Client CDI</a> project to provide a {@link
174 * Default}-qualified {@link KubernetesClient}, you can indicate that
175 * you want it to be included on your project's runtime classpath with
176 * the following dependency stanza:</p>
177 *
178 * <blockquote><pre>&lt;dependency&gt;
179 *  &lt;groupId&gt;org.microbean&lt;/groupId&gt;
180 *  &lt;artifactId&gt;microbean-kubernetes-client-cdi&lt;/artifactId&gt;
181 *  &lt;version&gt;0.3.1&lt;/version&gt;
182 *  &lt;scope&gt;runtime&lt;/scope&gt;
183 *&lt;/dependency&gt;</pre></blockquote>
184 *
185 * <h3>Configuration Beans</h3>
186 * 
187 * <p>You'll need an implementation of the <a
188 * href="https://microbean.github.io/microbean-configuration-api/">microBean
189 * Configuration API</a>.  Usually, the <a
190 * href="https://microbean.github.io/microbean-configuration/">microBean
191 * Configuration</a> project is what you want.  You can indicate that
192 * you want it to be included on your project's runtime classpath with
193 * the following dependency stanza:</p>
194 *
195 * <blockquote><pre>&lt;dependency&gt;
196 *  &lt;groupId&gt;org.microbean&lt;/groupId&gt;
197 *  &lt;artifactId&gt;microbean-configuration&lt;/artifactId&gt;
198 *  &lt;version&gt;0.4.2&lt;/version&gt;
199 *  &lt;scope&gt;runtime&lt;/scope&gt;
200 *&lt;/dependency&gt;</pre></blockquote>
201 *
202 * <p>You'll need a means of getting that configuration implementation
203 * into CDI.  Usually, you would use the <a
204 * href="https://microbean.github.io/microbean-configuration-cdi/">microBean
205 * Configuration CDI</a> project.  You can indicate that you want it
206 * to be included on your project's runtime classpath with the
207 * following dependency stanza:</p>
208 *
209 * <blockquote><pre>&lt;dependency&gt;
210 *  &lt;groupId&gt;org.microbean&lt;/groupId&gt;
211 *  &lt;artifactId&gt;microbean-configuration-cdi&lt;/artifactId&gt;
212 *  &lt;version&gt;0.4.2&lt;/version&gt;
213 *  &lt;scope&gt;runtime&lt;/scope&gt;
214 *&lt;/dependency&gt;</pre></blockquote>
215 *
216 * <h2>Event Selectors</h2>
217 * 
218 * <p>To describe the kinds of Kubernetes resources you're interested
219 * in, you'll need one or more <em>event selectors</em> in your CDI
220 * application.  An event selector, for the purposes of this class, is
221 * a CDI bean (either a managed bean or a producer method, most
222 * commonly) with certain types in its {@linkplain Bean#getTypes() set
223 * of bean types}.  Specifically, the event selector type, {@code X},
224 * must conform to this specification:</p>
225 *
226 * <blockquote>{@code <X extends Listable<? extends
227 * KubernetesResourceList> & VersionWatchable<? extends Closeable,
228 * Watcher<? extends HasMetadata>>>}</blockquote>
229 *
230 * <p>Many return types of methods belonging to {@link
231 * KubernetesClient} conveniently conform to this specification.</p>
232 *
233 * <p>The event selector will also need to be annotated with an
234 * annotation that you write describing the sort of event selection it
235 * is.  This annotation does not need any elements, but must itself be
236 * annotated with the {@link
237 * KubernetesEventSelector @KubernetesEventSelector} annotation.  It
238 * must be applicable to {@linkplain ElementType#PARAMETER parameters}
239 * and your event selector beans, so if as is most common you are
240 * writing a producer method it must be applicable to {@linkplain
241 * ElementType#METHOD methods} as well.</p>
242 *
243 * <p>Here is an example producer method that will cause this
244 * extension to look for all ConfigMap events:</p>
245 *
246 * <blockquote><pre>&#64;Produces
247 *&#64;{@link ApplicationScoped}
248 *&#64;AllConfigMapEvents // see declaration below
249 *private static final {@link Operation}&lt;{@link ConfigMap}, ConfigMapList, DoneableConfigMap, Resource&lt;ConfigMap, DoneableConfigMap&gt;&gt; selectAllConfigMaps(final {@link KubernetesClient} client) {
250 *  return {@link KubernetesClient#configMaps() client.configMaps()};
 *}</pre></blockquote>
252 *
253 * <p>Note in particular that {@link Operation} implements both {@link
254 * Listable} and {@link VersionWatchable} with the proper type
255 * parameters.</p>
256 *
257 * <p>The {@code @AllConfigMapEvents} annotation is simply:</p>
258 *
259 * <blockquote><pre>&#64;Documented
260 *&#64;{@link KubernetesEventSelector}
261 *&#64;{@link Qualifier}
262 *&#64;Retention(value = RetentionPolicy.RUNTIME)
263 *&#64;Target({ ElementType.METHOD, ElementType.PARAMETER })
264 *public &#64;interface AllConfigMapEvents {
265 *
266 *}</pre></blockquote>
267 *
268 * <h2>Observer Methods</h2>
269 *
270 * <p>Observer methods are where your CDI application actually takes
271 * delivery of a Kubernetes resource as a CDI event.</p>
272 *
273 * <p>You will need a notional pair consisting of an event selector
274 * and an observer method that conforms to certain requirements that
275 * help "link" it to its associated event selector.  To realize this
276 * pair, you write a normal CDI observer method that adheres to the
277 * following additional requirements:</p>
278 *
279 * <ol>
280 *
281 * <li>Its <em>observed event type</em> is a concrete class that
282 * extends {@link HasMetadata} and is the same type with which the
283 * associated event selector is primarily concerned.  For example, if
284 * your event selector is primarily concerned with {@link ConfigMap}s,
285 * then your observer method's observed event type should also be
286 * {@link ConfigMap}.</li>
287 *
288 * <li>Its observed event type is qualified with the same annotation
289 * that (a) qualifies the event selector and (b) is, in turn,
290 * qualified with {@link
291 * KubernetesEventSelector @KubernetesEventSelector}.  For example, if
292 * {@code @AllConfigMapEvents} appears on your event selector producer
293 * method, then it should appear on your observer method's {@link
294 * ConfigMap} parameter that is annotated with {@link
295 * Observes @Observes}.</li>
296 *
297 * <li>Its observed event type is qualified with one of {@link
298 * Added @Added}, {@link Modified @Modified} or {@link
299 * Deleted @Deleted}.</li>
300 *
 * <li>If you need access to the prior state of the Kubernetes
 * resource your observer method is observing, you may add it as a
 * standard (injected) parameter in the method's parameter list, but
 * it must be (a) qualified with the {@link Prior @Prior} annotation
 * and (b) of type {@link Optional Optional&lt;T&gt;}, where {@code T}
 * is identical to the observed event type.  For example, if your
 * observed event type is {@link ConfigMap}, then your prior state
 * parameter must be of type {@code Optional<ConfigMap>} and must be
 * annotated with {@link Prior @Prior}.</li>
309 *
310 * </ol>
311 *
312 * <p>Building upon the prior example, here is an example of an
313 * observer method that is "paired" with the event selector above:</p>
314 *
315 * <blockquote><pre>private final void onConfigMapModification({@link Observes &#64;Observes} &#64;AllConfigMapEvents {@link Modified &#64;Modified} final {@link ConfigMap} configMap, {@link Prior &#64;Prior} final Optional&lt;{@link ConfigMap}&gt; prior) {
316 *  assert configMap != null;
317 *  // do something interesting with this modified {@link ConfigMap}
318 *}</pre></blockquote>
319 *
320 * @author <a href="https://about.me/lairdnelson"
321 * target="_parent">Laird Nelson</a>
322 *
323 * @see AbstractBlockingExtension
324 *
325 * @see Controller
326 *
327 * @see KubernetesEventSelector
328 *
329 * @see Added
330 *
331 * @see Modified
332 *
333 * @see Deleted
334 *
335 * @see Prior
336 */
337public class KubernetesControllerExtension extends AbstractBlockingExtension {
338
339
340  /*
341   * Instance fields.
342   */
343
344  
345  private final Collection<Controller<?>> controllers;
346  
347  private final Map<Set<Annotation>, Bean<?>> eventSelectorBeans;
348
349  private final Set<Bean<?>> beans;
350
351  private final Set<Class<? extends HasMetadata>> priorTypes;
352
353  private boolean asyncNeeded;
354
355  private boolean syncNeeded;
356  
357  private final PriorContext priorContext;
358
359  private final KubernetesEventContext kubernetesEventContext;
360
361  
362  /*
363   * Constructors.
364   */
365
366  
367  /**
368   * Creates a new {@link KubernetesControllerExtension}.
369   *
370   * @see #KubernetesControllerExtension(CountDownLatch)
371   */
372  public KubernetesControllerExtension() {
373    this(new CountDownLatch(1));
374  }
375
376  /**
377   * Creates a new {@link KubernetesControllerExtension}.
378   *
379   * <p>Most users should prefer the {@linkplain
380   * #KubernetesControllerExtension() zero-argument constructor}
381   * instead.</p>
382   *
383   * @param latch a {@link CountDownLatch} passed to the {@link
384   * AbstractBlockingExtension#AbstractBlockingExtension(CountDownLatch)}
385   * constructor; must not be {@code null}
386   *
387   * @see #KubernetesControllerExtension()
388   *
389   * @see
390   * AbstractBlockingExtension#AbstractBlockingExtension(CountDownLatch)
391   */
392  protected KubernetesControllerExtension(final CountDownLatch latch) {
393    super(latch);
394    if (this.logger == null) {
395      throw new IllegalStateException("createLogger() == null");
396    }
397    final String cn = this.getClass().getName();
398    final String mn = "<init>";
399    if (this.logger.isLoggable(Level.FINER)) {
400      this.logger.entering(cn, mn, latch);
401    }
402    
403    this.eventSelectorBeans = new HashMap<>();
404    this.beans = new HashSet<>();
405    this.priorTypes = new HashSet<>();
406    this.controllers = new ArrayList<>();
407    this.priorContext = new PriorContext();
408    this.kubernetesEventContext = new KubernetesEventContext();
409
410    if (this.logger.isLoggable(Level.FINER)) {
411      this.logger.exiting(cn, mn);
412    }
413  }
414
415
416  /*
417   * Instance methods.
418   */
419  
420
421  /**
422   * {@linkplain Observes Observes} the supplied {@link
423   * ProcessProducerMethod} event and calls the {@link
424   * #processPotentialEventSelectorBean(Bean, BeanManager)} method
425   * with the return value of the event's {@link
426   * ProcessProducerMethod#getBean()} method and the supplied {@link
427   * BeanManager}.
428   *
429   * @param <X> a type that is both {@link Listable} and {@link
430   * VersionWatchable}
431   *
432   * @param event the container lifecycle event being observed; may be
433   * {@code null} in which case no action will be performed
434   *
435   * @param beanManager the {@link BeanManager} for the current CDI
436   * container; may be {@code null}
437   *
438   * @see #processPotentialEventSelectorBean(Bean, BeanManager)
439   */
440  // Ideally, we could do this all in a ProcessBean observer method.
441  // See https://issues.jboss.org/browse/WELD-2461.
442  @SuppressWarnings("rawtypes")
443  private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processProducerMethod(@Observes final ProcessProducerMethod<X, ?> event,
444                                                                                                                                                                          final BeanManager beanManager) {
445    final String cn = this.getClass().getName();
446    final String mn = "processProducerMethod";
447    if (this.logger.isLoggable(Level.FINER)) {
448      this.logger.entering(cn, mn, new Object[] { event, beanManager });
449    }
450    
451    if (event != null) {
452      this.processPotentialEventSelectorBean(event.getBean(), beanManager);
453    }
454    
455    if (this.logger.isLoggable(Level.FINER)) {
456      this.logger.exiting(cn, mn);
457    }
458  }
459
460  /**
461   * {@linkplain Observes Observes} the supplied {@link
462   * ProcessProducerField} event and calls the {@link
463   * #processPotentialEventSelectorBean(Bean, BeanManager)} method
464   * with the return value of the event's {@link
465   * ProcessProducerField#getBean()} method and the supplied {@link
466   * BeanManager}.
467   *
468   * @param <X> a type that is both {@link Listable} and {@link
469   * VersionWatchable}
470   *
471   * @param event the container lifecycle event being observed; may be
472   * {@code null} in which case no action will be performed
473   *
474   * @param beanManager the {@link BeanManager} for the current CDI
475   * container; may be {@code null}
476   *
477   * @see #processPotentialEventSelectorBean(Bean, BeanManager)
478   */
479  // Ideally, we could do this all in a ProcessBean observer method.
480  // See https://issues.jboss.org/browse/WELD-2461.
481  @SuppressWarnings("rawtypes")
482  private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processProducerField(@Observes final ProcessProducerField<X, ?> event,
483                                                                                                                                                                         final BeanManager beanManager) {
484    final String cn = this.getClass().getName();
485    final String mn = "processProducerField";
486    if (this.logger.isLoggable(Level.FINER)) {
487      this.logger.entering(cn, mn, new Object[] { event, beanManager });
488    }
489    
490    if (event != null) {
491      this.processPotentialEventSelectorBean(event.getBean(), beanManager);
492    }
493    
494    if (this.logger.isLoggable(Level.FINER)) {
495      this.logger.exiting(cn, mn);
496    }
497  }
498
499  /**
500   * {@linkplain Observes Observes} the supplied {@link
501   * ProcessManagedBean} event and calls the {@link
502   * #processPotentialEventSelectorBean(Bean, BeanManager)} method
503   * with the return value of the event's {@link
504   * ProcessManagedBean#getBean()} method and the supplied {@link
505   * BeanManager}.
506   *
507   * @param <X> a type that is both {@link Listable} and {@link
508   * VersionWatchable}
509   *
510   * @param event the container lifecycle event being observed; may be
511   * {@code null} in which case no action will be performed
512   *
513   * @param beanManager the {@link BeanManager} for the current CDI
514   * container; may be {@code null}
515   *
516   * @see #processPotentialEventSelectorBean(Bean, BeanManager)
517   */
518  // Ideally, we could do this all in a ProcessBean observer method.
519  // See https://issues.jboss.org/browse/WELD-2461.
520  @SuppressWarnings("rawtypes")
521  private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processManagedBean(@Observes final ProcessManagedBean<X> event,
522                                                                                                                                                                       final BeanManager beanManager) {
523    final String cn = this.getClass().getName();
524    final String mn = "processManagedBean";
525    if (this.logger.isLoggable(Level.FINER)) {
526      this.logger.entering(cn, mn, new Object[] { event, beanManager });
527    }
528    
529    if (event != null) {
530      this.processPotentialEventSelectorBean(event.getBean(), beanManager);
531    }
532    
533    if (this.logger.isLoggable(Level.FINER)) {
534      this.logger.exiting(cn, mn);
535    }
536  }
537
538  /**
539   * {@linkplain Observes Observes} the supplied {@link
540   * ProcessSyntheticBean} event and calls the {@link
541   * #processPotentialEventSelectorBean(Bean, BeanManager)} method
542   * with the return value of the event's {@link
543   * ProcessSyntheticBean#getBean()} method and the supplied {@link
544   * BeanManager}.
545   *
546   * @param <X> a type that is both {@link Listable} and {@link
547   * VersionWatchable}
548   *
549   * @param event the container lifecycle event being observed; may be
550   * {@code null} in which case no action will be performed
551   *
552   * @param beanManager the {@link BeanManager} for the current CDI
553   * container; may be {@code null}
554   *
555   * @see #processPotentialEventSelectorBean(Bean, BeanManager)
556   */
557  @SuppressWarnings("rawtypes")
558  private final <X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<? extends HasMetadata>>> void processSyntheticBean(@Observes final ProcessSyntheticBean<X> event,
559                                                                                                                                                                         final BeanManager beanManager) {
560    final String cn = this.getClass().getName();
561    final String mn = "processSyntheticBean";
562    if (this.logger.isLoggable(Level.FINER)) {
563      this.logger.entering(cn, mn, new Object[] { event, beanManager });
564    }
565    
566    if (event != null) {
567      this.processPotentialEventSelectorBean(event.getBean(), beanManager);
568    }
569    
570    if (this.logger.isLoggable(Level.FINER)) {
571      this.logger.exiting(cn, mn);
572    }
573  }
574
575  private final <T extends HasMetadata> void validateScopeOfCacheBean(@Observes final ProcessBean<Map<Object, T>> event) {
576    final String cn = this.getClass().getName();
577    final String mn = "validateScopeOfCacheBean";
578    if (this.logger.isLoggable(Level.FINER)) {
579      this.logger.entering(cn, mn, new Object[] { event });
580    }
581    if (event != null) {
582      final Bean<?> bean = event.getBean();
583      if (bean != null && !ApplicationScoped.class.equals(bean.getScope()) && this.logger.isLoggable(Level.WARNING)) {
584        this.logger.logp(Level.WARNING, cn, mn, "{0} is not in application scope.", bean);
585      }
586    }
587    if (this.logger.isLoggable(Level.FINER)) {
588      this.logger.exiting(cn, mn);
589    }
590  }
591
592  private final <T extends HasMetadata> void validateScopeOfCacheBean(@Observes final ProcessProducerField<Map<Object, T>, ?> event) {
593    final String cn = this.getClass().getName();
594    final String mn = "validateScopeOfCacheBean";
595    if (this.logger.isLoggable(Level.FINER)) {
596      this.logger.entering(cn, mn, new Object[] { event });
597    }
598    if (event != null) {
599      final Bean<?> bean = event.getBean();
600      if (bean != null && !ApplicationScoped.class.equals(bean.getScope()) && this.logger.isLoggable(Level.WARNING)) {
601        this.logger.logp(Level.WARNING, cn, mn, "{0} is not in application scope.", bean);
602      }
603    }
604    if (this.logger.isLoggable(Level.FINER)) {
605      this.logger.exiting(cn, mn);
606    }
607  }
608
609  private final <T extends HasMetadata> void validateScopeOfCacheBean(@Observes final ProcessProducerMethod<Map<Object, T>, ?> event) {
610    final String cn = this.getClass().getName();
611    final String mn = "validateScopeOfCacheBean";
612    if (this.logger.isLoggable(Level.FINER)) {
613      this.logger.entering(cn, mn, new Object[] { event });
614    }
615    if (event != null) {
616      final Bean<?> bean = event.getBean();
617      if (bean != null && !ApplicationScoped.class.equals(bean.getScope()) && this.logger.isLoggable(Level.WARNING)) {
618        this.logger.logp(Level.WARNING, cn, mn, "{0} is not in application scope.", bean);
619      }
620    }
621    if (this.logger.isLoggable(Level.FINER)) {
622      this.logger.exiting(cn, mn);
623    }
624  }
625  
626  /**
   * {@linkplain Observes Observes} the supplied {@link
   * ProcessObserverMethod} event and calls the {@link
   * #processPotentialEventSelectorObserverMethod(ProcessObserverMethod,
   * BeanManager)} method with the event itself and the supplied
   * {@link BeanManager}.
633   *
634   * @param <X> a type that extends {@link HasMetadata} and therefore
635   * represents a persistent Kubernetes resource
636   *
637   * @param event the container lifecycle event being observed; may be
638   * {@code null} in which case no action will be performed
639   *
640   * @param beanManager the {@link BeanManager} for the current CDI
641   * container; may be {@code null}
642   *
643   * @see
644   * #processPotentialEventSelectorObserverMethod(ProcessObserverMethod,
645   * BeanManager)
646   */
647  // Observer method processors are guaranteed by the specification to
648  // be invoked after ProcessBean events.
649  private final <X extends HasMetadata> void processObserverMethod(@Observes final ProcessObserverMethod<X, ?> event,
650                                                                   final BeanManager beanManager) {
651    final String cn = this.getClass().getName();
652    final String mn = "processObserverMethod";
653    if (this.logger.isLoggable(Level.FINER)) {
654      this.logger.entering(cn, mn, new Object[] { event, beanManager });
655    }
656    
657    if (event != null) {
658      this.processPotentialEventSelectorObserverMethod(event, beanManager);
659    }
660
661    if (this.logger.isLoggable(Level.FINER)) {
662      this.logger.exiting(cn, mn);
663    }
664  }
665
666  /**
   * {@linkplain Observes Observes} the supplied {@link
   * ProcessSyntheticObserverMethod} event and calls the {@link
   * #processPotentialEventSelectorObserverMethod(ProcessObserverMethod,
   * BeanManager)} method with the event itself and the supplied
   * {@link BeanManager}.
673   *
674   * @param <X> a type that extends {@link HasMetadata} and therefore
675   * represents a persistent Kubernetes resource
676   *
677   * @param event the container lifecycle event being observed; may be
678   * {@code null} in which case no action will be performed
679   *
680   * @param beanManager the {@link BeanManager} for the current CDI
681   * container; may be {@code null}
682   *
683   * @see
684   * #processPotentialEventSelectorObserverMethod(ProcessObserverMethod,
685   * BeanManager)
686   */
687  // Observer method processors are guaranteed by the specification to
688  // be invoked after ProcessBean events.
689  private final <X extends HasMetadata> void processSyntheticObserverMethod(@Observes final ProcessSyntheticObserverMethod<X, ?> event,
690                                                                            final BeanManager beanManager) {
691    final String cn = this.getClass().getName();
692    final String mn = "processSyntheticObserverMethod";
693    if (this.logger.isLoggable(Level.FINER)) {
694      this.logger.entering(cn, mn, new Object[] { event, beanManager });
695    }
696    
697    if (event != null) {
698      this.processPotentialEventSelectorObserverMethod(event, beanManager);
699    }
700
701    if (this.logger.isLoggable(Level.FINER)) {
702      this.logger.exiting(cn, mn);
703    }
704  }
705
706  /**
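   * {@linkplain Observes Observes} the supplied {@link
   * AfterBeanDiscovery} event and, since all bean discovery is done,
   * registers this extension's {@code PriorContext} and {@code
   * KubernetesEventContext} with the container, clears out the
   * contents of the {@link #eventSelectorBeans} field, and adds one
   * {@link Prior @Prior}-qualified, {@code PriorScoped}-scoped {@link
   * Optional} bean for each type recorded in the {@link #priorTypes}
   * field before clearing that field as well.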
710   *
711   * @param event the container lifecycle event being observed; may be
712   * {@code null} in which case no action will be performed
713   *
714   * @see #eventSelectorBeans
715   */
716  private final void processAfterBeanDiscovery(@Observes final AfterBeanDiscovery event) {
717    final String cn = this.getClass().getName();
718    final String mn = "processAfterBeanDiscovery";
719    if (this.logger.isLoggable(Level.FINER)) {
720      this.logger.entering(cn, mn, event);
721    }
722
723    if (event != null) {
724      event.addContext(this.priorContext);
725      event.addContext(this.kubernetesEventContext);
726      
727      this.eventSelectorBeans.clear();
728      // TODO: consider: we have the ability to create Controller
729      // beans here out of other bean raw materials
730      // (e.g. appropriately-qualified knownObjects etc.).
731
732      synchronized (this.priorTypes) {
733        if (!this.priorTypes.isEmpty()) {
734          for (final Type priorType : this.priorTypes) {
735            assert priorType != null;
736
737            event.addBean()
738              // This Bean is never created via this (required by CDI)
739              // callback; it is always supplied by
740              // PriorContext#get(Bean), so the container will think
741              // that it is eternal.
742              .createWith(cc -> { throw new UnsupportedOperationException(); })
743              .qualifiers(Prior.Literal.INSTANCE)
744              .scope(PriorScoped.class)
745              .types(new ParameterizedTypeImpl(null, Optional.class, new Type[] { priorType }));
746            
747          }
748          this.priorTypes.clear();
749        }
750      }
751      
752    }
753
754    if (this.logger.isLoggable(Level.FINER)) {
755      this.logger.exiting(cn, mn);
756    }
757  }
758
759  /**
760   * {@linkplain Observes Observes} the {@linkplain Initialized
761   * initialization} of the {@linkplain ApplicationScoped application
762   * scope} at {@link
763   * javax.interceptor.Interceptor.Priority#LIBRARY_AFTER
764   * LIBRARY_AFTER} {@linkplain Priority priority} and, now that this
765   * extension is in a position to know all event observers that might
766   * be interested in Kubernetes events, arranges to funnel such
767   * events through a resilient pipeline into their hands
768   * asynchronously.
769   *
770   * @param <T> a type that extends {@link HasMetadata}; e.g. a
771   * Kubernetes resource type
772   *
773   * @param <X> a type that is both a {@link Listable} and a {@link
774   * VersionWatchable}
775   *
776   * @param ignored the event itself; ignored by this implementation;
777   * may be {@code null}
778   *
779   * @param beanManager the {@link BeanManager} in effect for this CDI
780   * container; may be {@code null} in which case no action will be
781   * taken
782   *
783   * @see #stopControllers(Object)
784   */
785  @SuppressWarnings("rawtypes")
786  private final <T extends HasMetadata,
787                 X extends Listable<? extends KubernetesResourceList> & VersionWatchable<? extends Closeable, Watcher<T>>>
788                void startControllers(@Observes
789                                      @Initialized(ApplicationScoped.class)
790                                      @Priority(LIBRARY_AFTER)
791                                      final Object ignored,
792                                      final BeanManager beanManager) { 
793    final String cn = this.getClass().getName();
794    final String mn = "startControllers";
795    if (this.logger.isLoggable(Level.FINER)) {
796      this.logger.entering(cn, mn, new Object[] { ignored, beanManager });
797    }
798
799    if (beanManager != null && !this.beans.isEmpty()) {
800
801      // Use the microbean-configuration-cdi library to abstract away
802      // configuration details.  But we can't just put a
803      // Configurations object in our incoming method parameters,
804      // because according to the specification that will result in
805      // non-portable behavior.  So we look it up "by hand".
806      final Bean<?> configurationsBean = beanManager.resolve(beanManager.getBeans(Configurations.class));
807      assert configurationsBean != null;
808      final Configurations configurations =
809        (Configurations)beanManager.getReference(configurationsBean,
810                                                 Configurations.class,
811                                                 beanManager.createCreationalContext(configurationsBean));
812      assert configurations != null;      
813
814      final Duration synchronizationInterval = configurations.getValue("synchronizationInterval", Duration.class);
815
816      for (final Bean<?> bean : this.beans) {
817        assert bean != null;
818        
819        final Set<Annotation> qualifiers = bean.getQualifiers();
820        final Annotation[] qualifiersArray;
821        if (qualifiers == null) {
822          qualifiersArray = null;
823        } else {
824          qualifiersArray = qualifiers.toArray(new Annotation[qualifiers.size()]);
825        }
826
827        @Issue(id = "6", uri = "https://github.com/microbean/microbean-kubernetes-controller-cdi/issues/6")
828        final Type cacheType = new ParameterizedTypeImpl(Map.class, new Type[] { Object.class, extractConcreteKubernetesResourceClass(bean) });
829
830        final Map<Object, T> cache;
831        final Set<Bean<?>> cacheBeans = beanManager.getBeans(cacheType, qualifiersArray);
832        if (cacheBeans == null || cacheBeans.isEmpty()) {
833          cache = null;
834        } else {
835          final Bean<?> cacheBean = beanManager.resolve(cacheBeans);
836          if (cacheBean == null) {
837            cache = null;
838          } else {
839            @SuppressWarnings("unchecked")
840            final Map<Object, T> temp =
841              (Map<Object, T>)beanManager.getReference(cacheBean,
842                                                       cacheType,
843                                                       beanManager.createCreationalContext(cacheBean));
844            cache = temp;
845          }
846        }
847        if (cache == null && this.logger.isLoggable(Level.INFO)) {
848          this.logger.logp(Level.INFO, cn, mn,
849                           "No Kubernetes resource cache found for qualifiers: {0}",
850                           qualifiers);
851        }
852        
853        final NotificationOptions notificationOptions;
854        final Bean<?> notificationOptionsBean =
855          beanManager.resolve(beanManager.getBeans(NotificationOptions.class, qualifiersArray));
856        if (notificationOptionsBean == null) {
857          notificationOptions = null;
858        } else {
859          notificationOptions =
860            (NotificationOptions)beanManager.getReference(notificationOptionsBean,
861                                                          NotificationOptions.class,
862                                                          beanManager.createCreationalContext(notificationOptionsBean));
863        }
864        
865        @SuppressWarnings("unchecked")
866        final X contextualReference =
867          (X)beanManager.getReference(bean,
868                                      getListableVersionWatchableType(bean),
869                                      beanManager.createCreationalContext(bean));
870
871        final Controller<T> controller =
872          new CDIController<>(contextualReference,
873                              synchronizationInterval,
874                              cache,
875                              new CDIEventDistributor<>(this.priorContext,
876                                                        this.kubernetesEventContext,
877                                                        qualifiers,
878                                                        notificationOptions,
879                                                        this.syncNeeded,
880                                                        this.asyncNeeded),
881                              t -> {
882                                if (this.logger.isLoggable(Level.SEVERE)) {
883                                  this.logger.logp(Level.SEVERE, cn, mn, t.getMessage(), t);
884                                }
885                                return true;
886                              });
887
888        if (this.logger.isLoggable(Level.INFO)) {
889          this.logger.logp(Level.INFO, cn, mn, "Starting {0}", controller);
890        }
891        try {
892          controller.start();
893        } catch (final IOException ioException) {
894          throw new DeploymentException(ioException.getMessage(), ioException);
895        }
896        
897        synchronized (this.controllers) {
898          this.controllers.add(controller);
899        }
900      }
901      
902    }
903    
904    if (this.logger.isLoggable(Level.FINER)) {
905      this.logger.exiting(cn, mn);
906    }
907  }
908
909  /**
910   * {@linkplain Observes Observes} the {@linkplain BeforeDestroyed
911   * imminent destruction} of the {@linkplain ApplicationScoped
912   * application scope} at {@link
913   * javax.interceptor.Interceptor.Priority#LIBRARY_BEFORE
914   * LIBRARY_BEFORE} {@linkplain Priority priority} and stops any
915   * {@linkplain #controllers <tt>Controller</tt> instances that were
916   * started} by {@linkplain Controller#close() closing} them.
917   *
918   * @param event the actual {@link BeforeDestroyed} event; ignored;
919   * may be {@code null}
920   *
921   * @exception IOException if {@link Controller#close()} throws an
922   * {@link IOException}
923   *
924   * @see #startControllers(Object, BeanManager)
925   */
926  private final void stopControllers(@Observes
927                                     @BeforeDestroyed(ApplicationScoped.class)
928                                     @Priority(LIBRARY_BEFORE)
929                                     final Object event)
930    throws IOException {
931    final String cn = this.getClass().getName();
932    final String mn = "stopControllers";
933    if (this.logger.isLoggable(Level.FINER)) {
934      this.logger.entering(cn, mn, event);
935    }
936
937    Exception exception = null;
938    synchronized (this.controllers) {
939      for (final Controller<?> controller : this.controllers) {
940        assert controller != null;
941        try {
942          controller.close();
943        } catch (final IOException | RuntimeException closeException) {
944          if (exception == null) {
945            exception = closeException;
946          } else {
947            exception.addSuppressed(closeException);
948          }
949        }
950      }
951    }
952
953    if (exception instanceof IOException) {
954      throw (IOException)exception;
955    } else if (exception instanceof RuntimeException) {
956      throw (RuntimeException)exception;
957    } else if (exception != null) {
958      throw new IllegalStateException(exception.getMessage(), exception);
959    }
960    
961    if (this.logger.isLoggable(Level.FINER)) {
962      this.logger.exiting(cn, mn);
963    }
964  }
965
966
967  /*
968   * Non-observer methods.
969   */
970
971
972  /**
973   * Given a {@link Bean}, checks to see if it is annotated with at
974   * least one annotation that is, in turn, annotated with {@link
975   * KubernetesEventSelector}, and, if so, adds it to a list of
976   * candidate sources of objects that are both {@link Listable} and
977   * {@link VersionWatchable}.
978   *
979   * @param bean the {@link Bean} to inspect; may be {@code null} in
980   * which case no action will be taken
981   *
982   * @param beanManager the {@link BeanManager} in effect for the
983   * current CDI container; may be {@code null}
984   *
985   * @see Annotations#retainAnnotationsQualifiedWith(Collection,
986   * Class, BeanManager)
987   *
988   * @see KubernetesEventSelector
989   */
990  private final void processPotentialEventSelectorBean(final Bean<?> bean, final BeanManager beanManager) {
991    final String cn = this.getClass().getName();
992    final String mn = "processPotentialEventSelectorBean";
993    if (this.logger.isLoggable(Level.FINER)) {
994      this.logger.entering(cn, mn, new Object[] { bean, beanManager });
995    }
996
997    if (bean != null) {
998      final Type listableVersionWatchableType = getListableVersionWatchableType(bean);
999      if (listableVersionWatchableType != null) {
1000        final Set<Annotation> kubernetesEventSelectors = Annotations.retainAnnotationsQualifiedWith(bean.getQualifiers(), KubernetesEventSelector.class, beanManager);
1001        if (kubernetesEventSelectors != null && !kubernetesEventSelectors.isEmpty()) {
1002          synchronized (this.eventSelectorBeans) {
1003            this.eventSelectorBeans.put(kubernetesEventSelectors, bean);
1004          }
1005        }
1006      }
1007    }
1008
1009    if (this.logger.isLoggable(Level.FINER)) {
1010      this.logger.exiting(cn, mn);
1011    }
1012  }
1013
1014  /**
1015   * Given an {@link ObserverMethod}, checks to see if its event
1016   * parameter is annotated with at least one annotation that is, in
1017   * turn, annotated with {@link KubernetesEventSelector}, and, if so,
1018   * makes sure that any {@link Bean}s whose {@linkplain
1019   * Bean#getQualifiers() qualifiers} line up are retained by this
1020   * extension as sources of {@link Listable} and {@link
1021   * VersionWatchable} instances.
1022   *
1023   * @param <X> a type that extends {@link HasMetadata} and therefore
1024   * represents a persistent Kubernetes resource
1025   *
1026   * @param event the {@link ProcessObserverMethod} event to inspect;
1027   * may be {@code null} in which case no action will be taken
1028   *
1029   * @param beanManager the {@link BeanManager} in effect for the
1030   * current CDI container; may be {@code null}
1031   *
1032   * @see Annotations#retainAnnotationsQualifiedWith(Collection,
1033   * Class, BeanManager)
1034   *
1035   * @see KubernetesEventSelector
1036   */
1037  private final <X extends HasMetadata> void processPotentialEventSelectorObserverMethod(final ProcessObserverMethod<X, ?> event, final BeanManager beanManager) {
1038    final String cn = this.getClass().getName();
1039    final String mn = "processPotentialEventSelectorObserverMethod";
1040    if (this.logger.isLoggable(Level.FINER)) {
1041      this.logger.entering(cn, mn, new Object[] { event, beanManager });
1042    }
1043
1044    if (event != null) {
1045      final ObserverMethod<X> observerMethod = event.getObserverMethod();
1046      if (observerMethod != null) {
1047        final Set<Annotation> kubernetesEventSelectors = Annotations.retainAnnotationsQualifiedWith(observerMethod.getObservedQualifiers(), KubernetesEventSelector.class, beanManager);
1048        if (kubernetesEventSelectors != null && !kubernetesEventSelectors.isEmpty()) {
1049          event.configureObserverMethod()
1050            .notifyWith(new Notifier<>(this.priorContext, this.kubernetesEventContext, observerMethod));
1051          if (observerMethod.isAsync()) {
1052            if (!this.asyncNeeded) {
1053              this.asyncNeeded = true;
1054            }
1055          } else if (!this.syncNeeded) {
1056            this.syncNeeded = true;
1057          }
1058          final Bean<?> bean;
1059          synchronized (this.eventSelectorBeans) {
1060            bean = this.eventSelectorBeans.remove(kubernetesEventSelectors);
1061          }
1062          if (bean != null) {
1063            boolean added;
1064            synchronized (this.beans) {            
1065              added = this.beans.add(bean);
1066            }
1067            if (added) {
1068              final Class<? extends HasMetadata> concreteKubernetesResourceClass = extractConcreteKubernetesResourceClass(bean);
1069              assert concreteKubernetesResourceClass != null;
1070              synchronized (this.priorTypes) {
1071                this.priorTypes.add(concreteKubernetesResourceClass);
1072              }
1073            }
1074          }
1075        }
1076      }
1077    }
1078
1079    if (this.logger.isLoggable(Level.FINER)) {
1080      this.logger.exiting(cn, mn);
1081    }
1082  }
1083  
1084
1085  /*
1086   * Static methods.
1087   */
1088
1089  
1090  /**
1091   * A bit of a hack to return the {@link Type} that is the "right
1092   * kind" of {@link Listable} and {@link VersionWatchable}
1093   * implementation from the supplied {@link Bean}'s {@linkplain
1094   * Bean#getTypes() types}, if it is present among them, and to
1095   * return {@code null} otherwise.
1096   *
1097   * <p>This method may return {@code null}.</p>
1098   *
1099   * <p>{@link Operation Operation.class} is the most general
1100   * interface that implements both {@link Listable} and {@link
1101   * VersionWatchable}, which is a common constraint for {@link
1102   * Controller} operations, and is often what this method returns in
1103   * {@link ParameterizedType} form.
1104   *
1105   * @param bean the {@link Bean} to inspect; may be {@code null} in
1106   * which case {@code null} will be returned
1107   *
1108   * @return a {@link Type} that is both {@link Listable} and {@link
1109   * VersionWatchable}, or {@code null}
1110   *
1111   * @see Operation
1112   *
1113   * @see Listable
1114   *
1115   * @see VersionWatchable
1116   */
1117  private static final Type getListableVersionWatchableType(final Bean<?> bean) {
1118    final String cn = KubernetesControllerExtension.class.getName();
1119    final Logger logger = Logger.getLogger(cn);
1120    assert logger != null;
1121    final String mn = "getListableVersionWatchableType";
1122    if (logger.isLoggable(Level.FINER)) {
1123      logger.entering(cn, mn, bean);
1124    }
1125    
1126    final Type returnValue;
1127    if (bean == null) {
1128      returnValue = null;
1129    } else {
1130      returnValue = getListableVersionWatchableType(bean.getTypes());
1131    }
1132    
1133    if (logger.isLoggable(Level.FINER)) {
1134      logger.exiting(cn, mn, returnValue);
1135    }
1136    return returnValue;
1137  }
1138
1139  private static final Type getListableVersionWatchableType(final Collection<? extends Type> beanTypes) {
1140    final String cn = KubernetesControllerExtension.class.getName();
1141    final Logger logger = Logger.getLogger(cn);
1142    assert logger != null;
1143    final String mn = "getListableVersionWatchableType";
1144    if (logger.isLoggable(Level.FINER)) {
1145      logger.entering(cn, mn, beanTypes);
1146    }
1147
1148    final Type returnValue;
1149    if (beanTypes == null || beanTypes.isEmpty()) {
1150      returnValue = null;
1151    } else {
1152      Type candidate = null;
1153      for (final Type beanType : beanTypes) {
1154        if (beanType instanceof ParameterizedType) {
1155          candidate = getListableVersionWatchableType((ParameterizedType)beanType);
1156          if (candidate != null) {
1157            break;
1158          }
1159        }
1160      }
1161      returnValue = candidate;
1162    }
1163
1164    if (logger.isLoggable(Level.FINER)) {
1165      logger.exiting(cn, mn, returnValue);
1166    }
1167    return returnValue;
1168  }
1169
1170  private static final Type getListableVersionWatchableType(final ParameterizedType type) {
1171    final String cn = KubernetesControllerExtension.class.getName();
1172    final Logger logger = Logger.getLogger(cn);
1173    assert logger != null;
1174    final String mn = "getListableVersionWatchableType";
1175    if (logger.isLoggable(Level.FINER)) {
1176      logger.entering(cn, mn, type);
1177    }
1178
1179    Type candidate = null;
1180    final Type rawType = type.getRawType();
1181    if (rawType instanceof Class) {
1182      // This should always be the case; see e.g. https://stackoverflow.com/a/5767681/208288
1183      final Class<?> rawClass = (Class<?>)rawType;
1184      if (Listable.class.isAssignableFrom(rawClass) && VersionWatchable.class.isAssignableFrom(rawClass)) {
1185        // TODO: check type parameters
1186        candidate = type;
1187      }
1188    }
1189    
1190    final Type returnValue = candidate;
1191
1192    if (logger.isLoggable(Level.FINER)) {
1193      logger.exiting(cn, mn, returnValue);
1194    }
1195    return returnValue;
1196  }
1197
1198  private static final Class<? extends HasMetadata> extractConcreteKubernetesResourceClass(final BeanAttributes<?> beanAttributes) {
1199    Class<? extends HasMetadata> returnValue = null;
1200    if (beanAttributes != null) {
1201      returnValue = extractConcreteKubernetesResourceClass(beanAttributes.getTypes());
1202    }
1203    return returnValue;
1204  }
1205
1206  private static final Class<? extends HasMetadata> extractConcreteKubernetesResourceClass(final Set<? extends Type> types) {
1207    Class<? extends HasMetadata> returnValue = null;
1208    if (types != null && !types.isEmpty()) {
1209      final Set<Type> typesToProcess = new LinkedHashSet<>(types);
1210      while (!typesToProcess.isEmpty()) {
1211        final Iterator<Type> iterator = typesToProcess.iterator();
1212        assert iterator != null;
1213        assert iterator.hasNext();
1214        final Type type = iterator.next();
1215        iterator.remove();
1216        if (type != null) {
1217          if (type instanceof Class<?>) {
1218            final Class<?> concreteClass = (Class<?>)type;
1219            if (HasMetadata.class.isAssignableFrom(concreteClass)) {
1220              @SuppressWarnings("unchecked")
1221              final Class<? extends HasMetadata> temp = (Class<? extends HasMetadata>)concreteClass;
1222              returnValue = temp;
1223              break;
1224            }
1225          } else if (type instanceof ParameterizedType) {
1226            final ParameterizedType pType = (ParameterizedType)type;
1227            final Type[] actualTypeArguments = pType.getActualTypeArguments();
1228            if (actualTypeArguments != null && actualTypeArguments.length > 0) {
1229              for (final Type actualTypeArgument : actualTypeArguments) {
1230                if (actualTypeArgument != null) {
1231                  typesToProcess.add(actualTypeArgument);
1232                }
1233              }
1234            }
1235          }
1236        }
1237      }
1238    }
1239    return returnValue;
1240  }
1241
1242
1243  /*
1244   * Inner and nested classes.
1245   */
1246
1247  
1248  private static final class ParameterizedTypeImpl implements ParameterizedType {
1249
1250    private final Type ownerType;
1251
1252    private final Type rawType;
1253
1254    private final Type[] actualTypeArguments;
1255
1256    private final int hashCode;
1257
1258    private ParameterizedTypeImpl(final Class<?> rawType, final Type[] actualTypeArguments) {
1259      this(null, rawType, actualTypeArguments);
1260    }
1261    
1262    private ParameterizedTypeImpl(final Type ownerType, final Class<?> rawType, final Type[] actualTypeArguments) {
1263      super();
1264      this.ownerType = ownerType;
1265      this.rawType = Objects.requireNonNull(rawType);
1266      this.actualTypeArguments = actualTypeArguments;
1267      this.hashCode = this.computeHashCode();
1268    }
1269    
1270    @Override
1271    public final Type getOwnerType() {
1272      return this.ownerType;
1273    }
1274    
1275    @Override
1276    public final Type getRawType() {
1277      return this.rawType;
1278    }
1279    
1280    @Override
1281    public final Type[] getActualTypeArguments() {
1282      return this.actualTypeArguments;
1283    }
1284    
1285    @Override
1286    public final int hashCode() {
1287      return this.hashCode;
1288    }
1289
1290    private final int computeHashCode() {
1291      int hashCode = 17;
1292      
1293      final Object ownerType = this.getOwnerType();
1294      int c = ownerType == null ? 0 : ownerType.hashCode();
1295      hashCode = 37 * hashCode + c;
1296      
1297      final Object rawType = this.getRawType();
1298      c = rawType == null ? 0 : rawType.hashCode();
1299      hashCode = 37 * hashCode + c;
1300      
1301      final Type[] actualTypeArguments = this.getActualTypeArguments();
1302      c = Arrays.hashCode(actualTypeArguments);
1303      hashCode = 37 * hashCode + c;
1304      
1305      return hashCode;
1306    }
1307    
1308    @Override
1309    public final boolean equals(final Object other) {
1310      if (other == this) {
1311        return true;
1312      } else if (other instanceof ParameterizedType) {
1313        final ParameterizedType her = (ParameterizedType)other;
1314        
1315        final Object ownerType = this.getOwnerType();
1316        if (ownerType == null) {
1317          if (her.getOwnerType() != null) {
1318            return false;
1319          }
1320        } else if (!ownerType.equals(her.getOwnerType())) {
1321          return false;
1322        }
1323        
1324        final Object rawType = this.getRawType();
1325        if (rawType == null) {
1326          if (her.getRawType() != null) {
1327            return false;
1328          }
1329        } else if (!rawType.equals(her.getRawType())) {
1330          return false;
1331        }
1332        
1333        final Type[] actualTypeArguments = this.getActualTypeArguments();
1334        if (!Arrays.equals(actualTypeArguments, her.getActualTypeArguments())) {
1335          return false;
1336        }
1337        
1338        return true;
1339      } else {
1340        return false;
1341      }
1342    }
1343
1344  }
1345
1346  private static final class CDIController<T extends HasMetadata> extends Controller<T> {
1347
1348    private final EventDistributor<T> eventDistributor;
1349
1350    private final boolean close;
1351
1352    // This @SuppressWarnings("rawtypes") is here because the
1353    // kubernetes-model project uses raw types throughout.  This class
1354    // does not.
1355    @SuppressWarnings("rawtypes")
1356    private
1357    <X extends Listable<? extends KubernetesResourceList>
1358               & VersionWatchable<? extends Closeable, Watcher<T>>>
1359    CDIController(final X operation,
1360                  final Duration synchronizationInterval,
1361                  final Map<Object, T> knownObjects,
1362                  final CDIEventDistributor<T> eventDistributor,
1363                  final Function<? super Throwable, Boolean> errorHandler) {
1364      this(operation, synchronizationInterval, errorHandler, knownObjects, new EventDistributor<>(knownObjects, synchronizationInterval), true);
1365      assert this.eventDistributor != null;
1366      if (eventDistributor != null) {
1367        this.eventDistributor.addConsumer(eventDistributor, errorHandler);
1368      }      
1369    }
1370
1371    // This @SuppressWarnings("rawtypes") is here because the
1372    // kubernetes-model project uses raw types throughout.  This class
1373    // does not.
1374    @SuppressWarnings("rawtypes")
1375    private
1376    <X extends Listable<? extends KubernetesResourceList>
1377               & VersionWatchable<? extends Closeable, Watcher<T>>>
1378    CDIController(final X operation,
1379                  final Duration synchronizationInterval,
1380                  final Function<? super Throwable, Boolean> errorHandler,
1381                  final Map<Object, T> knownObjects,
1382                  final EventDistributor<T> siphon,
1383                  final boolean close) {
1384      super(operation, null, synchronizationInterval, errorHandler, knownObjects, siphon);
1385      this.eventDistributor = Objects.requireNonNull(siphon);
1386      this.close = close;
1387    }
1388
1389    @Override
1390    protected final boolean shouldSynchronize() {
1391      return this.eventDistributor.shouldSynchronize();
1392    }
1393
1394    @Override
1395    protected final void onClose() {
1396      if (this.close) {
1397        this.eventDistributor.close();
1398      }
1399    }
1400    
1401  }
1402  
1403  private static final class CDIEventDistributor<T extends HasMetadata> implements Consumer<AbstractEvent<? extends T>> {
1404
1405    private static final Annotation[] EMPTY_ANNOTATION_ARRAY = new Annotation[0];
1406
1407    private final PriorContext priorContext;
1408
1409    private final KubernetesEventContext kubernetesEventContext;
1410    
1411    private final Annotation[] qualifiers;
1412
1413    private final NotificationOptions notificationOptions;
1414
1415    private final boolean syncNeeded;
1416
1417    private final boolean asyncNeeded;
1418
1419    private final Logger logger;
1420    
1421    private CDIEventDistributor(final PriorContext priorContext,
1422                                final KubernetesEventContext kubernetesEventContext,
1423                                final Set<Annotation> qualifiers,
1424                                final NotificationOptions notificationOptions,
1425                                final boolean syncNeeded,
1426                                final boolean asyncNeeded) {
1427      super();
1428      final String cn = this.getClass().getName();      
1429      this.logger = Logger.getLogger(cn);
1430      assert this.logger != null;
1431      final String mn = "<init>";
1432      if (this.logger.isLoggable(Level.FINER)) {
1433        this.logger.entering(cn, mn,
1434                             new Object[] { priorContext,
1435                                            kubernetesEventContext,
1436                                            qualifiers,
1437                                            notificationOptions,
1438                                            Boolean.valueOf(syncNeeded),
1439                                            Boolean.valueOf(asyncNeeded)
1440                             });
1441      }
1442
1443      this.priorContext = Objects.requireNonNull(priorContext);
1444      this.kubernetesEventContext = Objects.requireNonNull(kubernetesEventContext);
1445      if (qualifiers == null) {
1446        this.qualifiers = EMPTY_ANNOTATION_ARRAY;
1447      } else {
1448        this.qualifiers = qualifiers.toArray(new Annotation[qualifiers.size()]);
1449      }
1450      this.notificationOptions = notificationOptions;
1451      this.syncNeeded = syncNeeded;
1452      this.asyncNeeded = asyncNeeded;
1453
1454      if (this.logger.isLoggable(Level.FINER)) {
1455        this.logger.exiting(cn, mn);
1456      }
1457    }
1458
1459    @Override
1460    public final void accept(final AbstractEvent<? extends T> controllerEvent) {
1461      final String cn = this.getClass().getName();
1462      final String mn = "accept";
1463      if (this.logger.isLoggable(Level.FINER)) {
1464        this.logger.entering(cn, mn, controllerEvent);
1465      }
1466
1467      if (controllerEvent != null && (this.syncNeeded || this.asyncNeeded)) {
1468
1469        final BeanManager beanManager = CDI.current().getBeanManager();
1470        assert beanManager != null;
1471
1472        final javax.enterprise.event.Event<Object> cdiEventMachinery = beanManager.getEvent();
1473        assert cdiEventMachinery != null;
1474
1475        // Copy the qualifiers we were supplied with into an array big
1476        // enough to hold one more qualifier.  That qualifier will be
1477        // based on the event type, which of course we didn't know at
1478        // construction time.
1479        final Annotation[] qualifiers = Arrays.copyOf(this.qualifiers, this.qualifiers.length + 1);
1480        assert qualifiers != null;
1481
1482        final AbstractEvent.Type eventType = controllerEvent.getType();
1483        assert eventType != null;
1484
1485        switch (eventType) {
1486          
1487        case ADDITION:
1488          if (controllerEvent instanceof SynchronizationEvent) {
1489            qualifiers[qualifiers.length - 1] = Added.Literal.withSynchronization();
1490          } else {
1491            qualifiers[qualifiers.length - 1] = Added.Literal.withoutSynchronization();
1492          }
1493          break;
1494          
1495        case MODIFICATION:
1496          if (controllerEvent instanceof SynchronizationEvent) {
1497            qualifiers[qualifiers.length - 1] = Modified.Literal.withSynchronization();
1498          } else {
1499            qualifiers[qualifiers.length - 1] = Modified.Literal.withoutSynchronization();
1500          }
1501          break;
1502          
1503        case DELETION:
1504          assert !(controllerEvent instanceof SynchronizationEvent);
1505          qualifiers[qualifiers.length - 1] = Deleted.Literal.INSTANCE;
1506          break;
1507          
1508        default:
1509          throw new IllegalStateException();
1510          
1511        }
1512
1513        // This resource will be the actual "event" we end up firing.
1514        final T resource = controllerEvent.getResource();
1515        assert resource != null;
1516
1517        // The "prior resource" represents the prior state (if any)
1518        // and can be null.  We'll arrange for this to be "created" by
1519        // our PriorContext CDI Context when observer methods contain
1520        // a parameter qualified with @Prior.
1521        this.priorContext.put(resource, Optional.ofNullable(controllerEvent.getPriorResource()));
1522
1523        @SuppressWarnings("unchecked")
1524        final javax.enterprise.event.Event<T> broadcaster = cdiEventMachinery.select((Class<T>)resource.getClass(), qualifiers);
1525
1526        if (this.asyncNeeded) {
1527
1528          // Set up the machinery to fire the event asynchronously,
1529          // possibly in parallel.
1530          
1531          final CompletionStage<T> stage;
1532          if (this.notificationOptions == null) {
1533            stage = broadcaster.fireAsync(resource);
1534          } else {
1535            stage = broadcaster.fireAsync(resource, this.notificationOptions);
1536          }
1537          assert stage != null;
1538
1539          // When all asynchronous observers have been notified, then
1540          // fire synchronous events (if needed).  Ensure that the
1541          // PriorContext that is responsible for supplying injected
1542          // observer method parameters annotated with @Prior is
1543          // deactivated in all cases.
1544
1545          // TODO: should we make it configurable whether to fire
1546          // synchronous events before asynchronous events or the
1547          // other way around?
1548          
1549          stage.whenComplete((event, throwable) -> {
1550              if (throwable != null && this.logger.isLoggable(Level.SEVERE)) {
1551                logger.logp(Level.SEVERE, cn, mn, throwable.getMessage(), throwable);
1552              }
1553              // TODO: should the presence of a non-null throwable
1554              // cause us to not perform synchronous firing?
1555              try {
1556                assert event != null;
1557                if (this.syncNeeded) {
1558                  broadcaster.fire(event);
1559                }
1560              } finally {
1561                this.kubernetesEventContext.destroy();
1562                this.priorContext.remove(event);
1563              }
1564            });
1565          
1566        } else {
1567          assert this.syncNeeded;
1568
1569          try {
1570            broadcaster.fire(resource);
1571          } finally {
1572            this.kubernetesEventContext.destroy();
1573            this.priorContext.remove(resource);
1574          }
1575        }
1576        
1577      }
1578
1579      if (this.logger.isLoggable(Level.FINER)) {
1580        this.logger.exiting(cn, mn);
1581      }
1582    }
1583    
1584  }
1585
1586  private static final class PriorContext implements AlterableContext {
1587
1588    private static final InheritableThreadLocal<CurrentEventContext> currentEventContext = new InheritableThreadLocal<CurrentEventContext>() {
1589        @Override
1590        protected final CurrentEventContext initialValue() {
1591          return new CurrentEventContext();
1592        }
1593      };
1594    
1595    /**
1596     * A {@linkplain Collections#synchronizedMap(Map) synchronized}
1597     * {@link IdentityHashMap} that maps a "current" {@link
1598     * HasMetadata} to its prior representation.
1599     *
1600     * @see #put(HasMetadata, Optional)
1601     */
1602    private final Map<HasMetadata, Optional<? extends HasMetadata>> instances;
1603    
1604    private PriorContext() {
1605      super();
1606      // This needs to be an IdentityHashMap under the covers because
1607      // it turns out that all kubernetes-model classes use Lombok's
1608      // indiscriminate equals()-and-hashCode() generation.  We need
1609      // to track Kubernetes resources in this Context implementation
1610      // by their actual JVM identity.
1611      this.instances = Collections.synchronizedMap(new IdentityHashMap<>());
1612    }
1613
1614    /**
1615     * Activates this {@link PriorContext} <strong>for the {@linkplain
1616     * Thread#currentThread() current <code>Thread</code>}</strong>.
1617     *
1618     * @param currentEvent the {@link HasMetadata} that is currently
1619     * being fired as a CDI event; must not be {@code null}
1620     *
1621     * @exception NullPointerException if {@code currentEvent} is
1622     * {@code null}
1623     */
1624    private final void activate(final HasMetadata currentEvent) {
1625      Objects.requireNonNull(currentEvent);
1626      final CurrentEventContext c = currentEventContext.get();
1627      assert c != null;
1628      c.currentEvent = currentEvent;
1629      c.active = true;
1630    }
1631
1632    /**
1633     * Deactivates this {@link PriorContext} <strong>for the {@linkplain
1634     * Thread#currentThread() current <code>Thread</code>}</strong>.
1635     */
1636    private final void deactivate() {
1637      final CurrentEventContext c = currentEventContext.get();
1638      assert c != null;
1639      c.active = false;
1640      c.currentEvent = null;
1641      // Note: do NOT be tempted to call this.remove() here.
1642    }
1643
1644    /**
1645     * Associates the supplied {@code priorEvent} with the supplied
1646     * {@code currentEvent} and returns any previously associated
1647     * event.
1648     *
1649     * <p>This method <strong>may return {@code null}</strong>.</p>
1650     *
1651     * @param <X> a type that extends {@link HasMetadata} and therefore
1652     * represents a persistent Kubernetes resource
1653     *
1654     * @param currentEvent a Kubernetes resource about to be fired as
1655     * a CDI event; must not be {@code null}
1656     *
1657     * @param priorEvent an {@link Optional} Kubernetes resource that
1658     * represents the last known state of the {@code currentEvent}
1659     * Kubernetes resource; must not be {@code null}
1660     *
1661     * @return any previously associated Kubernetes resource as an
1662     * {@link Optional}, <strong>or, somewhat unusually, {@code null}
1663     * if there was no such {@link Optional}</strong>
1664     *
1665     * @exception NullPointerException if {@code currentEvent} or
1666     * {@code priorEvent} is {@code null}
1667     */
1668    private final <X extends HasMetadata> Optional<X> put(final X currentEvent, final Optional<X> priorEvent) {
1669      @SuppressWarnings("unchecked")
1670      final Optional<X> returnValue = (Optional<X>)this.instances.put(Objects.requireNonNull(currentEvent),
1671                                                                      Objects.requireNonNull(priorEvent));
1672      return returnValue;
1673    }
1674
1675    /**
1676     * Removes the supplied {@link HasMetadata} from this {@link
1677     * PriorContext}'s registry of such objects and returns any {@link
1678     * Optional} indexed under it.
1679     *
1680     * <p>This method <strong>may return {@code null}</strong>.</p>
1681     *
1682     * @param currentEvent the {@link HasMetadata} to remove; must not
1683     * be {@code null}
1684     *
1685     * @return an {@link Optional} representing the prior state
1686     * indexed under the supplied {@link HasMetadata}, <strong>or
1687     * {@code null}</strong>
1688     *
1689     * @exception NullPointerException if {@code currentEvent} is
1690     * {@code null}
1691     */
1692    private final Optional<? extends HasMetadata> remove(final HasMetadata currentEvent) {
1693      return this.instances.remove(Objects.requireNonNull(currentEvent));
1694    }
1695
1696    private final Optional<? extends HasMetadata> get() {
1697      if (!this.isActive()) {
1698        throw new ContextNotActiveException();
1699      }
1700      final CurrentEventContext c = currentEventContext.get();
1701      assert c != null;
1702      assert c.active;
1703      assert c.currentEvent != null;
1704      // Yes, this can return null, and yes, our return type is
1705      // Optional.  Do NOT be tempted to return an empty Optional
1706      // here!
1707      return this.instances.get(c.currentEvent);
1708    }
1709    
1710    @Override
1711    public final <T> T get(final Contextual<T> bean) {
1712      @SuppressWarnings("unchecked")
1713      final T returnValue = (T)this.get();
1714      return returnValue;
1715    }
1716
1717    @Override
1718    public final <T> T get(final Contextual<T> bean, final CreationalContext<T> cc) {
1719      @SuppressWarnings("unchecked")
1720      final T returnValue = (T)this.get();
1721      return returnValue;
1722    }
1723
1724    @Override
1725    public final void destroy(final Contextual<?> bean) {
1726      if (!this.isActive()) {
1727        throw new ContextNotActiveException();
1728      }
1729      final CurrentEventContext c = currentEventContext.get();
1730      assert c != null;
1731      assert c.active;
1732      assert c.currentEvent != null;
1733      this.remove(c.currentEvent);
1734    }
1735
1736    @Override
1737    public final Class<? extends Annotation> getScope() {
1738      return PriorScoped.class;
1739    }
1740
1741    @Override
1742    public final boolean isActive() {
1743      final CurrentEventContext c = currentEventContext.get();
1744      assert c != null;
1745      return c.active && c.currentEvent != null && this.instances.containsKey(c.currentEvent);
1746    }
1747
1748    private static final class CurrentEventContext {
1749
1750      private volatile HasMetadata currentEvent;
1751
1752      private volatile boolean active;
1753
1754      private CurrentEventContext() {
1755        super();
1756      }
1757      
1758    }
1759    
1760  }
1761
1762  private static final class Notifier<T extends HasMetadata> implements EventConsumer<T> {
1763
1764    private final PriorContext priorContext;
1765
1766    private final KubernetesEventContext kubernetesEventContext;
1767    
1768    private final ObserverMethod<T> observerMethod;
1769    
1770    private Notifier(final PriorContext priorContext,
1771                     final KubernetesEventContext kubernetesEventContext,
1772                     final ObserverMethod<T> observerMethod) {
1773      super();
1774      this.priorContext = Objects.requireNonNull(priorContext);
1775      this.kubernetesEventContext = Objects.requireNonNull(kubernetesEventContext);
1776      this.observerMethod = Objects.requireNonNull(observerMethod);
1777    }
1778
1779    @Override
1780    public final void accept(final EventContext<T> eventContext) {
1781      try {
1782        this.kubernetesEventContext.setActive(true);
1783        this.priorContext.activate(Objects.requireNonNull(eventContext).getEvent()); // thread-specific
1784        this.observerMethod.notify(eventContext);
1785      } finally {
1786        this.priorContext.deactivate(); // thread-specific
1787        this.kubernetesEventContext.setActive(false);
1788      }
1789    }
1790    
1791  }
1792
1793  @Retention(value = RetentionPolicy.RUNTIME)
1794  @Scope // deliberately NOT NormalScope
1795  @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD })
1796  private static @interface PriorScoped {
1797
1798  }
1799
1800}