27 import java.io.IOException;
28 import java.nio.file.Path;
29 import java.time.*;
30 import java.util.*;
31 import java.util.concurrent.*;
32 import java.util.logging.*;
33 import java.util.stream.Collectors;
34
35 class BotRunnerError extends RuntimeException {
36 BotRunnerError(String msg) {
37 super(msg);
38 }
39
40 BotRunnerError(String msg, Throwable suppressed) {
41 super(msg);
42 addSuppressed(suppressed);
43 }
44 }
45
46 public class BotRunner {
47
48 enum TaskPhases {
49 BEGIN,
50 END
51 }
52
53 private class RunnableWorkItem implements Runnable {
54 private final WorkItem item;
55
56 RunnableWorkItem(WorkItem wrappedItem) {
57 item = wrappedItem;
58 }
59
60 public WorkItem get() {
61 return item;
62 }
63
64 @Override
65 public void run() {
66 Path scratchPath;
67
74 scratchPath = scratchPaths.removeFirst();
75 }
76
77 log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN);
78 try {
79 item.run(scratchPath);
80 } catch (RuntimeException e) {
81 log.severe("Exception during item execution (" + item + "): " + e.getMessage());
82 item.handleRuntimeException(e);
83 log.throwing(item.toString(), "run", e);
84 } finally {
85 log.log(Level.FINE, "Item " + item + " is now done", TaskPhases.END);
86 }
87
88 synchronized (executor) {
89 scratchPaths.addLast(scratchPath);
90 active.remove(item);
91
92 // Some of the pending items may now be eligible for execution
93 var candidateItems = pending.entrySet().stream()
94 .filter(e -> !e.getValue().isPresent() || !active.contains(e.getValue().get()))
95 .map(Map.Entry::getKey)
96 .collect(Collectors.toList());
97
98 // Try the candidates against the current active set
99 for (var candidate : candidateItems) {
100 boolean maySubmit = true;
101 for (var activeItem : active) {
102 if (!activeItem.concurrentWith(candidate)) {
103 // Still can't run this candidate, leave it pending
104 log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
105 maySubmit = false;
106 break;
107 }
108 }
109
110 if (maySubmit) {
111 pending.remove(candidate);
112 executor.submit(new RunnableWorkItem(candidate));
113 active.add(candidate);
114 log.finer("Submitting candidate: " + candidate);
115 }
116 }
117 }
118
119 }
120 }
121
122 private final Map<WorkItem, Optional<WorkItem>> pending;
123 private final Set<WorkItem> active;
124 private final Deque<Path> scratchPaths;
125
126 private void submitOrSchedule(WorkItem item) {
127
128 synchronized (executor) {
129 for (var activeItem : active) {
130 if (!activeItem.concurrentWith(item)) {
131
132 for (var pendingItem : pending.entrySet()) {
133 // If there are pending items of the same type that we cannot run concurrently with, replace them.
134 if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
135 log.finer("Discarding obsoleted item " + pendingItem.getKey() +
136 " in favor of item " + item);
137 pending.remove(pendingItem.getKey());
138 // There can't be more than one
139 break;
140 }
141 }
142
143 pending.put(item, Optional.of(activeItem));
144 return;
145 }
146 }
147
148 executor.submit(new RunnableWorkItem(item));
149 active.add(item);
150 }
151 }
152
153 private void drain(Duration timeout) throws TimeoutException {
154
155 Instant start = Instant.now();
156
157 while (Instant.now().isBefore(start.plus(timeout))) {
158 while (true) {
159 var head = (ScheduledFuture<?>) executor.getQueue().peek();
160 if (head != null) {
161 log.fine("Waiting for future to complete");
162 try {
163 head.get();
164 } catch (InterruptedException | ExecutionException e) {
165 log.warning("Exception during queue drain");
166 log.throwing("BotRunner", "drain", e);
167 }
168 } else {
169 log.finest("Queue is now empty");
170 break;
171 }
172 }
173
174 synchronized (executor) {
175 if (pending.isEmpty() && active.isEmpty()) {
176 log.fine("Nothing awaiting scheduling - drain is finished");
177 return;
178 } else {
179 log.finest("Waiting for flighted tasks");
180 }
181 }
182 try {
183 Thread.sleep(1);
184 } catch (InterruptedException e) {
185 log.warning("Exception during queue drain");
186 log.throwing("BotRunner", "drain", e);
187 }
188 }
189
190 throw new TimeoutException();
191 }
192
193 private final BotRunnerConfiguration config;
194 private final List<Bot> bots;
195 private final ScheduledThreadPoolExecutor executor;
196 private final Logger log;
197
198 public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
199 this.config = config;
200 this.bots = bots;
201
202 pending = new HashMap<>();
203 active = new HashSet<>();
204 scratchPaths = new LinkedList<>();
205
206 for (int i = 0; i < config.concurrency(); ++i) {
207 var folder = config.scratchFolder().resolve("scratch-" + i);
208 scratchPaths.addLast(folder);
209 }
210
211 executor = new ScheduledThreadPoolExecutor(config.concurrency());
212 log = Logger.getLogger("org.openjdk.skara.bot");
213 }
214
215 private void checkPeriodicItems() {
216 log.log(Level.FINE, "Starting of checking for periodic items", TaskPhases.BEGIN);
217 try {
218 for (var bot : bots) {
219 var items = bot.getPeriodicItems();
220 for (var item : items) {
221 submitOrSchedule(item);
222 }
223 }
232 private void processRestRequest(JSONValue request) {
233 log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
234 log.fine("Request: " + request);
235 try {
236 for (var bot : bots) {
237 var items = bot.processWebHook(request);
238 for (var item : items) {
239 submitOrSchedule(item);
240 }
241 }
242 } catch (RuntimeException e) {
243 log.severe("Exception during rest request processing: " + e.getMessage());
244 log.throwing("BotRunner", "processRestRequest", e);
245 } finally {
246 log.log(Level.FINE, "Done processing incoming rest request", TaskPhases.END);
247 }
248 }
249
250 public void run() {
251 log.info("Starting BotRunner execution, will run forever.");
266 executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
267 config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
268
269 try {
270 executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
271 } catch (InterruptedException e) {
272 e.printStackTrace();
273 }
274
275 if (restReceiver != null) {
276 restReceiver.close();
277 }
278 executor.shutdown();
279 }
280
281 public void runOnce(Duration timeout) throws TimeoutException {
282 log.info("Starting BotRunner execution, will run once");
283 log.info("Timeout: " + timeout);
284 log.info("Concurrency: " + config.concurrency());
285
|
27 import java.io.IOException;
28 import java.nio.file.Path;
29 import java.time.*;
30 import java.util.*;
31 import java.util.concurrent.*;
32 import java.util.logging.*;
33 import java.util.stream.Collectors;
34
35 class BotRunnerError extends RuntimeException {
36 BotRunnerError(String msg) {
37 super(msg);
38 }
39
40 BotRunnerError(String msg, Throwable suppressed) {
41 super(msg);
42 addSuppressed(suppressed);
43 }
44 }
45
46 public class BotRunner {
47 enum TaskPhases {
48 BEGIN,
49 END
50 }
51
52 private class RunnableWorkItem implements Runnable {
53 private final WorkItem item;
54
55 RunnableWorkItem(WorkItem wrappedItem) {
56 item = wrappedItem;
57 }
58
59 public WorkItem get() {
60 return item;
61 }
62
63 @Override
64 public void run() {
65 Path scratchPath;
66
73 scratchPath = scratchPaths.removeFirst();
74 }
75
76 log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN);
77 try {
78 item.run(scratchPath);
79 } catch (RuntimeException e) {
80 log.severe("Exception during item execution (" + item + "): " + e.getMessage());
81 item.handleRuntimeException(e);
82 log.throwing(item.toString(), "run", e);
83 } finally {
84 log.log(Level.FINE, "Item " + item + " is now done", TaskPhases.END);
85 }
86
87 synchronized (executor) {
88 scratchPaths.addLast(scratchPath);
89 active.remove(item);
90
91 // Some of the pending items may now be eligible for execution
92 var candidateItems = pending.entrySet().stream()
93 .filter(e -> e.getValue().isEmpty() || !active.containsKey(e.getValue().get()))
94 .map(Map.Entry::getKey)
95 .collect(Collectors.toList());
96
97 // Try the candidates against the current active set
98 for (var candidate : candidateItems) {
99 boolean maySubmit = true;
100 for (var activeItem : active.keySet()) {
101 if (!activeItem.concurrentWith(candidate)) {
102 // Still can't run this candidate, leave it pending
103 log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
104 maySubmit = false;
105 break;
106 }
107 }
108
109 if (maySubmit) {
110 pending.remove(candidate);
111 executor.submit(new RunnableWorkItem(candidate));
112 active.put(candidate, Instant.now());
113 log.finer("Submitting candidate: " + candidate);
114 }
115 }
116 }
117 }
118 }
119
120 private final Map<WorkItem, Optional<WorkItem>> pending;
121 private final Map<WorkItem, Instant> active;
122 private final Deque<Path> scratchPaths;
123
124 private void submitOrSchedule(WorkItem item) {
125 synchronized (executor) {
126 for (var activeItem : active.keySet()) {
127 if (!activeItem.concurrentWith(item)) {
128
129 for (var pendingItem : pending.entrySet()) {
130 // If there are pending items of the same type that we cannot run concurrently with, replace them.
131 if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
132 log.finer("Discarding obsoleted item " + pendingItem.getKey() +
133 " in favor of item " + item);
134 pending.remove(pendingItem.getKey());
135 // There can't be more than one
136 break;
137 }
138 }
139
140 pending.put(item, Optional.of(activeItem));
141 return;
142 }
143 }
144
145 executor.submit(new RunnableWorkItem(item));
146 active.put(item, Instant.now());
147 }
148 }
149
150 private void drain(Duration timeout) throws TimeoutException {
151 Instant start = Instant.now();
152
153 while (Instant.now().isBefore(start.plus(timeout))) {
154 while (true) {
155 var head = (ScheduledFuture<?>) executor.getQueue().peek();
156 if (head != null) {
157 log.fine("Waiting for future to complete");
158 try {
159 head.get();
160 } catch (InterruptedException | ExecutionException e) {
161 log.warning("Exception during queue drain");
162 log.throwing("BotRunner", "drain", e);
163 }
164 } else {
165 log.finest("Queue is now empty");
166 break;
167 }
168 }
169
170 synchronized (executor) {
171 if (pending.isEmpty() && active.isEmpty()) {
172 log.fine("Nothing awaiting scheduling - drain is finished");
173 return;
174 } else {
175 log.finest("Waiting for flighted tasks");
176 }
177 }
178 try {
179 Thread.sleep(1);
180 watchdog();
181 } catch (InterruptedException e) {
182 log.warning("Exception during queue drain");
183 log.throwing("BotRunner", "drain", e);
184 }
185 }
186
187 throw new TimeoutException();
188 }
189
190 private final BotRunnerConfiguration config;
191 private final List<Bot> bots;
192 private final ScheduledThreadPoolExecutor executor;
193 private final Logger log;
194
195 public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
196 this.config = config;
197 this.bots = bots;
198
199 pending = new HashMap<>();
200 active = new HashMap<>();
201 scratchPaths = new LinkedList<>();
202
203 for (int i = 0; i < config.concurrency(); ++i) {
204 var folder = config.scratchFolder().resolve("scratch-" + i);
205 scratchPaths.addLast(folder);
206 }
207
208 executor = new ScheduledThreadPoolExecutor(config.concurrency());
209 log = Logger.getLogger("org.openjdk.skara.bot");
210 }
211
212 private void checkPeriodicItems() {
213 log.log(Level.FINE, "Starting of checking for periodic items", TaskPhases.BEGIN);
214 try {
215 for (var bot : bots) {
216 var items = bot.getPeriodicItems();
217 for (var item : items) {
218 submitOrSchedule(item);
219 }
220 }
229 private void watchdog() {
230 synchronized (executor) {
231 for (var activeItem : active.entrySet()) {
232 var activeDuration = Duration.between(activeItem.getValue(), Instant.now());
233 if (activeDuration.compareTo(config.watchdogTimeout()) > 0) {
234 log.severe("Item " + activeItem.getKey() + " has been active more than " + activeDuration +
235 " - this may be an error!");
236 // Reset the counter to avoid continuous reporting - once every watchdogTimeout is enough
237 activeItem.setValue(Instant.now());
238 }
239 }
240 }
241 }
242
243 private void processRestRequest(JSONValue request) {
244 log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
245 log.fine("Request: " + request);
246 try {
247 for (var bot : bots) {
248 var items = bot.processWebHook(request);
249 for (var item : items) {
250 submitOrSchedule(item);
251 }
252 }
253 } catch (RuntimeException e) {
254 log.severe("Exception during rest request processing: " + e.getMessage());
255 log.throwing("BotRunner", "processRestRequest", e);
256 } finally {
257 log.log(Level.FINE, "Done processing incoming rest request", TaskPhases.END);
258 }
259 }
260
261 public void run() {
262 log.info("Starting BotRunner execution, will run forever.");
277 executor.scheduleAtFixedRate(this::watchdog, 0,
278 config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
279 executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
280 config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
281
282 try {
283 executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
284 } catch (InterruptedException e) {
285 e.printStackTrace();
286 }
287
288 if (restReceiver != null) {
289 restReceiver.close();
290 }
291 executor.shutdown();
292 }
293
294 public void runOnce(Duration timeout) throws TimeoutException {
295 log.info("Starting BotRunner execution, will run once");
296 log.info("Timeout: " + timeout);
297 log.info("Concurrency: " + config.concurrency());
298
|