1 /* 2 * Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 package org.openjdk.skara.bot; 24 25 import org.openjdk.skara.json.JSONValue; 26 27 import java.io.IOException; 28 import java.nio.file.Path; 29 import java.time.*; 30 import java.util.*; 31 import java.util.concurrent.*; 32 import java.util.logging.*; 33 import java.util.stream.Collectors; 34 35 class BotRunnerError extends RuntimeException { 36 BotRunnerError(String msg) { 37 super(msg); 38 } 39 40 BotRunnerError(String msg, Throwable suppressed) { 41 super(msg); 42 addSuppressed(suppressed); 43 } 44 } 45 46 public class BotRunner { 47 enum TaskPhases { 48 BEGIN, 49 END 50 } 51 52 private class RunnableWorkItem implements Runnable { 53 private final WorkItem item; 54 55 RunnableWorkItem(WorkItem wrappedItem) { 56 item = wrappedItem; 57 } 58 59 public WorkItem get() { 60 return item; 61 } 62 63 @Override 64 public void run() { 65 Path scratchPath; 66 67 synchronized (executor) { 68 if (scratchPaths.isEmpty()) { 69 log.finer("No scratch paths available - postponing " + item); 70 pending.put(item, Optional.empty()); 71 return; 72 } 73 scratchPath = scratchPaths.removeFirst(); 74 } 75 76 log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN); 77 try { 78 item.run(scratchPath); 79 } catch (RuntimeException e) { 80 log.severe("Exception during item execution (" + item + "): " + e.getMessage()); 81 item.handleRuntimeException(e); 82 log.throwing(item.toString(), "run", e); 83 } finally { 84 log.log(Level.FINE, "Item " + item + " is now done", TaskPhases.END); 85 } 86 87 synchronized (executor) { 88 scratchPaths.addLast(scratchPath); 89 active.remove(item); 90 91 // Some of the pending items may now be eligible for execution 92 var candidateItems = pending.entrySet().stream() 93 .filter(e -> e.getValue().isEmpty() || !active.containsKey(e.getValue().get())) 94 .map(Map.Entry::getKey) 95 .collect(Collectors.toList()); 96 97 // Try the candidates against the current active set 98 for (var candidate : candidateItems) { 99 boolean maySubmit = true; 100 for (var activeItem : active.keySet()) { 101 if (!activeItem.concurrentWith(candidate)) { 102 // Still can't run this candidate, leave it pending 103 log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem); 104 maySubmit = false; 105 break; 106 } 107 } 108 109 if (maySubmit) { 110 pending.remove(candidate); 111 executor.submit(new RunnableWorkItem(candidate)); 112 active.put(candidate, Instant.now()); 113 log.finer("Submitting candidate: " + candidate); 114 } 115 } 116 } 117 } 118 } 119 120 private final Map<WorkItem, Optional<WorkItem>> pending; 121 private final Map<WorkItem, Instant> active; 122 private final Deque<Path> scratchPaths; 123 124 private void submitOrSchedule(WorkItem item) { 125 synchronized (executor) { 126 for (var activeItem : active.keySet()) { 127 if (!activeItem.concurrentWith(item)) { 128 129 for (var pendingItem : pending.entrySet()) { 130 // If there are pending items of the same type that we cannot run concurrently with, replace them. 131 if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) { 132 log.finer("Discarding obsoleted item " + pendingItem.getKey() + 133 " in favor of item " + item); 134 pending.remove(pendingItem.getKey()); 135 // There can't be more than one 136 break; 137 } 138 } 139 140 pending.put(item, Optional.of(activeItem)); 141 return; 142 } 143 } 144 145 executor.submit(new RunnableWorkItem(item)); 146 active.put(item, Instant.now()); 147 } 148 } 149 150 private void drain(Duration timeout) throws TimeoutException { 151 Instant start = Instant.now(); 152 153 while (Instant.now().isBefore(start.plus(timeout))) { 154 while (true) { 155 var head = (ScheduledFuture<?>) executor.getQueue().peek(); 156 if (head != null) { 157 log.fine("Waiting for future to complete"); 158 try { 159 head.get(); 160 } catch (InterruptedException | ExecutionException e) { 161 log.warning("Exception during queue drain"); 162 log.throwing("BotRunner", "drain", e); 163 } 164 } else { 165 log.finest("Queue is now empty"); 166 break; 167 } 168 } 169 170 synchronized (executor) { 171 if (pending.isEmpty() && active.isEmpty()) { 172 log.fine("Nothing awaiting scheduling - drain is finished"); 173 return; 174 } else { 175 log.finest("Waiting for flighted tasks"); 176 } 177 } 178 try { 179 Thread.sleep(1); 180 watchdog(); 181 } catch (InterruptedException e) { 182 log.warning("Exception during queue drain"); 183 log.throwing("BotRunner", "drain", e); 184 } 185 } 186 187 throw new TimeoutException(); 188 } 189 190 private final BotRunnerConfiguration config; 191 private final List<Bot> bots; 192 private final ScheduledThreadPoolExecutor executor; 193 private final Logger log; 194 195 public BotRunner(BotRunnerConfiguration config, List<Bot> bots) { 196 this.config = config; 197 this.bots = bots; 198 199 pending = new HashMap<>(); 200 active = new HashMap<>(); 201 scratchPaths = new LinkedList<>(); 202 203 for (int i = 0; i < config.concurrency(); ++i) { 204 var folder = config.scratchFolder().resolve("scratch-" + i); 205 scratchPaths.addLast(folder); 206 } 207 208 executor = new ScheduledThreadPoolExecutor(config.concurrency()); 209 log = Logger.getLogger("org.openjdk.skara.bot"); 210 } 211 212 private void checkPeriodicItems() { 213 log.log(Level.FINE, "Starting of checking for periodic items", TaskPhases.BEGIN); 214 try { 215 for (var bot : bots) { 216 var items = bot.getPeriodicItems(); 217 for (var item : items) { 218 submitOrSchedule(item); 219 } 220 } 221 } catch (RuntimeException e) { 222 log.severe("Exception during periodic item checking: " + e.getMessage()); 223 log.throwing("BotRunner", "checkPeriodicItems", e); 224 } finally { 225 log.log(Level.FINE, "Done checking periodic items", TaskPhases.END); 226 } 227 } 228 229 private void watchdog() { 230 synchronized (executor) { 231 for (var activeItem : active.entrySet()) { 232 var activeDuration = Duration.between(activeItem.getValue(), Instant.now()); 233 if (activeDuration.compareTo(config.watchdogTimeout()) > 0) { 234 log.severe("Item " + activeItem.getKey() + " has been active more than " + activeDuration + 235 " - this may be an error!"); 236 // Reset the counter to avoid continuous reporting - once every watchdogTimeout is enough 237 activeItem.setValue(Instant.now()); 238 } 239 } 240 } 241 } 242 243 private void processRestRequest(JSONValue request) { 244 log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN); 245 log.fine("Request: " + request); 246 try { 247 for (var bot : bots) { 248 var items = bot.processWebHook(request); 249 for (var item : items) { 250 submitOrSchedule(item); 251 } 252 } 253 } catch (RuntimeException e) { 254 log.severe("Exception during rest request processing: " + e.getMessage()); 255 log.throwing("BotRunner", "processRestRequest", e); 256 } finally { 257 log.log(Level.FINE, "Done processing incoming rest request", TaskPhases.END); 258 } 259 } 260 261 public void run() { 262 log.info("Starting BotRunner execution, will run forever."); 263 log.info("Periodic task interval: " + config.scheduledExecutionPeriod()); 264 log.info("Concurrency: " + config.concurrency()); 265 266 RestReceiver restReceiver = null; 267 if (config.restReceiverPort().isPresent()) { 268 log.info("Listening for webhooks on port: " + config.restReceiverPort().get()); 269 try { 270 restReceiver = new RestReceiver(config.restReceiverPort().get(), this::processRestRequest); 271 } catch (IOException e) { 272 log.warning("Failed to create RestReceiver"); 273 log.throwing("BotRunner", "run", e); 274 } 275 } 276 277 executor.scheduleAtFixedRate(this::watchdog, 0, 278 config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS); 279 executor.scheduleAtFixedRate(this::checkPeriodicItems, 0, 280 config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS); 281 282 try { 283 executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 284 } catch (InterruptedException e) { 285 e.printStackTrace(); 286 } 287 288 if (restReceiver != null) { 289 restReceiver.close(); 290 } 291 executor.shutdown(); 292 } 293 294 public void runOnce(Duration timeout) throws TimeoutException { 295 log.info("Starting BotRunner execution, will run once"); 296 log.info("Timeout: " + timeout); 297 log.info("Concurrency: " + config.concurrency()); 298 299 var periodics = executor.submit(this::checkPeriodicItems); 300 try { 301 log.fine("Make sure periodics execute at least once"); 302 periodics.get(); 303 log.fine("Periodics have now run"); 304 } catch (InterruptedException e) { 305 throw new BotRunnerError("Interrupted", e); 306 } catch (ExecutionException e) { 307 throw new BotRunnerError("Execution error", e); 308 } 309 log.fine("Waiting for all spawned tasks"); 310 drain(timeout); 311 312 log.fine("Done waiting for all tasks"); 313 executor.shutdown(); 314 } 315 }