001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*- 002 * 003 * Copyright © 2017-2018 microBean. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 014 * implied. See the License for the specific language governing 015 * permissions and limitations under the License. 016 */ 017package org.microbean.kubernetes.controller; 018 019import java.util.AbstractCollection; 020import java.util.Collection; 021import java.util.Collections; 022import java.util.Iterator; 023import java.util.LinkedList; 024import java.util.NoSuchElementException; // for javadoc only 025import java.util.Objects; 026 027import java.util.function.Consumer; 028 029import java.util.logging.Level; 030import java.util.logging.Logger; 031 032import io.fabric8.kubernetes.api.model.HasMetadata; 033 034import net.jcip.annotations.GuardedBy; 035import net.jcip.annotations.ThreadSafe; 036 037/** 038 * A publicly-unmodifiable {@link AbstractCollection} of {@link 039 * AbstractEvent}s produced by an {@link EventQueueCollection}. 040 * 041 * <p>All {@link AbstractEvent}s in an {@link EventQueue} describe the 042 * life of a single {@linkplain HasMetadata resource} in 043 * Kubernetes.</p> 044 * 045 * <h2>Thread Safety</h2> 046 * 047 * <p>This class is safe for concurrent use by multiple {@link 048 * Thread}s. Some operations, like the usage of the {@link 049 * #iterator()} method, require that callers synchronize on the {@link 050 * EventQueue} directly. This class' internals synchronize on {@code 051 * this} when locking is needed.</p> 052 * 053 * <p>Overrides of this class must also be safe for concurrent use by 054 * multiple {@link Thread}s.</p> 055 * 056 * @param <T> the type of a Kubernetes resource 057 * 058 * @author <a href="https://about.me/lairdnelson" 059 * target="_parent">Laird Nelson</a> 060 * 061 * @see EventQueueCollection 062 */ 063@ThreadSafe 064public class EventQueue<T extends HasMetadata> extends AbstractCollection<AbstractEvent<T>> { 065 066 067 /* 068 * Instance fields. 069 */ 070 071 072 /** 073 * A {@link Logger} for use by this {@link EventQueue}. 074 * 075 * <p>This field is never {@code null}.</p> 076 * 077 * @see #createLogger() 078 */ 079 protected final Logger logger; 080 081 /** 082 * The key identifying the Kubernetes resource to which all of the 083 * {@link AbstractEvent}s managed by this {@link EventQueue} apply. 084 * 085 * <p>This field is never {@code null}.</p> 086 */ 087 private final Object key; 088 089 /** 090 * The actual underlying queue of {@link AbstractEvent}s. 091 * 092 * <p>This field is never {@code null}.</p> 093 */ 094 @GuardedBy("this") 095 private final LinkedList<AbstractEvent<T>> events; 096 097 098 /* 099 * Constructors. 100 */ 101 102 103 /** 104 * Creates a new {@link EventQueue}. 105 * 106 * @param key the key identifying the Kubernetes resource to which 107 * all of the {@link AbstractEvent}s managed by this {@link 108 * EventQueue} apply; must not be {@code null} 109 * 110 * @exception NullPointerException if {@code key} is {@code null} 111 * 112 * @exception IllegalStateException if the {@link #createLogger()} 113 * method returns {@code null} 114 */ 115 protected EventQueue(final Object key) { 116 super(); 117 this.logger = this.createLogger(); 118 if (this.logger == null) { 119 throw new IllegalStateException("createLogger() == null"); 120 } 121 final String cn = this.getClass().getName(); 122 final String mn = "<init>"; 123 if (this.logger.isLoggable(Level.FINER)) { 124 this.logger.entering(cn, mn, key); 125 } 126 this.key = Objects.requireNonNull(key); 127 this.events = new LinkedList<>(); 128 if (this.logger.isLoggable(Level.FINER)) { 129 this.logger.exiting(cn, mn); 130 } 131 } 132 133 134 /* 135 * Instance methods. 136 */ 137 138 139 /** 140 * Returns a {@link Logger} for use by this {@link EventQueue}. 141 * 142 * <p>This method never returns {@code null}.</p> 143 * 144 * <p>Overrides of this method must not return {@code null}.</p> 145 * 146 * @return a non-{@code null} {@link Logger} 147 */ 148 protected Logger createLogger() { 149 return Logger.getLogger(this.getClass().getName()); 150 } 151 152 /** 153 * Returns the key identifying the Kubernetes resource to which all 154 * of the {@link AbstractEvent}s managed by this {@link EventQueue} 155 * apply. 156 * 157 * <p>This method never returns {@code null}.</p> 158 * 159 * @return a non-{@code null} {@link Object} 160 * 161 * @see #EventQueue(Object) 162 */ 163 public final Object getKey() { 164 final String cn = this.getClass().getName(); 165 final String mn = "getKey"; 166 if (this.logger.isLoggable(Level.FINER)) { 167 this.logger.entering(cn, mn); 168 } 169 final Object returnValue = this.key; 170 if (this.logger.isLoggable(Level.FINER)) { 171 this.logger.entering(cn, mn, returnValue); 172 } 173 return returnValue; 174 } 175 176 /** 177 * Returns {@code true} if this {@link EventQueue} is empty. 178 * 179 * @return {@code true} if this {@link EventQueue} is empty; {@code 180 * false} otherwise 181 * 182 * @see #size() 183 */ 184 public synchronized final boolean isEmpty() { 185 final String cn = this.getClass().getName(); 186 final String mn = "isEmpty"; 187 if (this.logger.isLoggable(Level.FINER)) { 188 this.logger.entering(cn, mn); 189 } 190 final boolean returnValue = this.events.isEmpty(); 191 if (this.logger.isLoggable(Level.FINER)) { 192 this.logger.exiting(cn, mn, Boolean.valueOf(returnValue)); 193 } 194 return returnValue; 195 } 196 197 /** 198 * Returns the size of this {@link EventQueue}. 199 * 200 * <p>This method never returns an {@code int} less than {@code 201 * 0}.</p> 202 * 203 * @return the size of this {@link EventQueue}; never negative 204 * 205 * @see #isEmpty() 206 */ 207 @Override 208 public synchronized final int size() { 209 final String cn = this.getClass().getName(); 210 final String mn = "size"; 211 if (this.logger.isLoggable(Level.FINER)) { 212 this.logger.entering(cn, mn); 213 } 214 final int returnValue = this.events.size(); 215 if (this.logger.isLoggable(Level.FINER)) { 216 this.logger.exiting(cn, mn, Integer.valueOf(returnValue)); 217 } 218 return returnValue; 219 } 220 221 /** 222 * Adds the supplied {@link AbstractEvent} to this {@link 223 * EventQueue} under certain conditions. 224 * 225 * <p>The supplied {@link AbstractEvent} is added to this {@link 226 * EventQueue} if:</p> 227 * 228 * <ul> 229 * 230 * <li>its {@linkplain AbstractEvent#getKey() key} is equal to this 231 * {@link EventQueue}'s {@linkplain #getKey() key}</li> 232 * 233 * <li>it is either not a {@linkplain SynchronizationEvent} 234 * synchronization event}, or it <em>is</em> a {@linkplain 235 * SynchronizationEvent synchronization event} and this {@link 236 * EventQueue} does not represent a sequence of events that 237 * {@linkplain #resultsInDeletion() describes a deletion}, and</li> 238 * 239 * <li>optional {@linkplain #compress(Collection) compression} does 240 * not result in this {@link EventQueue} being empty</li> 241 * 242 * </ul> 243 * 244 * @param event the {@link AbstractEvent} to add; must not be {@code 245 * null} 246 * 247 * @return {@code true} if an addition took place and {@linkplain 248 * #compress(Collection) optional compression} did not result in 249 * this {@link EventQueue} {@linkplain #isEmpty() becoming empty}; 250 * {@code false} otherwise 251 * 252 * @exception NullPointerException if {@code event} is {@code null} 253 * 254 * @exception IllegalArgumentException if {@code event}'s 255 * {@linkplain AbstractEvent#getKey() key} is not equal to this 256 * {@link EventQueue}'s {@linkplain #getKey() key} 257 * 258 * @see #compress(Collection) 259 * 260 * @see SynchronizationEvent 261 * 262 * @see #resultsInDeletion() 263 */ 264 final boolean addEvent(final AbstractEvent<T> event) { 265 final String cn = this.getClass().getName(); 266 final String mn = "addEvent"; 267 if (this.logger.isLoggable(Level.FINER)) { 268 this.logger.entering(cn, mn, event); 269 } 270 271 Objects.requireNonNull(event); 272 273 final Object key = this.getKey(); 274 if (!key.equals(event.getKey())) { 275 throw new IllegalArgumentException("!this.getKey().equals(event.getKey()): " + key + ", " + event.getKey()); 276 } 277 278 boolean returnValue = false; 279 280 final AbstractEvent.Type eventType = event.getType(); 281 assert eventType != null; 282 283 synchronized (this) { 284 if (!(event instanceof SynchronizationEvent) || !this.resultsInDeletion()) { 285 // If the event is NOT a synchronization event (so it's an 286 // addition, modification, or deletion)... 287 // ...OR if it IS a synchronization event AND we are NOT 288 // already going to delete this queue... 289 returnValue = this.events.add(event); 290 if (returnValue) { 291 this.deduplicate(); 292 final Collection<AbstractEvent<T>> readOnlyEvents = Collections.unmodifiableCollection(this.events); 293 final Collection<AbstractEvent<T>> newEvents = this.compress(readOnlyEvents); 294 if (newEvents != readOnlyEvents) { 295 this.events.clear(); 296 if (newEvents != null && !newEvents.isEmpty()) { 297 this.events.addAll(newEvents); 298 } 299 } 300 returnValue = !this.isEmpty(); 301 } 302 } 303 } 304 305 if (this.logger.isLoggable(Level.FINER)) { 306 this.logger.exiting(cn, mn, Boolean.valueOf(returnValue)); 307 } 308 return returnValue; 309 } 310 311 /** 312 * Returns the last (and definitionally newest) {@link 313 * AbstractEvent} in this {@link EventQueue}. 314 * 315 * <p>This method never returns {@code null}.</p> 316 * 317 * @return the last {@link AbstractEvent} in this {@link 318 * EventQueue}; never {@code null} 319 * 320 * @exception NoSuchElementException if this {@link EventQueue} is 321 * {@linkplain #isEmpty() empty} 322 */ 323 synchronized final AbstractEvent<T> getLast() { 324 final String cn = this.getClass().getName(); 325 final String mn = "getLast"; 326 if (this.logger.isLoggable(Level.FINER)) { 327 this.logger.entering(cn, mn); 328 } 329 final AbstractEvent<T> returnValue = this.events.getLast(); 330 if (this.logger.isLoggable(Level.FINER)) { 331 this.logger.exiting(cn, mn, returnValue); 332 } 333 return returnValue; 334 } 335 336 /** 337 * Synchronizes on this {@link EventQueue} and, while holding its 338 * monitor, invokes the {@link Consumer#accept(Object)} method on 339 * the supplied {@link Consumer} for every {@link AbstractEvent} in 340 * this {@link EventQueue}. 341 * 342 * @param action the {@link Consumer} in question; must not be 343 * {@code null} 344 * 345 * @exception NullPointerException if {@code action} is {@code null} 346 */ 347 @Override 348 public synchronized final void forEach(final Consumer<? super AbstractEvent<T>> action) { 349 super.forEach(action); 350 } 351 352 /** 353 * Synchronizes on this {@link EventQueue} and, while holding its 354 * monitor, returns an unmodifiable {@link Iterator} over its 355 * contents. 356 * 357 * <p>This method never returns {@code null}.</p> 358 * 359 * @return a non-{@code null} unmodifiable {@link Iterator} of 360 * {@link AbstractEvent}s 361 */ 362 @Override 363 public synchronized final Iterator<AbstractEvent<T>> iterator() { 364 return Collections.unmodifiableCollection(this.events).iterator(); 365 } 366 367 /** 368 * If this {@link EventQueue}'s {@linkplain #size() size} is greater 369 * than {@code 2}, and if its last two {@link AbstractEvent}s are 370 * {@linkplain AbstractEvent.Type#DELETION deletions}, and if the 371 * next-to-last deletion {@link AbstractEvent}'s {@linkplain 372 * AbstractEvent#isFinalStateKnown() state is known}, then this method 373 * causes that {@link AbstractEvent} to replace the two under consideration. 374 * 375 * <p>This method is called only by the {@link #addEvent(AbstractEvent)} 376 * method.</p> 377 * 378 * @see #addEvent(AbstractEvent) 379 */ 380 private synchronized final void deduplicate() { 381 final String cn = this.getClass().getName(); 382 final String mn = "deduplicate"; 383 if (this.logger.isLoggable(Level.FINER)) { 384 this.logger.entering(cn, mn); 385 } 386 final int size = this.size(); 387 if (size > 2) { 388 final AbstractEvent<T> lastEvent = this.events.get(size - 1); 389 final AbstractEvent<T> nextToLastEvent = this.events.get(size - 2); 390 final AbstractEvent<T> event; 391 if (lastEvent != null && nextToLastEvent != null && AbstractEvent.Type.DELETION.equals(lastEvent.getType()) && AbstractEvent.Type.DELETION.equals(nextToLastEvent.getType())) { 392 event = nextToLastEvent.isFinalStateKnown() ? nextToLastEvent : lastEvent; 393 } else { 394 event = null; 395 } 396 if (event != null) { 397 this.events.set(size - 2, event); 398 this.events.remove(size - 1); 399 } 400 } 401 if (this.logger.isLoggable(Level.FINER)) { 402 this.logger.exiting(cn, mn); 403 } 404 } 405 406 /** 407 * Returns {@code true} if this {@link EventQueue} is {@linkplain 408 * #isEmpty() not empty} and the {@linkplain #getLast() last 409 * <code>AbstractEvent</code> in this <code>EventQueue</code>} is a 410 * {@linkplain AbstractEvent.Type#DELETION deletion event}. 411 * 412 * @return {@code true} if this {@link EventQueue} currently 413 * logically represents the deletion of a resource, {@code false} 414 * otherwise 415 */ 416 synchronized final boolean resultsInDeletion() { 417 final String cn = this.getClass().getName(); 418 final String mn = "resultsInDeletion"; 419 if (this.logger.isLoggable(Level.FINER)) { 420 this.logger.entering(cn, mn); 421 } 422 final boolean returnValue = !this.isEmpty() && this.getLast().getType().equals(AbstractEvent.Type.DELETION); 423 if (this.logger.isLoggable(Level.FINER)) { 424 this.logger.exiting(cn, mn, Boolean.valueOf(returnValue)); 425 } 426 return returnValue; 427 } 428 429 /** 430 * Performs a compression operation on the supplied {@link 431 * Collection} of {@link AbstractEvent}s and returns the result of that 432 * operation. 433 * 434 * <p>This method may return {@code null}, which will result in the 435 * emptying of this {@link EventQueue}.</p> 436 * 437 * <p>This method is called while holding this {@link EventQueue}'s 438 * monitor.</p> 439 * 440 * <p>This method is called when an {@link EventQueueCollection} (or 441 * some other {@link AbstractEvent} producer with access to 442 * package-protected methods of this class) adds an {@link AbstractEvent} to 443 * this {@link EventQueue} and provides the {@link EventQueue} 444 * implementation with the ability to eliminate duplicates or 445 * otherwise compress the event stream it represents.</p> 446 * 447 * <p>This implementation simply returns the supplied {@code events} 448 * {@link Collection}; i.e. no compression is performed.</p> 449 * 450 * @param events an {@link 451 * Collections#unmodifiableCollection(Collection) unmodifiable 452 * <tt>Collection</tt>} of {@link AbstractEvent}s representing the 453 * current state of this {@link EventQueue}; will never be {@code 454 * null} 455 * 456 * @return the new state that this {@link EventQueue} should assume; 457 * may be {@code null}; may simply be the supplied {@code events} 458 * {@link Collection} if compression is not desired or implemented 459 */ 460 protected Collection<AbstractEvent<T>> compress(final Collection<AbstractEvent<T>> events) { 461 return events; 462 } 463 464 /** 465 * Returns a hashcode for this {@link EventQueue}. 466 * 467 * @return a hashcode for this {@link EventQueue} 468 * 469 * @see #equals(Object) 470 */ 471 @Override 472 public final int hashCode() { 473 int hashCode = 17; 474 475 Object value = this.getKey(); 476 int c = value == null ? 0 : value.hashCode(); 477 hashCode = 37 * hashCode + c; 478 479 synchronized (this) { 480 value = this.events; 481 c = value == null ? 0 : value.hashCode(); 482 } 483 hashCode = 37 * hashCode + c; 484 485 return hashCode; 486 } 487 488 /** 489 * Returns {@code true} if the supplied {@link Object} is also an 490 * {@link EventQueue} and is equal in all respects to this one. 491 * 492 * @param other the {@link Object} to test; may be {@code null} in 493 * which case {@code null} will be returned 494 * 495 * @return {@code true} if the supplied {@link Object} is also an 496 * {@link EventQueue} and is equal in all respects to this one; 497 * {@code false} otherwise 498 * 499 * @see #hashCode() 500 */ 501 @Override 502 public final boolean equals(final Object other) { 503 if (other == this) { 504 return true; 505 } else if (other instanceof EventQueue) { 506 final EventQueue<?> her = (EventQueue<?>)other; 507 508 final Object key = this.getKey(); 509 if (key == null) { 510 if (her.getKey() != null) { 511 return false; 512 } 513 } else if (!key.equals(her.getKey())) { 514 return false; 515 } 516 517 synchronized (this) { 518 final Object events = this.events; 519 if (events == null) { 520 synchronized (her) { 521 if (her.events != null) { 522 return false; 523 } 524 } 525 } else { 526 synchronized (her) { 527 if (!events.equals(her.events)) { 528 return false; 529 } 530 } 531 } 532 } 533 534 return true; 535 } else { 536 return false; 537 } 538 } 539 540 /** 541 * Returns a {@link String} representation of this {@link 542 * EventQueue}. 543 * 544 * <p>This method never returns {@code null}.</p> 545 * 546 * @return a non-{@code null} {@link String} representation of this 547 * {@link EventQueue} 548 */ 549 @Override 550 public synchronized final String toString() { 551 return new StringBuilder().append(this.getKey()).append(": ").append(this.events).toString(); 552 } 553 554}