001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
002 *
003 * Copyright © 2017-2018 microBean.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
014 * implied.  See the License for the specific language governing
015 * permissions and limitations under the License.
016 */
017package org.microbean.kubernetes.controller;
018
019import java.util.AbstractCollection;
020import java.util.Collection;
021import java.util.Collections;
022import java.util.Iterator;
023import java.util.LinkedList;
024import java.util.NoSuchElementException; // for javadoc only
025import java.util.Objects;
026
027import java.util.function.Consumer;
028
029import java.util.logging.Level;
030import java.util.logging.Logger;
031
032import io.fabric8.kubernetes.api.model.HasMetadata;
033
034import net.jcip.annotations.GuardedBy;
035import net.jcip.annotations.ThreadSafe;
036
037/**
038 * A publicly-unmodifiable {@link AbstractCollection} of {@link
039 * AbstractEvent}s produced by an {@link EventQueueCollection}.
040 *
041 * <p>All {@link AbstractEvent}s in an {@link EventQueue} describe the
042 * life of a single {@linkplain HasMetadata resource} in
043 * Kubernetes.</p>
044 *
045 * <h2>Thread Safety</h2>
046 *
047 * <p>This class is safe for concurrent use by multiple {@link
048 * Thread}s.  Some operations, like the usage of the {@link
049 * #iterator()} method, require that callers synchronize on the {@link
050 * EventQueue} directly.  This class' internals synchronize on {@code
051 * this} when locking is needed.</p>
052 *
053 * <p>Overrides of this class must also be safe for concurrent use by
054 * multiple {@link Thread}s.</p>
055 *
056 * @param <T> the type of a Kubernetes resource
057 *
058 * @author <a href="https://about.me/lairdnelson"
059 * target="_parent">Laird Nelson</a>
060 *
061 * @see EventQueueCollection
062 */
063@ThreadSafe
064public class EventQueue<T extends HasMetadata> extends AbstractCollection<AbstractEvent<T>> {
065
066  
067  /*
068   * Instance fields.
069   */
070
071
072  /**
073   * A {@link Logger} for use by this {@link EventQueue}.
074   *
075   * <p>This field is never {@code null}.</p>
076   *
077   * @see #createLogger()
078   */
079  protected final Logger logger;
080
081  /**
082   * The key identifying the Kubernetes resource to which all of the
083   * {@link AbstractEvent}s managed by this {@link EventQueue} apply.
084   *
085   * <p>This field is never {@code null}.</p>
086   */
087  private final Object key;
088
089  /**
090   * The actual underlying queue of {@link AbstractEvent}s.
091   *
092   * <p>This field is never {@code null}.</p>
093   */
094  @GuardedBy("this")
095  private final LinkedList<AbstractEvent<T>> events;
096
097
098  /*
099   * Constructors.
100   */
101
102
103  /**
104   * Creates a new {@link EventQueue}.
105   *
106   * @param key the key identifying the Kubernetes resource to which
107   * all of the {@link AbstractEvent}s managed by this {@link
108   * EventQueue} apply; must not be {@code null}
109   *
110   * @exception NullPointerException if {@code key} is {@code null}
111   *
112   * @exception IllegalStateException if the {@link #createLogger()}
113   * method returns {@code null}
114   */
115  protected EventQueue(final Object key) {
116    super();
117    this.logger = this.createLogger();
118    if (this.logger == null) {
119      throw new IllegalStateException("createLogger() == null");
120    }
121    final String cn = this.getClass().getName();
122    final String mn = "<init>";
123    if (this.logger.isLoggable(Level.FINER)) {
124      this.logger.entering(cn, mn, key);
125    }
126    this.key = Objects.requireNonNull(key);
127    this.events = new LinkedList<>();
128    if (this.logger.isLoggable(Level.FINER)) {
129      this.logger.exiting(cn, mn);
130    }
131  }
132
133
134  /*
135   * Instance methods.
136   */
137
138
139  /**
140   * Returns a {@link Logger} for use by this {@link EventQueue}.
141   *
142   * <p>This method never returns {@code null}.</p>
143   *
144   * <p>Overrides of this method must not return {@code null}.</p>
145   *
146   * @return a non-{@code null} {@link Logger}
147   */
148  protected Logger createLogger() {
149    return Logger.getLogger(this.getClass().getName());
150  }
151
152  /**
153   * Returns the key identifying the Kubernetes resource to which all
154   * of the {@link AbstractEvent}s managed by this {@link EventQueue}
155   * apply.
156   *
157   * <p>This method never returns {@code null}.</p>
158   *
159   * @return a non-{@code null} {@link Object}
160   *
161   * @see #EventQueue(Object)
162   */
163  public final Object getKey() {
164    final String cn = this.getClass().getName();
165    final String mn = "getKey";
166    if (this.logger.isLoggable(Level.FINER)) {
167      this.logger.entering(cn, mn);
168    }
169    final Object returnValue = this.key;
170    if (this.logger.isLoggable(Level.FINER)) {
171      this.logger.entering(cn, mn, returnValue);
172    }
173    return returnValue;
174  }
175
176  /**
177   * Returns {@code true} if this {@link EventQueue} is empty.
178   *
179   * @return {@code true} if this {@link EventQueue} is empty; {@code
180   * false} otherwise
181   *
182   * @see #size()
183   */
184  public synchronized final boolean isEmpty() {
185    final String cn = this.getClass().getName();
186    final String mn = "isEmpty";
187    if (this.logger.isLoggable(Level.FINER)) {
188      this.logger.entering(cn, mn);
189    }
190    final boolean returnValue = this.events.isEmpty();
191    if (this.logger.isLoggable(Level.FINER)) {
192      this.logger.exiting(cn, mn, Boolean.valueOf(returnValue));
193    }
194    return returnValue;
195  }
196
197  /**
198   * Returns the size of this {@link EventQueue}.
199   *
200   * <p>This method never returns an {@code int} less than {@code
201   * 0}.</p>
202   *
203   * @return the size of this {@link EventQueue}; never negative
204   *
205   * @see #isEmpty()
206   */
207  @Override
208  public synchronized final int size() {
209    final String cn = this.getClass().getName();
210    final String mn = "size";
211    if (this.logger.isLoggable(Level.FINER)) {
212      this.logger.entering(cn, mn);
213    }
214    final int returnValue = this.events.size();
215    if (this.logger.isLoggable(Level.FINER)) {
216      this.logger.exiting(cn, mn, Integer.valueOf(returnValue));
217    }
218    return returnValue;
219  }
220
221  /**
222   * Adds the supplied {@link AbstractEvent} to this {@link
223   * EventQueue} under certain conditions.
224   *
225   * <p>The supplied {@link AbstractEvent} is added to this {@link
226   * EventQueue} if:</p>
227   *
228   * <ul>
229   *
230   * <li>its {@linkplain AbstractEvent#getKey() key} is equal to this
231   * {@link EventQueue}'s {@linkplain #getKey() key}</li>
232   *
233   * <li>it is either not a {@linkplain SynchronizationEvent}
234   * synchronization event}, or it <em>is</em> a {@linkplain
235   * SynchronizationEvent synchronization event} and this {@link
236   * EventQueue} does not represent a sequence of events that
237   * {@linkplain #resultsInDeletion() describes a deletion}, and</li>
238   *
239   * <li>optional {@linkplain #compress(Collection) compression} does
240   * not result in this {@link EventQueue} being empty</li>
241   *
242   * </ul>
243   *
244   * @param event the {@link AbstractEvent} to add; must not be {@code
245   * null}
246   *
247   * @return {@code true} if an addition took place and {@linkplain
248   * #compress(Collection) optional compression} did not result in
249   * this {@link EventQueue} {@linkplain #isEmpty() becoming empty};
250   * {@code false} otherwise
251   *
252   * @exception NullPointerException if {@code event} is {@code null}
253   *
254   * @exception IllegalArgumentException if {@code event}'s
255   * {@linkplain AbstractEvent#getKey() key} is not equal to this
256   * {@link EventQueue}'s {@linkplain #getKey() key}
257   *
258   * @see #compress(Collection)
259   *
260   * @see SynchronizationEvent
261   *
262   * @see #resultsInDeletion()
263   */
264  final boolean addEvent(final AbstractEvent<T> event) {
265    final String cn = this.getClass().getName();
266    final String mn = "addEvent";
267    if (this.logger.isLoggable(Level.FINER)) {
268      this.logger.entering(cn, mn, event);
269    }
270    
271    Objects.requireNonNull(event);
272
273    final Object key = this.getKey();
274    if (!key.equals(event.getKey())) {
275      throw new IllegalArgumentException("!this.getKey().equals(event.getKey()): " + key + ", " + event.getKey());
276    }
277
278    boolean returnValue = false;
279
280    final AbstractEvent.Type eventType = event.getType();
281    assert eventType != null;
282
283    synchronized (this) {
284      if (!(event instanceof SynchronizationEvent) || !this.resultsInDeletion()) {
285        // If the event is NOT a synchronization event (so it's an
286        // addition, modification, or deletion)...
287        // ...OR if it IS a synchronization event AND we are NOT
288        // already going to delete this queue...
289        returnValue = this.events.add(event);
290        if (returnValue) {
291          this.deduplicate();
292          final Collection<AbstractEvent<T>> readOnlyEvents = Collections.unmodifiableCollection(this.events);
293          final Collection<AbstractEvent<T>> newEvents = this.compress(readOnlyEvents);
294          if (newEvents != readOnlyEvents) {
295            this.events.clear();
296            if (newEvents != null && !newEvents.isEmpty()) {
297              this.events.addAll(newEvents);
298            }
299          }
300          returnValue = !this.isEmpty();
301        }
302      }
303    }
304    
305    if (this.logger.isLoggable(Level.FINER)) {
306      this.logger.exiting(cn, mn, Boolean.valueOf(returnValue));
307    }
308    return returnValue;
309  }
310
311  /**
312   * Returns the last (and definitionally newest) {@link
313   * AbstractEvent} in this {@link EventQueue}.
314   *
315   * <p>This method never returns {@code null}.</p>
316   *
317   * @return the last {@link AbstractEvent} in this {@link
318   * EventQueue}; never {@code null}
319   *
320   * @exception NoSuchElementException if this {@link EventQueue} is
321   * {@linkplain #isEmpty() empty}
322   */
323  synchronized final AbstractEvent<T> getLast() {
324    final String cn = this.getClass().getName();
325    final String mn = "getLast";
326    if (this.logger.isLoggable(Level.FINER)) {
327      this.logger.entering(cn, mn);
328    }
329    final AbstractEvent<T> returnValue = this.events.getLast();
330    if (this.logger.isLoggable(Level.FINER)) {
331      this.logger.exiting(cn, mn, returnValue);
332    }
333    return returnValue;
334  }
335
336  /**
337   * Synchronizes on this {@link EventQueue} and, while holding its
338   * monitor, invokes the {@link Consumer#accept(Object)} method on
339   * the supplied {@link Consumer} for every {@link AbstractEvent} in
340   * this {@link EventQueue}.
341   *
342   * @param action the {@link Consumer} in question; must not be
343   * {@code null}
344   *
345   * @exception NullPointerException if {@code action} is {@code null}
346   */
347  @Override
348  public synchronized final void forEach(final Consumer<? super AbstractEvent<T>> action) {
349    super.forEach(action);
350  }
351  
352  /**
353   * Synchronizes on this {@link EventQueue} and, while holding its
354   * monitor, returns an unmodifiable {@link Iterator} over its
355   * contents.
356   *
357   * <p>This method never returns {@code null}.</p>
358   *
359   * @return a non-{@code null} unmodifiable {@link Iterator} of
360   * {@link AbstractEvent}s
361   */
362  @Override
363  public synchronized final Iterator<AbstractEvent<T>> iterator() {
364    return Collections.unmodifiableCollection(this.events).iterator();
365  }
366
367  /**
368   * If this {@link EventQueue}'s {@linkplain #size() size} is greater
369   * than {@code 2}, and if its last two {@link AbstractEvent}s are
370   * {@linkplain AbstractEvent.Type#DELETION deletions}, and if the
371   * next-to-last deletion {@link AbstractEvent}'s {@linkplain
372   * AbstractEvent#isFinalStateKnown() state is known}, then this method
373   * causes that {@link AbstractEvent} to replace the two under consideration.
374   *
375   * <p>This method is called only by the {@link #addEvent(AbstractEvent)}
376   * method.</p>
377   *
378   * @see #addEvent(AbstractEvent)
379   */
380  private synchronized final void deduplicate() {
381    final String cn = this.getClass().getName();
382    final String mn = "deduplicate";
383    if (this.logger.isLoggable(Level.FINER)) {
384      this.logger.entering(cn, mn);
385    }
386    final int size = this.size();
387    if (size > 2) {
388      final AbstractEvent<T> lastEvent = this.events.get(size - 1);
389      final AbstractEvent<T> nextToLastEvent = this.events.get(size - 2);
390      final AbstractEvent<T> event;
391      if (lastEvent != null && nextToLastEvent != null && AbstractEvent.Type.DELETION.equals(lastEvent.getType()) && AbstractEvent.Type.DELETION.equals(nextToLastEvent.getType())) {
392        event = nextToLastEvent.isFinalStateKnown() ? nextToLastEvent : lastEvent;
393      } else {
394        event = null;
395      }
396      if (event != null) {
397        this.events.set(size - 2, event);
398        this.events.remove(size - 1);
399      }
400    }
401    if (this.logger.isLoggable(Level.FINER)) {
402      this.logger.exiting(cn, mn);
403    }
404  }
405
406  /**
407   * Returns {@code true} if this {@link EventQueue} is {@linkplain
408   * #isEmpty() not empty} and the {@linkplain #getLast() last
409   * <code>AbstractEvent</code> in this <code>EventQueue</code>} is a
410   * {@linkplain AbstractEvent.Type#DELETION deletion event}.
411   *
412   * @return {@code true} if this {@link EventQueue} currently
413   * logically represents the deletion of a resource, {@code false}
414   * otherwise
415   */
416  synchronized final boolean resultsInDeletion() {
417    final String cn = this.getClass().getName();
418    final String mn = "resultsInDeletion";
419    if (this.logger.isLoggable(Level.FINER)) {
420      this.logger.entering(cn, mn);
421    }
422    final boolean returnValue = !this.isEmpty() && this.getLast().getType().equals(AbstractEvent.Type.DELETION);
423    if (this.logger.isLoggable(Level.FINER)) {
424      this.logger.exiting(cn, mn, Boolean.valueOf(returnValue));
425    }
426    return returnValue;
427  }
428
429  /**
430   * Performs a compression operation on the supplied {@link
431   * Collection} of {@link AbstractEvent}s and returns the result of that
432   * operation.
433   *
434   * <p>This method may return {@code null}, which will result in the
435   * emptying of this {@link EventQueue}.</p>
436   *
437   * <p>This method is called while holding this {@link EventQueue}'s
438   * monitor.</p>
439   *
440   * <p>This method is called when an {@link EventQueueCollection} (or
441   * some other {@link AbstractEvent} producer with access to
442   * package-protected methods of this class) adds an {@link AbstractEvent} to
443   * this {@link EventQueue} and provides the {@link EventQueue}
444   * implementation with the ability to eliminate duplicates or
445   * otherwise compress the event stream it represents.</p>
446   *
447   * <p>This implementation simply returns the supplied {@code events}
448   * {@link Collection}; i.e. no compression is performed.</p>
449   *
450   * @param events an {@link
451   * Collections#unmodifiableCollection(Collection) unmodifiable
452   * <tt>Collection</tt>} of {@link AbstractEvent}s representing the
453   * current state of this {@link EventQueue}; will never be {@code
454   * null}
455   *
456   * @return the new state that this {@link EventQueue} should assume;
457   * may be {@code null}; may simply be the supplied {@code events}
458   * {@link Collection} if compression is not desired or implemented
459   */
460  protected Collection<AbstractEvent<T>> compress(final Collection<AbstractEvent<T>> events) {
461    return events;
462  }
463
464  /**
465   * Returns a hashcode for this {@link EventQueue}.
466   *
467   * @return a hashcode for this {@link EventQueue}
468   *
469   * @see #equals(Object)
470   */
471  @Override
472  public final int hashCode() {
473    int hashCode = 17;
474
475    Object value = this.getKey();
476    int c = value == null ? 0 : value.hashCode();
477    hashCode = 37 * hashCode + c;
478
479    synchronized (this) {
480      value = this.events;
481      c = value == null ? 0 : value.hashCode();
482    }
483    hashCode = 37 * hashCode + c;
484
485    return hashCode;
486  }
487
488  /**
489   * Returns {@code true} if the supplied {@link Object} is also an
490   * {@link EventQueue} and is equal in all respects to this one.
491   *
492   * @param other the {@link Object} to test; may be {@code null} in
493   * which case {@code null} will be returned
494   *
495   * @return {@code true} if the supplied {@link Object} is also an
496   * {@link EventQueue} and is equal in all respects to this one;
497   * {@code false} otherwise
498   *
499   * @see #hashCode()
500   */
501  @Override
502  public final boolean equals(final Object other) {
503    if (other == this) {
504      return true;
505    } else if (other instanceof EventQueue) {
506      final EventQueue<?> her = (EventQueue<?>)other;
507
508      final Object key = this.getKey();
509      if (key == null) {
510        if (her.getKey() != null) {
511          return false;
512        }
513      } else if (!key.equals(her.getKey())) {
514        return false;
515      }
516
517      synchronized (this) {
518        final Object events = this.events;
519        if (events == null) {
520          synchronized (her) {
521            if (her.events != null) {
522              return false;
523            }
524          }
525        } else {
526          synchronized (her) {
527            if (!events.equals(her.events)) {
528              return false;
529            }
530          }
531        }
532      }
533
534      return true;
535    } else {
536      return false;
537    }
538  }
539
540  /**
541   * Returns a {@link String} representation of this {@link
542   * EventQueue}.
543   *
544   * <p>This method never returns {@code null}.</p>
545   *
546   * @return a non-{@code null} {@link String} representation of this
547   * {@link EventQueue}
548   */
549  @Override
550  public synchronized final String toString() {
551    return new StringBuilder().append(this.getKey()).append(": ").append(this.events).toString();
552  }
553
554}