< prev index next >

bot/src/main/java/org/openjdk/skara/bot/BotRunner.java

Print this page

 27 import java.io.IOException;
 28 import java.nio.file.Path;
 29 import java.time.*;
 30 import java.util.*;
 31 import java.util.concurrent.*;
 32 import java.util.logging.*;
 33 import java.util.stream.Collectors;
 34 
 35 class BotRunnerError extends RuntimeException {
 36     BotRunnerError(String msg) {
 37         super(msg);
 38     }
 39 
 40     BotRunnerError(String msg, Throwable suppressed) {
 41         super(msg);
 42         addSuppressed(suppressed);
 43     }
 44 }
 45 
 46 public class BotRunner {
 47 
 48     enum TaskPhases {
 49         BEGIN,
 50         END
 51     }
 52 
 53     private class RunnableWorkItem implements Runnable {
 54         private final WorkItem item;
 55 
 56         RunnableWorkItem(WorkItem wrappedItem) {
 57             item = wrappedItem;
 58         }
 59 
 60         public WorkItem get() {
 61             return item;
 62         }
 63 
 64         @Override
 65         public void run() {
 66             Path scratchPath;
 67 

 74                 scratchPath = scratchPaths.removeFirst();
 75             }
 76 
 77             log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN);
 78             try {
 79                 item.run(scratchPath);
 80             } catch (RuntimeException e) {
 81                 log.severe("Exception during item execution (" + item + "): " + e.getMessage());
 82                 item.handleRuntimeException(e);
 83                 log.throwing(item.toString(), "run", e);
 84             } finally {
 85                 log.log(Level.FINE, "Item " + item + " is now done", TaskPhases.END);
 86             }
 87 
 88             synchronized (executor) {
 89                 scratchPaths.addLast(scratchPath);
 90                 active.remove(item);
 91 
 92                 // Some of the pending items may now be eligible for execution
 93                 var candidateItems = pending.entrySet().stream()
 94                                             .filter(e -> !e.getValue().isPresent() || !active.contains(e.getValue().get()))
 95                                             .map(Map.Entry::getKey)
 96                                             .collect(Collectors.toList());
 97 
 98                 // Try the candidates against the current active set
 99                 for (var candidate : candidateItems) {
100                     boolean maySubmit = true;
101                     for (var activeItem : active) {
102                         if (!activeItem.concurrentWith(candidate)) {
103                             // Still can't run this candidate, leave it pending
104                             log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
105                             maySubmit = false;
106                             break;
107                         }
108                     }
109 
110                     if (maySubmit) {
111                         pending.remove(candidate);
112                         executor.submit(new RunnableWorkItem(candidate));
113                         active.add(candidate);
114                         log.finer("Submitting candidate: " + candidate);
115                     }
116                 }
117             }
118 
119         }
120     }
121 
122     private final Map<WorkItem, Optional<WorkItem>> pending;
123     private final Set<WorkItem> active;
124     private final Deque<Path> scratchPaths;
125 
126     private void submitOrSchedule(WorkItem item) {
127 
128         synchronized (executor) {
129             for (var activeItem : active) {
130                 if (!activeItem.concurrentWith(item)) {
131 
132                     for (var pendingItem : pending.entrySet()) {
133                         // If there are pending items of the same type that we cannot run concurrently with, replace them.
134                         if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
135                             log.finer("Discarding obsoleted item " + pendingItem.getKey() +
136                                               " in favor of item " + item);
137                             pending.remove(pendingItem.getKey());
138                             // There can't be more than one
139                             break;
140                         }
141                     }
142 
143                     pending.put(item, Optional.of(activeItem));
144                     return;
145                 }
146             }
147 
148             executor.submit(new RunnableWorkItem(item));
149             active.add(item);
150         }
151     }
152 
153     private void drain(Duration timeout) throws TimeoutException {
154 
155         Instant start = Instant.now();
156 
157         while (Instant.now().isBefore(start.plus(timeout))) {
158             while (true) {
159                 var head = (ScheduledFuture<?>) executor.getQueue().peek();
160                 if (head != null) {
161                     log.fine("Waiting for future to complete");
162                     try {
163                         head.get();
164                     } catch (InterruptedException | ExecutionException e) {
165                         log.warning("Exception during queue drain");
166                         log.throwing("BotRunner", "drain", e);
167                     }
168                 } else {
169                     log.finest("Queue is now empty");
170                     break;
171                 }
172             }
173 
174             synchronized (executor) {
175                 if (pending.isEmpty() && active.isEmpty()) {
176                     log.fine("Nothing awaiting scheduling - drain is finished");
177                     return;
178                 } else {
179                     log.finest("Waiting for flighted tasks");
180                 }
181             }
182             try {
183                 Thread.sleep(1);

184             } catch (InterruptedException e) {
185                 log.warning("Exception during queue drain");
186                 log.throwing("BotRunner", "drain", e);
187             }
188         }
189 
190         throw new TimeoutException();
191     }
192 
193     private final BotRunnerConfiguration config;
194     private final List<Bot> bots;
195     private final ScheduledThreadPoolExecutor executor;
196     private final Logger log;
197 
198     public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
199         this.config = config;
200         this.bots = bots;
201 
202         pending = new HashMap<>();
203         active = new HashSet<>();
204         scratchPaths = new LinkedList<>();
205 
206         for (int i = 0; i < config.concurrency(); ++i) {
207             var folder = config.scratchFolder().resolve("scratch-" + i);
208             scratchPaths.addLast(folder);
209         }
210 
211         executor = new ScheduledThreadPoolExecutor(config.concurrency());
212         log = Logger.getLogger("org.openjdk.skara.bot");
213     }
214 
215     private void checkPeriodicItems() {
216         log.log(Level.FINE, "Starting of checking for periodic items", TaskPhases.BEGIN);
217         try {
218             for (var bot : bots) {
219                 var items = bot.getPeriodicItems();
220                 for (var item : items) {
221                     submitOrSchedule(item);
222                 }
223             }














232     private void processRestRequest(JSONValue request) {
233         log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
234         log.fine("Request: " + request);
235         try {
236             for (var bot : bots) {
237                 var items = bot.processWebHook(request);
238                 for (var item : items) {
239                     submitOrSchedule(item);
240                 }
241             }
242         } catch (RuntimeException e) {
243             log.severe("Exception during rest request processing: " + e.getMessage());
244             log.throwing("BotRunner", "processRestRequest", e);
245         } finally {
246             log.log(Level.FINE, "Done processing incoming rest request", TaskPhases.END);
247         }
248     }
249 
250     public void run() {
251         log.info("Starting BotRunner execution, will run forever.");


266         executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
267                                      config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
268 
269         try {
270             executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
271         } catch (InterruptedException e) {
272             e.printStackTrace();
273         }
274 
275         if (restReceiver != null) {
276             restReceiver.close();
277         }
278         executor.shutdown();
279     }
280 
281     public void runOnce(Duration timeout) throws TimeoutException {
282         log.info("Starting BotRunner execution, will run once");
283         log.info("Timeout: " + timeout);
284         log.info("Concurrency: " + config.concurrency());
285 

 27 import java.io.IOException;
 28 import java.nio.file.Path;
 29 import java.time.*;
 30 import java.util.*;
 31 import java.util.concurrent.*;
 32 import java.util.logging.*;
 33 import java.util.stream.Collectors;
 34 
 35 class BotRunnerError extends RuntimeException {
 36     BotRunnerError(String msg) {
 37         super(msg);
 38     }
 39 
 40     BotRunnerError(String msg, Throwable suppressed) {
 41         super(msg);
 42         addSuppressed(suppressed);
 43     }
 44 }
 45 
 46 public class BotRunner {

 47     enum TaskPhases {
 48         BEGIN,
 49         END
 50     }
 51 
 52     private class RunnableWorkItem implements Runnable {
 53         private final WorkItem item;
 54 
 55         RunnableWorkItem(WorkItem wrappedItem) {
 56             item = wrappedItem;
 57         }
 58 
 59         public WorkItem get() {
 60             return item;
 61         }
 62 
 63         @Override
 64         public void run() {
 65             Path scratchPath;
 66 

 73                 scratchPath = scratchPaths.removeFirst();
 74             }
 75 
 76             log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN);
 77             try {
 78                 item.run(scratchPath);
 79             } catch (RuntimeException e) {
 80                 log.severe("Exception during item execution (" + item + "): " + e.getMessage());
 81                 item.handleRuntimeException(e);
 82                 log.throwing(item.toString(), "run", e);
 83             } finally {
 84                 log.log(Level.FINE, "Item " + item + " is now done", TaskPhases.END);
 85             }
 86 
 87             synchronized (executor) {
 88                 scratchPaths.addLast(scratchPath);
 89                 active.remove(item);
 90 
 91                 // Some of the pending items may now be eligible for execution
 92                 var candidateItems = pending.entrySet().stream()
 93                                             .filter(e -> e.getValue().isEmpty() || !active.containsKey(e.getValue().get()))
 94                                             .map(Map.Entry::getKey)
 95                                             .collect(Collectors.toList());
 96 
 97                 // Try the candidates against the current active set
 98                 for (var candidate : candidateItems) {
 99                     boolean maySubmit = true;
100                     for (var activeItem : active.keySet()) {
101                         if (!activeItem.concurrentWith(candidate)) {
102                             // Still can't run this candidate, leave it pending
103                             log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
104                             maySubmit = false;
105                             break;
106                         }
107                     }
108 
109                     if (maySubmit) {
110                         pending.remove(candidate);
111                         executor.submit(new RunnableWorkItem(candidate));
112                         active.put(candidate, Instant.now());
113                         log.finer("Submitting candidate: " + candidate);
114                     }
115                 }
116             }

117         }
118     }
119 
120     private final Map<WorkItem, Optional<WorkItem>> pending;
121     private final Map<WorkItem, Instant> active;
122     private final Deque<Path> scratchPaths;
123 
124     private void submitOrSchedule(WorkItem item) {

125         synchronized (executor) {
126             for (var activeItem : active.keySet()) {
127                 if (!activeItem.concurrentWith(item)) {
128 
129                     for (var pendingItem : pending.entrySet()) {
130                         // If there are pending items of the same type that we cannot run concurrently with, replace them.
131                         if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
132                             log.finer("Discarding obsoleted item " + pendingItem.getKey() +
133                                               " in favor of item " + item);
134                             pending.remove(pendingItem.getKey());
135                             // There can't be more than one
136                             break;
137                         }
138                     }
139 
140                     pending.put(item, Optional.of(activeItem));
141                     return;
142                 }
143             }
144 
145             executor.submit(new RunnableWorkItem(item));
146             active.put(item, Instant.now());
147         }
148     }
149 
150     private void drain(Duration timeout) throws TimeoutException {

151         Instant start = Instant.now();
152 
153         while (Instant.now().isBefore(start.plus(timeout))) {
154             while (true) {
155                 var head = (ScheduledFuture<?>) executor.getQueue().peek();
156                 if (head != null) {
157                     log.fine("Waiting for future to complete");
158                     try {
159                         head.get();
160                     } catch (InterruptedException | ExecutionException e) {
161                         log.warning("Exception during queue drain");
162                         log.throwing("BotRunner", "drain", e);
163                     }
164                 } else {
165                     log.finest("Queue is now empty");
166                     break;
167                 }
168             }
169 
170             synchronized (executor) {
171                 if (pending.isEmpty() && active.isEmpty()) {
172                     log.fine("Nothing awaiting scheduling - drain is finished");
173                     return;
174                 } else {
175                     log.finest("Waiting for flighted tasks");
176                 }
177             }
178             try {
179                 Thread.sleep(1);
180                 watchdog();
181             } catch (InterruptedException e) {
182                 log.warning("Exception during queue drain");
183                 log.throwing("BotRunner", "drain", e);
184             }
185         }
186 
187         throw new TimeoutException();
188     }
189 
190     private final BotRunnerConfiguration config;
191     private final List<Bot> bots;
192     private final ScheduledThreadPoolExecutor executor;
193     private final Logger log;
194 
195     public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
196         this.config = config;
197         this.bots = bots;
198 
199         pending = new HashMap<>();
200         active = new HashMap<>();
201         scratchPaths = new LinkedList<>();
202 
203         for (int i = 0; i < config.concurrency(); ++i) {
204             var folder = config.scratchFolder().resolve("scratch-" + i);
205             scratchPaths.addLast(folder);
206         }
207 
208         executor = new ScheduledThreadPoolExecutor(config.concurrency());
209         log = Logger.getLogger("org.openjdk.skara.bot");
210     }
211 
212     private void checkPeriodicItems() {
213         log.log(Level.FINE, "Starting of checking for periodic items", TaskPhases.BEGIN);
214         try {
215             for (var bot : bots) {
216                 var items = bot.getPeriodicItems();
217                 for (var item : items) {
218                     submitOrSchedule(item);
219                 }
220             }
229     private void watchdog() {
230         synchronized (executor) {
231             for (var activeItem : active.entrySet()) {
232                 var activeDuration = Duration.between(activeItem.getValue(), Instant.now());
233                 if (activeDuration.compareTo(config.watchdogTimeout()) > 0) {
234                     log.severe("Item " + activeItem.getKey() + " has been active more than " + activeDuration +
235                                        " - this may be an error!");
236                     // Reset the counter to avoid continuous reporting - once every watchdogTimeout is enough
237                     activeItem.setValue(Instant.now());
238                 }
239             }
240         }
241     }
242 
243     private void processRestRequest(JSONValue request) {
244         log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
245         log.fine("Request: " + request);
246         try {
247             for (var bot : bots) {
248                 var items = bot.processWebHook(request);
249                 for (var item : items) {
250                     submitOrSchedule(item);
251                 }
252             }
253         } catch (RuntimeException e) {
254             log.severe("Exception during rest request processing: " + e.getMessage());
255             log.throwing("BotRunner", "processRestRequest", e);
256         } finally {
257             log.log(Level.FINE, "Done processing incoming rest request", TaskPhases.END);
258         }
259     }
260 
261     public void run() {
262         log.info("Starting BotRunner execution, will run forever.");
277         executor.scheduleAtFixedRate(this::watchdog, 0,
278                                      config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
279         executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
280                                      config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
281 
282         try {
283             executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
284         } catch (InterruptedException e) {
285             e.printStackTrace();
286         }
287 
288         if (restReceiver != null) {
289             restReceiver.close();
290         }
291         executor.shutdown();
292     }
293 
294     public void runOnce(Duration timeout) throws TimeoutException {
295         log.info("Starting BotRunner execution, will run once");
296         log.info("Timeout: " + timeout);
297         log.info("Concurrency: " + config.concurrency());
298 
< prev index next >