001/* -*- mode: Java; c-basic-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*- 002 * 003 * Copyright © 2017-2018 microBean. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 014 * implied. See the License for the specific language governing 015 * permissions and limitations under the License. 016 */ 017package org.microbean.kubernetes.controller; 018 019import java.util.Map; 020import java.util.Objects; 021 022import java.util.function.Consumer; 023 024import java.util.logging.Level; 025import java.util.logging.Logger; 026 027import io.fabric8.kubernetes.api.model.HasMetadata; 028 029import net.jcip.annotations.GuardedBy; 030 031/** 032 * A {@link Consumer} of {@link EventQueue}s that tracks the 033 * Kubernetes resources they contain before allowing subclasses to 034 * process their individual {@link Event}s. 035 * 036 * <p>Typically you would supply an implementation of this class to a 037 * {@link Controller}.</p> 038 * 039 * @param <T> a Kubernetes resource type 040 * 041 * @author <a href="https://about.me/lairdnelson" 042 * target="_parent">Laird Nelson</a> 043 * 044 * @see #accept(AbstractEvent) 045 * 046 * @see Controller 047 */ 048public abstract class ResourceTrackingEventQueueConsumer<T extends HasMetadata> implements Consumer<EventQueue<? extends T>> { 049 050 051 /* 052 * Instance fields. 053 */ 054 055 056 /** 057 * A mutable {@link Map} of {@link HasMetadata} objects indexed by 058 * their keys (often a pairing of namespace and name). 059 * 060 * <p>This field may be {@code null} in which case no resource 061 * tracking will take place.</p> 062 * 063 * <p>The value of this field is {@linkplain 064 * #ResourceTrackingEventQueueConsumer(Map) supplied at construction 065 * time} and is <strong>synchronized on</strong> and written to, if 066 * non-{@code null}, by the {@link #accept(EventQueue)} method.</p> 067 * 068 * <p>This class <strong>synchronizes on this field's 069 * value</strong>, if it is non-{@code null}, when mutating its 070 * contents.</p> 071 */ 072 @GuardedBy("itself") 073 private final Map<Object, T> knownObjects; 074 075 /** 076 * A {@link Logger} for use by this {@link 077 * ResourceTrackingEventQueueConsumer} implementation. 078 * 079 * <p>This field is never {@code null}.</p> 080 * 081 * @see #createLogger() 082 */ 083 protected final Logger logger; 084 085 086 /* 087 * Constructors. 088 */ 089 090 091 /** 092 * Creates a new {@link ResourceTrackingEventQueueConsumer}. 093 * 094 * @param knownObjects a mutable {@link Map} of {@link HasMetadata} 095 * objects indexed by their keys (often a pairing of namespace and 096 * name); may be {@code null} if deletion tracking is not needed; 097 * <strong>will have its contents changed</strong> by this {@link 098 * ResourceTrackingEventQueueConsumer}'s {@link #accept(EventQueue)} 099 * method; <strong>will be synchronized on</strong> by this {@link 100 * ResourceTrackingEventQueueConsumer}'s {@link #accept(EventQueue)} 101 * method 102 * 103 * @see #accept(EventQueue) 104 */ 105 protected ResourceTrackingEventQueueConsumer(final Map<Object, T> knownObjects) { 106 super(); 107 this.logger = this.createLogger(); 108 if (this.logger == null) { 109 throw new IllegalStateException("createLogger() == null"); 110 } 111 final String cn = this.getClass().getName(); 112 final String mn = "<init>"; 113 if (this.logger.isLoggable(Level.FINER)) { 114 final String knownObjectsString; 115 if (knownObjects == null) { 116 knownObjectsString = null; 117 } else { 118 synchronized (knownObjects) { 119 knownObjectsString = knownObjects.toString(); 120 } 121 } 122 this.logger.entering(cn, mn, knownObjectsString); 123 } 124 this.knownObjects = knownObjects; 125 if (this.logger.isLoggable(Level.FINER)) { 126 this.logger.exiting(cn, mn); 127 } 128 } 129 130 131 /* 132 * Instance methods. 133 */ 134 135 136 /** 137 * Returns a {@link Logger} for use with this {@link 138 * ResourceTrackingEventQueueConsumer}. 139 * 140 * <p>This method never returns {@code null}.</p> 141 * 142 * <p>Overrides of this method must not return {@code null}.</p> 143 * 144 * @return a non-{@code null} {@link Logger} 145 */ 146 protected Logger createLogger() { 147 return Logger.getLogger(this.getClass().getName()); 148 } 149 150 151 /** 152 * {@linkplain EventQueue#iterator() Loops through} all the {@link 153 * AbstractEvent}s in the supplied {@link EventQueue}, keeping track 154 * of the {@link HasMetadata} it concerns along the way by 155 * <strong>synchronizing on</strong> and writing to the {@link Map} 156 * {@linkplain #ResourceTrackingEventQueueConsumer(Map) supplied at 157 * construction time}. 158 * 159 * <p>Individual {@link AbstractEvent}s are forwarded on to the 160 * {@link #accept(AbstractEvent)} method.</p> 161 * 162 * <h2>Implementation Notes</h2> 163 * 164 * <p>This loosely models the <a 165 * href="https://github.com/kubernetes/client-go/blob/v6.0.0/tools/cache/shared_informer.go#L343">{@code 166 * HandleDeltas} function in {@code 167 * tools/cache/shared_informer.go}</a>. The final distribution step 168 * is left unimplemented on purpose.</p> 169 * 170 * @param eventQueue the {@link EventQueue} to process; may be 171 * {@code null} in which case no action will be taken 172 * 173 * @see #accept(AbstractEvent) 174 */ 175 @Override 176 public final void accept(final EventQueue<? extends T> eventQueue) { 177 final String cn = this.getClass().getName(); 178 final String mn = "accept"; 179 if (eventQueue == null) { 180 if (this.logger.isLoggable(Level.FINER)) { 181 this.logger.entering(cn, mn, null); 182 } 183 } else { 184 synchronized (eventQueue) { 185 if (this.logger.isLoggable(Level.FINER)) { 186 this.logger.entering(cn, mn, eventQueue); 187 } 188 189 final Object key = eventQueue.getKey(); 190 if (key == null) { 191 throw new IllegalStateException("eventQueue.getKey() == null; eventQueue: " + eventQueue); 192 } 193 194 for (final AbstractEvent<? extends T> event : eventQueue) { 195 if (event != null) { 196 197 assert key.equals(event.getKey()); 198 199 final Event.Type eventType = event.getType(); 200 assert eventType != null; 201 202 final T newResource = event.getResource(); 203 204 if (event.getPriorResource() != null && this.logger.isLoggable(Level.FINE)) { 205 this.logger.logp(Level.FINE, cn, mn, "Unexpected state; event has a priorResource: {0}", event.getPriorResource()); 206 } 207 208 final T priorResource; 209 final AbstractEvent<? extends T> newEvent; 210 211 if (this.knownObjects == null) { 212 priorResource = null; 213 newEvent = event; 214 } else if (Event.Type.DELETION.equals(eventType)) { 215 216 // "Forget" (untrack) the object in question. 217 synchronized (this.knownObjects) { 218 priorResource = this.knownObjects.remove(key); 219 } 220 221 newEvent = event; 222 } else { 223 assert eventType.equals(Event.Type.ADDITION) || eventType.equals(Event.Type.MODIFICATION); 224 225 // "Learn" (track) the resource in question. 226 synchronized (this.knownObjects) { 227 priorResource = this.knownObjects.put(key, newResource); 228 } 229 230 if (event instanceof SynchronizationEvent) { 231 if (priorResource == null) { 232 assert Event.Type.ADDITION.equals(eventType) : "!Event.Type.ADDITION.equals(eventType): " + eventType; 233 newEvent = event; 234 } else { 235 assert Event.Type.MODIFICATION.equals(eventType) : "!Event.Type.MODIFICATION.equals(eventType): " + eventType; 236 newEvent = this.createSynchronizationEvent(Event.Type.MODIFICATION, priorResource, newResource); 237 } 238 } else if (priorResource == null) { 239 if (Event.Type.ADDITION.equals(eventType)) { 240 newEvent = event; 241 } else { 242 newEvent = this.createEvent(Event.Type.ADDITION, null, newResource); 243 } 244 } else { 245 newEvent = this.createEvent(Event.Type.MODIFICATION, priorResource, newResource); 246 } 247 } 248 249 assert newEvent != null; 250 assert newEvent instanceof SynchronizationEvent || newEvent instanceof Event; 251 252 // This is the final consumption/distribution step; it is 253 // an abstract method in this class. 254 this.accept(newEvent); 255 256 } 257 } 258 259 } 260 } 261 if (this.logger.isLoggable(Level.FINER)) { 262 this.logger.exiting(cn, mn); 263 } 264 } 265 266 /** 267 * Creates and returns a new {@link Event}. 268 * 269 * <p>This method never returns {@code null}.</p> 270 * 271 * <p>Overrides of this method must not return {@code null}.</p> 272 * 273 * @param eventType the {@link AbstractEvent.Type} for the new 274 * {@link Event}; must not be {@code null}; when supplied by the 275 * {@link #accept(EventQueue)} method's internals, will always be 276 * either {@link AbstractEvent.Type#ADDITION} or {@link 277 * AbstractEvent.Type#MODIFICATION} 278 * 279 * @param priorResource the prior state of the resource the new 280 * {@link Event} will represent; may be (and often is) {@code null} 281 * 282 * @param resource the latest state of the resource the new {@link 283 * Event} will represent; must not be {@code null} 284 * 285 * @return a new, non-{@code null} {@link Event} with each 286 * invocation 287 * 288 * @exception NullPointerException if {@code eventType} or {@code 289 * resource} is {@code null} 290 */ 291 protected Event<T> createEvent(final Event.Type eventType, final T priorResource, final T resource) { 292 final String cn = this.getClass().getName(); 293 final String mn = "createEvent"; 294 if (this.logger.isLoggable(Level.FINER)) { 295 this.logger.entering(cn, mn, new Object[] { eventType, priorResource, resource }); 296 } 297 Objects.requireNonNull(eventType); 298 final Event<T> returnValue = new Event<>(this, eventType, priorResource, resource); 299 if (this.logger.isLoggable(Level.FINER)) { 300 this.logger.exiting(cn, mn, returnValue); 301 } 302 return returnValue; 303 } 304 305 /** 306 * Creates and returns a new {@link SynchronizationEvent}. 307 * 308 * <p>This method never returns {@code null}.</p> 309 * 310 * <p>Overrides of this method must not return {@code null}.</p> 311 * 312 * @param eventType the {@link AbstractEvent.Type} for the new 313 * {@link SynchronizationEvent}; must not be {@code null}; when 314 * supplied by the {@link #accept(EventQueue)} method's internals, 315 * will always be {@link AbstractEvent.Type#MODIFICATION} 316 * 317 * @param priorResource the prior state of the resource the new 318 * {@link SynchronizationEvent} will represent; may be (and often 319 * is) {@code null} 320 * 321 * @param resource the latest state of the resource the new {@link 322 * SynchronizationEvent} will represent; must not be {@code null} 323 * 324 * @return a new, non-{@code null} {@link SynchronizationEvent} with 325 * each invocation 326 * 327 * @exception NullPointerException if {@code eventType} or {@code 328 * resource} is {@code null} 329 */ 330 protected SynchronizationEvent<T> createSynchronizationEvent(final Event.Type eventType, final T priorResource, final T resource) { 331 final String cn = this.getClass().getName(); 332 final String mn = "createSynchronizationEvent"; 333 if (this.logger.isLoggable(Level.FINER)) { 334 this.logger.entering(cn, mn, new Object[] { eventType, priorResource, resource }); 335 } 336 Objects.requireNonNull(eventType); 337 final SynchronizationEvent<T> returnValue = new SynchronizationEvent<>(this, eventType, priorResource, resource); 338 if (this.logger.isLoggable(Level.FINER)) { 339 this.logger.exiting(cn, mn, returnValue); 340 } 341 return returnValue; 342 } 343 344 /** 345 * Called to process a given {@link AbstractEvent} from the {@link 346 * EventQueue} supplied to the {@link #accept(EventQueue)} method, 347 * <strong>with that {@link EventQueue}'s monitor held</strong>. 348 * 349 * <p>Implementations of this method should be relatively fast as 350 * this method dictates the speed of {@link EventQueue} 351 * processing.</p> 352 * 353 * @param event the {@link AbstractEvent} encountered in the {@link 354 * EventQueue}; must not be {@code null} 355 * 356 * @exception NullPointerException if {@code event} is {@code null} 357 * 358 * @see #accept(EventQueue) 359 */ 360 protected abstract void accept(final AbstractEvent<? extends T> event); 361 362}