< prev index next > bot/src/main/java/org/openjdk/skara/bot/BotRunner.java
Print this page
addSuppressed(suppressed);
}
}
public class BotRunner {
-
/**
 * Markers handed to the logger as the parameter of a log record to tag the
 * start and the end of a unit of work.
 */
enum TaskPhases {
    /** Attached to the log record emitted when a task starts. */
    BEGIN,
    /** Attached to the log record emitted when a task is done. */
    END
}
scratchPaths.addLast(scratchPath);
active.remove(item);
// Some of the pending items may now be eligible for execution
var candidateItems = pending.entrySet().stream()
! .filter(e -> !e.getValue().isPresent() || !active.contains(e.getValue().get()))
.map(Map.Entry::getKey)
.collect(Collectors.toList());
// Try the candidates against the current active set
for (var candidate : candidateItems) {
boolean maySubmit = true;
! for (var activeItem : active) {
if (!activeItem.concurrentWith(candidate)) {
// Still can't run this candidate, leave it pending
log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
maySubmit = false;
break;
scratchPaths.addLast(scratchPath);
active.remove(item);
// Some of the pending items may now be eligible for execution
var candidateItems = pending.entrySet().stream()
! .filter(e -> e.getValue().isEmpty() || !active.containsKey(e.getValue().get()))
.map(Map.Entry::getKey)
.collect(Collectors.toList());
// Try the candidates against the current active set
for (var candidate : candidateItems) {
boolean maySubmit = true;
! for (var activeItem : active.keySet()) {
if (!activeItem.concurrentWith(candidate)) {
// Still can't run this candidate, leave it pending
log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
maySubmit = false;
break;
}
if (maySubmit) {
pending.remove(candidate);
executor.submit(new RunnableWorkItem(candidate));
! active.add(candidate);
log.finer("Submitting candidate: " + candidate);
}
}
}
-
}
}
private final Map<WorkItem, Optional<WorkItem>> pending;
! private final Set<WorkItem> active;
private final Deque<Path> scratchPaths;
private void submitOrSchedule(WorkItem item) {
-
synchronized (executor) {
! for (var activeItem : active) {
if (!activeItem.concurrentWith(item)) {
for (var pendingItem : pending.entrySet()) {
// If there are pending items of the same type that we cannot run concurrently with, replace them.
if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
}
if (maySubmit) {
pending.remove(candidate);
executor.submit(new RunnableWorkItem(candidate));
! active.put(candidate, Instant.now());
log.finer("Submitting candidate: " + candidate);
}
}
}
}
}
private final Map<WorkItem, Optional<WorkItem>> pending;
! private final Map<WorkItem, Instant> active;
private final Deque<Path> scratchPaths;
private void submitOrSchedule(WorkItem item) {
synchronized (executor) {
! for (var activeItem : active.keySet()) {
if (!activeItem.concurrentWith(item)) {
for (var pendingItem : pending.entrySet()) {
// If there are pending items of the same type that we cannot run concurrently with, replace them.
if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
return;
}
}
executor.submit(new RunnableWorkItem(item));
! active.add(item);
}
}
private void drain(Duration timeout) throws TimeoutException {
-
Instant start = Instant.now();
while (Instant.now().isBefore(start.plus(timeout))) {
while (true) {
var head = (ScheduledFuture<?>) executor.getQueue().peek();
return;
}
}
executor.submit(new RunnableWorkItem(item));
! active.put(item, Instant.now());
}
}
private void drain(Duration timeout) throws TimeoutException {
Instant start = Instant.now();
while (Instant.now().isBefore(start.plus(timeout))) {
while (true) {
var head = (ScheduledFuture<?>) executor.getQueue().peek();
log.finest("Waiting for flighted tasks");
}
}
try {
Thread.sleep(1);
+ watchdog();
} catch (InterruptedException e) {
log.warning("Exception during queue drain");
log.throwing("BotRunner", "drain", e);
}
}
public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
this.config = config;
this.bots = bots;
pending = new HashMap<>();
! active = new HashSet<>();
scratchPaths = new LinkedList<>();
for (int i = 0; i < config.concurrency(); ++i) {
var folder = config.scratchFolder().resolve("scratch-" + i);
scratchPaths.addLast(folder);
public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
this.config = config;
this.bots = bots;
pending = new HashMap<>();
! active = new HashMap<>();
scratchPaths = new LinkedList<>();
for (int i = 0; i < config.concurrency(); ++i) {
var folder = config.scratchFolder().resolve("scratch-" + i);
scratchPaths.addLast(folder);
} finally {
log.log(Level.FINE, "Done checking periodic items", TaskPhases.END);
}
}
/**
 * Logs a severe message for every entry in {@code active} whose stored instant is
 * older than {@code config.watchdogTimeout()}, then replaces that stored instant
 * with the current time.
 *
 * The stored instant is written when an item is submitted to the executor and is
 * rewritten here after each report, so an item that stays active is reported at
 * most once per timeout period rather than on every call.
 *
 * Invoked from the drain loop and also scheduled on the executor at a fixed rate.
 */
+ private void watchdog() {
+ synchronized (executor) { // submitOrSchedule also holds this monitor while it reads and updates active
+ for (var activeItem : active.entrySet()) {
+ var activeDuration = Duration.between(activeItem.getValue(), Instant.now()); // time elapsed since the stored instant
+ if (activeDuration.compareTo(config.watchdogTimeout()) > 0) { // strictly longer than the configured timeout
+ log.severe("Item " + activeItem.getKey() + " has been active more than " + activeDuration +
+ " - this may be an error!");
+ // Reset the counter to avoid continuous reporting - once every watchdogTimeout is enough
+ activeItem.setValue(Instant.now()); // updates the value in place through the entry; no entries are added or removed
+ }
+ }
+ }
+ }
+
private void processRestRequest(JSONValue request) {
log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
log.fine("Request: " + request);
try {
for (var bot : bots) {
log.warning("Failed to create RestReceiver");
log.throwing("BotRunner", "run", e);
}
}
+ executor.scheduleAtFixedRate(this::watchdog, 0,
+ config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
< prev index next >