/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
 *
 * Copyright © 2017-2018 microBean.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
017package org.microbean.kubernetes.controller;
018
019import java.util.Map;
020import java.util.Objects;
021
022import java.util.function.Consumer;
023
024import java.util.logging.Level;
025import java.util.logging.Logger;
026
027import io.fabric8.kubernetes.api.model.HasMetadata;
028
029import net.jcip.annotations.GuardedBy;
030
031/**
032 * A {@link Consumer} of {@link EventQueue}s that tracks the
033 * Kubernetes resources they contain before allowing subclasses to
034 * process their individual {@link Event}s.
035 *
036 * <p>Typically you would supply an implementation of this class to a
037 * {@link Controller}.</p>
038 *
039 * @param <T> a Kubernetes resource type
040 *
041 * @author <a href="https://about.me/lairdnelson"
042 * target="_parent">Laird Nelson</a>
043 *
044 * @see #accept(AbstractEvent)
045 *
046 * @see Controller
047 */
048public abstract class ResourceTrackingEventQueueConsumer<T extends HasMetadata> implements Consumer<EventQueue<? extends T>> {
049
050
051  /*
052   * Instance fields.
053   */
054
055
056  /**
057   * A mutable {@link Map} of {@link HasMetadata} objects indexed by
058   * their keys (often a pairing of namespace and name).
059   *
060   * <p>This field may be {@code null} in which case no resource
061   * tracking will take place.</p>
062   *
063   * <p>The value of this field is {@linkplain
064   * #ResourceTrackingEventQueueConsumer(Map) supplied at construction
065   * time} and is <strong>synchronized on</strong> and written to, if
066   * non-{@code null}, by the {@link #accept(EventQueue)} method.</p>
067   *
068   * <p>This class <strong>synchronizes on this field's
069   * value</strong>, if it is non-{@code null}, when mutating its
070   * contents.</p>
071   */
072  @GuardedBy("itself")
073  private final Map<Object, T> knownObjects;
074
075  /**
076   * A {@link Logger} for use by this {@link
077   * ResourceTrackingEventQueueConsumer} implementation.
078   *
079   * <p>This field is never {@code null}.</p>
080   *
081   * @see #createLogger()
082   */
083  protected final Logger logger;
084
085
086  /*
087   * Constructors.
088   */
089
090
091  /**
092   * Creates a new {@link ResourceTrackingEventQueueConsumer}.
093   *
094   * @param knownObjects a mutable {@link Map} of {@link HasMetadata}
095   * objects indexed by their keys (often a pairing of namespace and
096   * name); may be {@code null} if deletion tracking is not needed;
097   * <strong>will have its contents changed</strong> by this {@link
098   * ResourceTrackingEventQueueConsumer}'s {@link #accept(EventQueue)}
099   * method; <strong>will be synchronized on</strong> by this {@link
100   * ResourceTrackingEventQueueConsumer}'s {@link #accept(EventQueue)}
101   * method
102   *
103   * @see #accept(EventQueue)
104   */
105  protected ResourceTrackingEventQueueConsumer(final Map<Object, T> knownObjects) {
106    super();
107    this.logger = this.createLogger();
108    if (this.logger == null) {
109      throw new IllegalStateException("createLogger() == null");
110    }
111    final String cn = this.getClass().getName();
112    final String mn = "<init>";
113    if (this.logger.isLoggable(Level.FINER)) {
114      final String knownObjectsString;
115      if (knownObjects == null) {
116        knownObjectsString = null;
117      } else {
118        synchronized (knownObjects) {
119          knownObjectsString = knownObjects.toString();
120        }
121      }
122      this.logger.entering(cn, mn, knownObjectsString);
123    }
124    this.knownObjects = knownObjects;
125    if (this.logger.isLoggable(Level.FINER)) {
126      this.logger.exiting(cn, mn);
127    }
128  }
129
130
131  /*
132   * Instance methods.
133   */
134
135
136  /**
137   * Returns a {@link Logger} for use with this {@link
138   * ResourceTrackingEventQueueConsumer}.
139   *
140   * <p>This method never returns {@code null}.</p>
141   *
142   * <p>Overrides of this method must not return {@code null}.</p>
143   *
144   * @return a non-{@code null} {@link Logger}
145   */
146  protected Logger createLogger() {
147    return Logger.getLogger(this.getClass().getName());
148  }
149
150
151  /**
152   * {@linkplain EventQueue#iterator() Loops through} all the {@link
153   * AbstractEvent}s in the supplied {@link EventQueue}, keeping track
154   * of the {@link HasMetadata} it concerns along the way by
155   * <strong>synchronizing on</strong> and writing to the {@link Map}
156   * {@linkplain #ResourceTrackingEventQueueConsumer(Map) supplied at
157   * construction time}.
158   *
159   * <p>Individual {@link AbstractEvent}s are forwarded on to the
160   * {@link #accept(AbstractEvent)} method.</p>
161   *
162   * <h2>Implementation Notes</h2>
163   *
164   * <p>This loosely models the <a
165   * href="https://github.com/kubernetes/client-go/blob/v6.0.0/tools/cache/shared_informer.go#L343">{@code
166   * HandleDeltas} function in {@code
167   * tools/cache/shared_informer.go}</a>.  The final distribution step
168   * is left unimplemented on purpose.</p>
169   *
170   * @param eventQueue the {@link EventQueue} to process; may be
171   * {@code null} in which case no action will be taken
172   *
173   * @see #accept(AbstractEvent)
174   */
175  @Override
176  public final void accept(final EventQueue<? extends T> eventQueue) {
177    final String cn = this.getClass().getName();
178    final String mn = "accept";
179    if (eventQueue == null) {
180      if (this.logger.isLoggable(Level.FINER)) {
181        this.logger.entering(cn, mn, null);
182      }
183    } else {
184      synchronized (eventQueue) {
185        if (this.logger.isLoggable(Level.FINER)) {
186          this.logger.entering(cn, mn, eventQueue);
187        }
188
189        final Object key = eventQueue.getKey();
190        if (key == null) {
191          throw new IllegalStateException("eventQueue.getKey() == null; eventQueue: " + eventQueue);
192        }
193
194        for (final AbstractEvent<? extends T> event : eventQueue) {
195          if (event != null) {
196
197            assert key.equals(event.getKey());
198
199            final Event.Type eventType = event.getType();
200            assert eventType != null;
201
202            final T newResource = event.getResource();
203
204            if (event.getPriorResource() != null && this.logger.isLoggable(Level.FINE)) {
205              this.logger.logp(Level.FINE, cn, mn, "Unexpected state; event has a priorResource: {0}", event.getPriorResource());
206            }
207
208            final T priorResource;
209            final AbstractEvent<? extends T> newEvent;
210
211            if (this.knownObjects == null) {
212              priorResource = null;
213              newEvent = event;
214            } else if (Event.Type.DELETION.equals(eventType)) {
215
216              // "Forget" (untrack) the object in question.
217              synchronized (this.knownObjects) {
218                priorResource = this.knownObjects.remove(key);
219              }
220
221              newEvent = event;
222            } else {
223              assert eventType.equals(Event.Type.ADDITION) || eventType.equals(Event.Type.MODIFICATION);
224
225              // "Learn" (track) the resource in question.
226              synchronized (this.knownObjects) {
227                priorResource = this.knownObjects.put(key, newResource);
228              }
229
230              if (event instanceof SynchronizationEvent) {
231                if (priorResource == null) {
232                  assert Event.Type.ADDITION.equals(eventType) : "!Event.Type.ADDITION.equals(eventType): " + eventType;
233                  newEvent = event;
234                } else {
235                  assert Event.Type.MODIFICATION.equals(eventType) : "!Event.Type.MODIFICATION.equals(eventType): " + eventType;
236                  newEvent = this.createSynchronizationEvent(Event.Type.MODIFICATION, priorResource, newResource);
237                }
238              } else if (priorResource == null) {
239                if (Event.Type.ADDITION.equals(eventType)) {
240                  newEvent = event;
241                } else {
242                  newEvent = this.createEvent(Event.Type.ADDITION, null, newResource);
243                }
244              } else {
245                newEvent = this.createEvent(Event.Type.MODIFICATION, priorResource, newResource);
246              }
247            }
248
249            assert newEvent != null;
250            assert newEvent instanceof SynchronizationEvent || newEvent instanceof Event;
251
252            // This is the final consumption/distribution step; it is
253            // an abstract method in this class.
254            this.accept(newEvent);
255
256          }
257        }
258
259      }
260    }
261    if (this.logger.isLoggable(Level.FINER)) {
262      this.logger.exiting(cn, mn);
263    }
264  }
265
266  /**
267   * Creates and returns a new {@link Event}.
268   *
269   * <p>This method never returns {@code null}.</p>
270   *
271   * <p>Overrides of this method must not return {@code null}.</p>
272   *
273   * @param eventType the {@link AbstractEvent.Type} for the new
274   * {@link Event}; must not be {@code null}; when supplied by the
275   * {@link #accept(EventQueue)} method's internals, will always be
276   * either {@link AbstractEvent.Type#ADDITION} or {@link
277   * AbstractEvent.Type#MODIFICATION}
278   *
279   * @param priorResource the prior state of the resource the new
280   * {@link Event} will represent; may be (and often is) {@code null}
281   *
282   * @param resource the latest state of the resource the new {@link
283   * Event} will represent; must not be {@code null}
284   *
285   * @return a new, non-{@code null} {@link Event} with each
286   * invocation
287   *
288   * @exception NullPointerException if {@code eventType} or {@code
289   * resource} is {@code null}
290   */
291  protected Event<T> createEvent(final Event.Type eventType, final T priorResource, final T resource) {
292    final String cn = this.getClass().getName();
293    final String mn = "createEvent";
294    if (this.logger.isLoggable(Level.FINER)) {
295      this.logger.entering(cn, mn, new Object[] { eventType, priorResource, resource });
296    }
297    Objects.requireNonNull(eventType);
298    final Event<T> returnValue = new Event<>(this, eventType, priorResource, resource);
299    if (this.logger.isLoggable(Level.FINER)) {
300      this.logger.exiting(cn, mn, returnValue);
301    }
302    return returnValue;
303  }
304
305  /**
306   * Creates and returns a new {@link SynchronizationEvent}.
307   *
308   * <p>This method never returns {@code null}.</p>
309   *
310   * <p>Overrides of this method must not return {@code null}.</p>
311   *
312   * @param eventType the {@link AbstractEvent.Type} for the new
313   * {@link SynchronizationEvent}; must not be {@code null}; when
314   * supplied by the {@link #accept(EventQueue)} method's internals,
315   * will always be {@link AbstractEvent.Type#MODIFICATION}
316   *
317   * @param priorResource the prior state of the resource the new
318   * {@link SynchronizationEvent} will represent; may be (and often
319   * is) {@code null}
320   *
321   * @param resource the latest state of the resource the new {@link
322   * SynchronizationEvent} will represent; must not be {@code null}
323   *
324   * @return a new, non-{@code null} {@link SynchronizationEvent} with
325   * each invocation
326   *
327   * @exception NullPointerException if {@code eventType} or {@code
328   * resource} is {@code null}
329   */
330  protected SynchronizationEvent<T> createSynchronizationEvent(final Event.Type eventType, final T priorResource, final T resource) {
331    final String cn = this.getClass().getName();
332    final String mn = "createSynchronizationEvent";
333    if (this.logger.isLoggable(Level.FINER)) {
334      this.logger.entering(cn, mn, new Object[] { eventType, priorResource, resource });
335    }
336    Objects.requireNonNull(eventType);
337    final SynchronizationEvent<T> returnValue = new SynchronizationEvent<>(this, eventType, priorResource, resource);
338    if (this.logger.isLoggable(Level.FINER)) {
339      this.logger.exiting(cn, mn, returnValue);
340    }
341    return returnValue;
342  }
343
344  /**
345   * Called to process a given {@link AbstractEvent} from the {@link
346   * EventQueue} supplied to the {@link #accept(EventQueue)} method,
347   * <strong>with that {@link EventQueue}'s monitor held</strong>.
348   *
349   * <p>Implementations of this method should be relatively fast as
350   * this method dictates the speed of {@link EventQueue}
351   * processing.</p>
352   *
353   * @param event the {@link AbstractEvent} encountered in the {@link
354   * EventQueue}; must not be {@code null}
355   *
356   * @exception NullPointerException if {@code event} is {@code null}
357   *
358   * @see #accept(EventQueue)
359   */
360  protected abstract void accept(final AbstractEvent<? extends T> event);
361
362}