1 /*
  2  * Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 package org.openjdk.skara.bot;
 24 
 25 import org.openjdk.skara.json.JSONValue;
 26 
 27 import java.io.IOException;
 28 import java.nio.file.Path;
 29 import java.time.*;
 30 import java.util.*;
 31 import java.util.concurrent.*;
 32 import java.util.logging.*;
 33 import java.util.stream.Collectors;
 34 
 35 class BotRunnerError extends RuntimeException {
 36     BotRunnerError(String msg) {
 37         super(msg);
 38     }
 39 
 40     BotRunnerError(String msg, Throwable suppressed) {
 41         super(msg);
 42         addSuppressed(suppressed);
 43     }
 44 }
 45 
 46 public class BotRunner {
 47 
 48     enum TaskPhases {
 49         BEGIN,
 50         END
 51     }
 52 
 53     private class RunnableWorkItem implements Runnable {
 54         private final WorkItem item;
 55 
 56         RunnableWorkItem(WorkItem wrappedItem) {
 57             item = wrappedItem;
 58         }
 59 
 60         public WorkItem get() {
 61             return item;
 62         }
 63 
 64         @Override
 65         public void run() {
 66             Path scratchPath;
 67 
 68             synchronized (executor) {
 69                 if (scratchPaths.isEmpty()) {
 70                     log.finer("No scratch paths available - postponing " + item);
 71                     pending.put(item, Optional.empty());
 72                     return;
 73                 }
 74                 scratchPath = scratchPaths.removeFirst();
 75             }
 76 
 77             log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN);
 78             try {
 79                 item.run(scratchPath);
 80             } catch (RuntimeException e) {
 81                 log.severe("Exception during item execution (" + item + "): " + e.getMessage());
 82                 item.handleRuntimeException(e);
 83                 log.throwing(item.toString(), "run", e);
 84             } finally {
 85                 log.log(Level.FINE, "Item " + item + " is now done", TaskPhases.END);
 86             }
 87 
 88             synchronized (executor) {
 89                 scratchPaths.addLast(scratchPath);
 90                 active.remove(item);
 91 
 92                 // Some of the pending items may now be eligible for execution
 93                 var candidateItems = pending.entrySet().stream()
 94                                             .filter(e -> !e.getValue().isPresent() || !active.contains(e.getValue().get()))
 95                                             .map(Map.Entry::getKey)
 96                                             .collect(Collectors.toList());
 97 
 98                 // Try the candidates against the current active set
 99                 for (var candidate : candidateItems) {
100                     boolean maySubmit = true;
101                     for (var activeItem : active) {
102                         if (!activeItem.concurrentWith(candidate)) {
103                             // Still can't run this candidate, leave it pending
104                             log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
105                             maySubmit = false;
106                             break;
107                         }
108                     }
109 
110                     if (maySubmit) {
111                         pending.remove(candidate);
112                         executor.submit(new RunnableWorkItem(candidate));
113                         active.add(candidate);
114                         log.finer("Submitting candidate: " + candidate);
115                     }
116                 }
117             }
118 
119         }
120     }
121 
122     private final Map<WorkItem, Optional<WorkItem>> pending;
123     private final Set<WorkItem> active;
124     private final Deque<Path> scratchPaths;
125 
126     private void submitOrSchedule(WorkItem item) {
127 
128         synchronized (executor) {
129             for (var activeItem : active) {
130                 if (!activeItem.concurrentWith(item)) {
131 
132                     for (var pendingItem : pending.entrySet()) {
133                         // If there are pending items of the same type that we cannot run concurrently with, replace them.
134                         if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
135                             log.finer("Discarding obsoleted item " + pendingItem.getKey() +
136                                               " in favor of item " + item);
137                             pending.remove(pendingItem.getKey());
138                             // There can't be more than one
139                             break;
140                         }
141                     }
142 
143                     pending.put(item, Optional.of(activeItem));
144                     return;
145                 }
146             }
147 
148             executor.submit(new RunnableWorkItem(item));
149             active.add(item);
150         }
151     }
152 
153     private void drain(Duration timeout) throws TimeoutException {
154 
155         Instant start = Instant.now();
156 
157         while (Instant.now().isBefore(start.plus(timeout))) {
158             while (true) {
159                 var head = (ScheduledFuture<?>) executor.getQueue().peek();
160                 if (head != null) {
161                     log.fine("Waiting for future to complete");
162                     try {
163                         head.get();
164                     } catch (InterruptedException | ExecutionException e) {
165                         log.warning("Exception during queue drain");
166                         log.throwing("BotRunner", "drain", e);
167                     }
168                 } else {
169                     log.finest("Queue is now empty");
170                     break;
171                 }
172             }
173 
174             synchronized (executor) {
175                 if (pending.isEmpty() && active.isEmpty()) {
176                     log.fine("Nothing awaiting scheduling - drain is finished");
177                     return;
178                 } else {
179                     log.finest("Waiting for flighted tasks");
180                 }
181             }
182             try {
183                 Thread.sleep(1);
184             } catch (InterruptedException e) {
185                 log.warning("Exception during queue drain");
186                 log.throwing("BotRunner", "drain", e);
187             }
188         }
189 
190         throw new TimeoutException();
191     }
192 
193     private final BotRunnerConfiguration config;
194     private final List<Bot> bots;
195     private final ScheduledThreadPoolExecutor executor;
196     private final Logger log;
197 
198     public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
199         this.config = config;
200         this.bots = bots;
201 
202         pending = new HashMap<>();
203         active = new HashSet<>();
204         scratchPaths = new LinkedList<>();
205 
206         for (int i = 0; i < config.concurrency(); ++i) {
207             var folder = config.scratchFolder().resolve("scratch-" + i);
208             scratchPaths.addLast(folder);
209         }
210 
211         executor = new ScheduledThreadPoolExecutor(config.concurrency());
212         log = Logger.getLogger("org.openjdk.skara.bot");
213     }
214 
215     private void checkPeriodicItems() {
216         log.log(Level.FINE, "Starting of checking for periodic items", TaskPhases.BEGIN);
217         try {
218             for (var bot : bots) {
219                 var items = bot.getPeriodicItems();
220                 for (var item : items) {
221                     submitOrSchedule(item);
222                 }
223             }
224         } catch (RuntimeException e) {
225             log.severe("Exception during periodic item checking: " + e.getMessage());
226             log.throwing("BotRunner", "checkPeriodicItems", e);
227         } finally {
228             log.log(Level.FINE, "Done checking periodic items", TaskPhases.END);
229         }
230     }
231 
232     private void processRestRequest(JSONValue request) {
233         log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
234         log.fine("Request: " + request);
235         try {
236             for (var bot : bots) {
237                 var items = bot.processWebHook(request);
238                 for (var item : items) {
239                     submitOrSchedule(item);
240                 }
241             }
242         } catch (RuntimeException e) {
243             log.severe("Exception during rest request processing: " + e.getMessage());
244             log.throwing("BotRunner", "processRestRequest", e);
245         } finally {
246             log.log(Level.FINE, "Done processing incoming rest request", TaskPhases.END);
247         }
248     }
249 
250     public void run() {
251         log.info("Starting BotRunner execution, will run forever.");
252         log.info("Periodic task interval: " + config.scheduledExecutionPeriod());
253         log.info("Concurrency: " + config.concurrency());
254 
255         RestReceiver restReceiver = null;
256         if (config.restReceiverPort().isPresent()) {
257             log.info("Listening for webhooks on port: " + config.restReceiverPort().get());
258             try {
259                 restReceiver = new RestReceiver(config.restReceiverPort().get(), this::processRestRequest);
260             } catch (IOException e) {
261                 log.warning("Failed to create RestReceiver");
262                 log.throwing("BotRunner", "run", e);
263             }
264         }
265 
266         executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
267                                      config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
268 
269         try {
270             executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
271         } catch (InterruptedException e) {
272             e.printStackTrace();
273         }
274 
275         if (restReceiver != null) {
276             restReceiver.close();
277         }
278         executor.shutdown();
279     }
280 
281     public void runOnce(Duration timeout) throws TimeoutException {
282         log.info("Starting BotRunner execution, will run once");
283         log.info("Timeout: " + timeout);
284         log.info("Concurrency: " + config.concurrency());
285 
286         var periodics = executor.submit(this::checkPeriodicItems);
287         try {
288             log.fine("Make sure periodics execute at least once");
289             periodics.get();
290             log.fine("Periodics have now run");
291         } catch (InterruptedException e) {
292             throw new BotRunnerError("Interrupted", e);
293         } catch (ExecutionException e) {
294             throw new BotRunnerError("Execution error", e);
295         }
296         log.fine("Waiting for all spawned tasks");
297         drain(timeout);
298 
299         log.fine("Done waiting for all tasks");
300         executor.shutdown();
301     }
302 }