1 /*
  2  * Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 package org.openjdk.skara.bot;
 24 
 25 import org.openjdk.skara.json.JSONValue;
 26 
 27 import java.io.IOException;
 28 import java.nio.file.Path;
 29 import java.time.*;
 30 import java.util.*;
 31 import java.util.concurrent.*;
 32 import java.util.logging.*;
 33 import java.util.stream.Collectors;
 34 
 35 class BotRunnerError extends RuntimeException {
 36     BotRunnerError(String msg) {
 37         super(msg);
 38     }
 39 
 40     BotRunnerError(String msg, Throwable suppressed) {
 41         super(msg);
 42         addSuppressed(suppressed);
 43     }
 44 }
 45 
 46 public class BotRunner {
 47     enum TaskPhases {
 48         BEGIN,
 49         END
 50     }
 51 
 52     private class RunnableWorkItem implements Runnable {
 53         private final WorkItem item;
 54 
 55         RunnableWorkItem(WorkItem wrappedItem) {
 56             item = wrappedItem;
 57         }
 58 
 59         public WorkItem get() {
 60             return item;
 61         }
 62 
 63         @Override
 64         public void run() {
 65             Path scratchPath;
 66 
 67             synchronized (executor) {
 68                 if (scratchPaths.isEmpty()) {
 69                     log.finer("No scratch paths available - postponing " + item);
 70                     pending.put(item, Optional.empty());
 71                     return;
 72                 }
 73                 scratchPath = scratchPaths.removeFirst();
 74             }
 75 
 76             log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN);
 77             try {
 78                 item.run(scratchPath);
 79             } catch (RuntimeException e) {
 80                 log.severe("Exception during item execution (" + item + "): " + e.getMessage());
 81                 item.handleRuntimeException(e);
 82                 log.throwing(item.toString(), "run", e);
 83             } finally {
 84                 log.log(Level.FINE, "Item " + item + " is now done", TaskPhases.END);
 85             }
 86 
 87             synchronized (executor) {
 88                 scratchPaths.addLast(scratchPath);
 89                 active.remove(item);
 90 
 91                 // Some of the pending items may now be eligible for execution
 92                 var candidateItems = pending.entrySet().stream()
 93                                             .filter(e -> e.getValue().isEmpty() || !active.containsKey(e.getValue().get()))
 94                                             .map(Map.Entry::getKey)
 95                                             .collect(Collectors.toList());
 96 
 97                 // Try the candidates against the current active set
 98                 for (var candidate : candidateItems) {
 99                     boolean maySubmit = true;
100                     for (var activeItem : active.keySet()) {
101                         if (!activeItem.concurrentWith(candidate)) {
102                             // Still can't run this candidate, leave it pending
103                             log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
104                             maySubmit = false;
105                             break;
106                         }
107                     }
108 
109                     if (maySubmit) {
110                         pending.remove(candidate);
111                         executor.submit(new RunnableWorkItem(candidate));
112                         active.put(candidate, Instant.now());
113                         log.finer("Submitting candidate: " + candidate);
114                     }
115                 }
116             }
117         }
118     }
119 
    // Items waiting to run, mapped to the active item that blocked them when they were queued
    // (empty when the item was postponed because no scratch path was free).
    private final Map<WorkItem, Optional<WorkItem>> pending;
    // Items handed to the executor, mapped to the time they were submitted; the watchdog
    // resets this timestamp after it has reported a long-running item.
    private final Map<WorkItem, Instant> active;
    // Pool of scratch folders; a running item takes one from the front and puts it back at the end.
    private final Deque<Path> scratchPaths;
123 
124     private void submitOrSchedule(WorkItem item) {
125         synchronized (executor) {
126             for (var activeItem : active.keySet()) {
127                 if (!activeItem.concurrentWith(item)) {
128 
129                     for (var pendingItem : pending.entrySet()) {
130                         // If there are pending items of the same type that we cannot run concurrently with, replace them.
131                         if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
132                             log.finer("Discarding obsoleted item " + pendingItem.getKey() +
133                                               " in favor of item " + item);
134                             pending.remove(pendingItem.getKey());
135                             // There can't be more than one
136                             break;
137                         }
138                     }
139 
140                     pending.put(item, Optional.of(activeItem));
141                     return;
142                 }
143             }
144 
145             executor.submit(new RunnableWorkItem(item));
146             active.put(item, Instant.now());
147         }
148     }
149 
150     private void drain(Duration timeout) throws TimeoutException {
151         Instant start = Instant.now();
152 
153         while (Instant.now().isBefore(start.plus(timeout))) {
154             while (true) {
155                 var head = (ScheduledFuture<?>) executor.getQueue().peek();
156                 if (head != null) {
157                     log.fine("Waiting for future to complete");
158                     try {
159                         head.get();
160                     } catch (InterruptedException | ExecutionException e) {
161                         log.warning("Exception during queue drain");
162                         log.throwing("BotRunner", "drain", e);
163                     }
164                 } else {
165                     log.finest("Queue is now empty");
166                     break;
167                 }
168             }
169 
170             synchronized (executor) {
171                 if (pending.isEmpty() && active.isEmpty()) {
172                     log.fine("Nothing awaiting scheduling - drain is finished");
173                     return;
174                 } else {
175                     log.finest("Waiting for flighted tasks");
176                 }
177             }
178             try {
179                 Thread.sleep(1);
180                 watchdog();
181             } catch (InterruptedException e) {
182                 log.warning("Exception during queue drain");
183                 log.throwing("BotRunner", "drain", e);
184             }
185         }
186 
187         throw new TimeoutException();
188     }
189 
    // Runner settings read by this class: concurrency, scratch folder, scheduled execution
    // period, watchdog timeout and the optional REST receiver port.
    private final BotRunnerConfiguration config;
    // Bots that are polled for periodic work items and offered incoming webhook requests.
    private final List<Bot> bots;
    // Runs work items and the periodic checks; its monitor also guards pending/active/scratchPaths.
    private final ScheduledThreadPoolExecutor executor;
    private final Logger log;
194 
195     public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
196         this.config = config;
197         this.bots = bots;
198 
199         pending = new HashMap<>();
200         active = new HashMap<>();
201         scratchPaths = new LinkedList<>();
202 
203         for (int i = 0; i < config.concurrency(); ++i) {
204             var folder = config.scratchFolder().resolve("scratch-" + i);
205             scratchPaths.addLast(folder);
206         }
207 
208         executor = new ScheduledThreadPoolExecutor(config.concurrency());
209         log = Logger.getLogger("org.openjdk.skara.bot");
210     }
211 
212     private void checkPeriodicItems() {
213         log.log(Level.FINE, "Starting of checking for periodic items", TaskPhases.BEGIN);
214         try {
215             for (var bot : bots) {
216                 var items = bot.getPeriodicItems();
217                 for (var item : items) {
218                     submitOrSchedule(item);
219                 }
220             }
221         } catch (RuntimeException e) {
222             log.severe("Exception during periodic item checking: " + e.getMessage());
223             log.throwing("BotRunner", "checkPeriodicItems", e);
224         } finally {
225             log.log(Level.FINE, "Done checking periodic items", TaskPhases.END);
226         }
227     }
228 
229     private void watchdog() {
230         synchronized (executor) {
231             for (var activeItem : active.entrySet()) {
232                 var activeDuration = Duration.between(activeItem.getValue(), Instant.now());
233                 if (activeDuration.compareTo(config.watchdogTimeout()) > 0) {
234                     log.severe("Item " + activeItem.getKey() + " has been active more than " + activeDuration +
235                                        " - this may be an error!");
236                     // Reset the counter to avoid continuous reporting - once every watchdogTimeout is enough
237                     activeItem.setValue(Instant.now());
238                 }
239             }
240         }
241     }
242 
243     private void processRestRequest(JSONValue request) {
244         log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
245         log.fine("Request: " + request);
246         try {
247             for (var bot : bots) {
248                 var items = bot.processWebHook(request);
249                 for (var item : items) {
250                     submitOrSchedule(item);
251                 }
252             }
253         } catch (RuntimeException e) {
254             log.severe("Exception during rest request processing: " + e.getMessage());
255             log.throwing("BotRunner", "processRestRequest", e);
256         } finally {
257             log.log(Level.FINE, "Done processing incoming rest request", TaskPhases.END);
258         }
259     }
260 
261     public void run() {
262         log.info("Starting BotRunner execution, will run forever.");
263         log.info("Periodic task interval: " + config.scheduledExecutionPeriod());
264         log.info("Concurrency: " + config.concurrency());
265 
266         RestReceiver restReceiver = null;
267         if (config.restReceiverPort().isPresent()) {
268             log.info("Listening for webhooks on port: " + config.restReceiverPort().get());
269             try {
270                 restReceiver = new RestReceiver(config.restReceiverPort().get(), this::processRestRequest);
271             } catch (IOException e) {
272                 log.warning("Failed to create RestReceiver");
273                 log.throwing("BotRunner", "run", e);
274             }
275         }
276 
277         executor.scheduleAtFixedRate(this::watchdog, 0,
278                                      config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
279         executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
280                                      config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
281 
282         try {
283             executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
284         } catch (InterruptedException e) {
285             e.printStackTrace();
286         }
287 
288         if (restReceiver != null) {
289             restReceiver.close();
290         }
291         executor.shutdown();
292     }
293 
294     public void runOnce(Duration timeout) throws TimeoutException {
295         log.info("Starting BotRunner execution, will run once");
296         log.info("Timeout: " + timeout);
297         log.info("Concurrency: " + config.concurrency());
298 
299         var periodics = executor.submit(this::checkPeriodicItems);
300         try {
301             log.fine("Make sure periodics execute at least once");
302             periodics.get();
303             log.fine("Periodics have now run");
304         } catch (InterruptedException e) {
305             throw new BotRunnerError("Interrupted", e);
306         } catch (ExecutionException e) {
307             throw new BotRunnerError("Execution error", e);
308         }
309         log.fine("Waiting for all spawned tasks");
310         drain(timeout);
311 
312         log.fine("Done waiting for all tasks");
313         executor.shutdown();
314     }
315 }