1 /* 2 * Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 package org.openjdk.skara.bot; 24 25 import org.openjdk.skara.json.JSONValue; 26 27 import java.io.IOException; 28 import java.nio.file.Path; 29 import java.time.*; 30 import java.util.*; 31 import java.util.concurrent.*; 32 import java.util.logging.*; 33 import java.util.stream.Collectors; 34 35 class BotRunnerError extends RuntimeException { 36 BotRunnerError(String msg) { 37 super(msg); 38 } 39 40 BotRunnerError(String msg, Throwable suppressed) { 41 super(msg); 42 addSuppressed(suppressed); 43 } 44 } 45 46 public class BotRunner { 47 48 enum TaskPhases { 49 BEGIN, 50 END 51 } 52 53 private class RunnableWorkItem implements Runnable { 54 private final WorkItem item; 55 56 RunnableWorkItem(WorkItem wrappedItem) { 57 item = wrappedItem; 58 } 59 60 public WorkItem get() { 61 return item; 62 } 63 64 @Override 65 public void run() { 66 Path scratchPath; 67 68 synchronized (executor) { 69 if (scratchPaths.isEmpty()) { 70 log.finer("No scratch paths available - postponing " + item); 71 pending.put(item, Optional.empty()); 72 return; 73 } 74 scratchPath = scratchPaths.removeFirst(); 75 } 76 77 log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN); 78 try { 79 item.run(scratchPath); 80 } catch (RuntimeException e) { 81 log.severe("Exception during item execution (" + item + "): " + e.getMessage()); 82 item.handleRuntimeException(e); 83 log.throwing(item.toString(), "run", e); 84 } finally { 85 log.log(Level.FINE, "Item " + item + " is now done", TaskPhases.END); 86 } 87 88 synchronized (executor) { 89 scratchPaths.addLast(scratchPath); 90 active.remove(item); 91 92 // Some of the pending items may now be eligible for execution 93 var candidateItems = pending.entrySet().stream() 94 .filter(e -> !e.getValue().isPresent() || !active.contains(e.getValue().get())) 95 .map(Map.Entry::getKey) 96 .collect(Collectors.toList()); 97 98 // Try the candidates against the current active set 99 for (var candidate : candidateItems) { 100 boolean maySubmit = true; 101 for (var activeItem : active) { 102 if (!activeItem.concurrentWith(candidate)) { 103 // Still can't run this candidate, leave it pending 104 log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem); 105 maySubmit = false; 106 break; 107 } 108 } 109 110 if (maySubmit) { 111 pending.remove(candidate); 112 executor.submit(new RunnableWorkItem(candidate)); 113 active.add(candidate); 114 log.finer("Submitting candidate: " + candidate); 115 } 116 } 117 } 118 119 } 120 } 121 122 private final Map<WorkItem, Optional<WorkItem>> pending; 123 private final Set<WorkItem> active; 124 private final Deque<Path> scratchPaths; 125 126 private void submitOrSchedule(WorkItem item) { 127 128 synchronized (executor) { 129 for (var activeItem : active) { 130 if (!activeItem.concurrentWith(item)) { 131 132 for (var pendingItem : pending.entrySet()) { 133 // If there are pending items of the same type that we cannot run concurrently with, replace them. 134 if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) { 135 log.finer("Discarding obsoleted item " + pendingItem.getKey() + 136 " in favor of item " + item); 137 pending.remove(pendingItem.getKey()); 138 // There can't be more than one 139 break; 140 } 141 } 142 143 pending.put(item, Optional.of(activeItem)); 144 return; 145 } 146 } 147 148 executor.submit(new RunnableWorkItem(item)); 149 active.add(item); 150 } 151 } 152 153 private void drain(Duration timeout) throws TimeoutException { 154 155 Instant start = Instant.now(); 156 157 while (Instant.now().isBefore(start.plus(timeout))) { 158 while (true) { 159 var head = (ScheduledFuture<?>) executor.getQueue().peek(); 160 if (head != null) { 161 log.fine("Waiting for future to complete"); 162 try { 163 head.get(); 164 } catch (InterruptedException | ExecutionException e) { 165 log.warning("Exception during queue drain"); 166 log.throwing("BotRunner", "drain", e); 167 } 168 } else { 169 log.finest("Queue is now empty"); 170 break; 171 } 172 } 173 174 synchronized (executor) { 175 if (pending.isEmpty() && active.isEmpty()) { 176 log.fine("Nothing awaiting scheduling - drain is finished"); 177 return; 178 } else { 179 log.finest("Waiting for flighted tasks"); 180 } 181 } 182 try { 183 Thread.sleep(1); 184 } catch (InterruptedException e) { 185 log.warning("Exception during queue drain"); 186 log.throwing("BotRunner", "drain", e); 187 } 188 } 189 190 throw new TimeoutException(); 191 } 192 193 private final BotRunnerConfiguration config; 194 private final List<Bot> bots; 195 private final ScheduledThreadPoolExecutor executor; 196 private final Logger log; 197 198 public BotRunner(BotRunnerConfiguration config, List<Bot> bots) { 199 this.config = config; 200 this.bots = bots; 201 202 pending = new HashMap<>(); 203 active = new HashSet<>(); 204 scratchPaths = new LinkedList<>(); 205 206 for (int i = 0; i < config.concurrency(); ++i) { 207 var folder = config.scratchFolder().resolve("scratch-" + i); 208 scratchPaths.addLast(folder); 209 } 210 211 executor = new ScheduledThreadPoolExecutor(config.concurrency()); 212 log = Logger.getLogger("org.openjdk.skara.bot"); 213 } 214 215 private void checkPeriodicItems() { 216 log.log(Level.FINE, "Starting of checking for periodic items", TaskPhases.BEGIN); 217 try { 218 for (var bot : bots) { 219 var items = bot.getPeriodicItems(); 220 for (var item : items) { 221 submitOrSchedule(item); 222 } 223 } 224 } catch (RuntimeException e) { 225 log.severe("Exception during periodic item checking: " + e.getMessage()); 226 log.throwing("BotRunner", "checkPeriodicItems", e); 227 } finally { 228 log.log(Level.FINE, "Done checking periodic items", TaskPhases.END); 229 } 230 } 231 232 private void processRestRequest(JSONValue request) { 233 log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN); 234 log.fine("Request: " + request); 235 try { 236 for (var bot : bots) { 237 var items = bot.processWebHook(request); 238 for (var item : items) { 239 submitOrSchedule(item); 240 } 241 } 242 } catch (RuntimeException e) { 243 log.severe("Exception during rest request processing: " + e.getMessage()); 244 log.throwing("BotRunner", "processRestRequest", e); 245 } finally { 246 log.log(Level.FINE, "Done processing incoming rest request", TaskPhases.END); 247 } 248 } 249 250 public void run() { 251 log.info("Starting BotRunner execution, will run forever."); 252 log.info("Periodic task interval: " + config.scheduledExecutionPeriod()); 253 log.info("Concurrency: " + config.concurrency()); 254 255 RestReceiver restReceiver = null; 256 if (config.restReceiverPort().isPresent()) { 257 log.info("Listening for webhooks on port: " + config.restReceiverPort().get()); 258 try { 259 restReceiver = new RestReceiver(config.restReceiverPort().get(), this::processRestRequest); 260 } catch (IOException e) { 261 log.warning("Failed to create RestReceiver"); 262 log.throwing("BotRunner", "run", e); 263 } 264 } 265 266 executor.scheduleAtFixedRate(this::checkPeriodicItems, 0, 267 config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS); 268 269 try { 270 executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 271 } catch (InterruptedException e) { 272 e.printStackTrace(); 273 } 274 275 if (restReceiver != null) { 276 restReceiver.close(); 277 } 278 executor.shutdown(); 279 } 280 281 public void runOnce(Duration timeout) throws TimeoutException { 282 log.info("Starting BotRunner execution, will run once"); 283 log.info("Timeout: " + timeout); 284 log.info("Concurrency: " + config.concurrency()); 285 286 var periodics = executor.submit(this::checkPeriodicItems); 287 try { 288 log.fine("Make sure periodics execute at least once"); 289 periodics.get(); 290 log.fine("Periodics have now run"); 291 } catch (InterruptedException e) { 292 throw new BotRunnerError("Interrupted", e); 293 } catch (ExecutionException e) { 294 throw new BotRunnerError("Execution error", e); 295 } 296 log.fine("Waiting for all spawned tasks"); 297 drain(timeout); 298 299 log.fine("Done waiting for all tasks"); 300 executor.shutdown(); 301 } 302 }