< prev index next >

bot/src/main/java/org/openjdk/skara/bot/BotRunner.java

Print this page
*** 42,11 ***
          addSuppressed(suppressed);
      }
  }
  
  public class BotRunner {
- 
      enum TaskPhases {
          BEGIN,
          END
      }
  
--- 42,10 ---

*** 89,18 ***
                  scratchPaths.addLast(scratchPath);
                  active.remove(item);
  
                  // Some of the pending items may now be eligible for execution
                  var candidateItems = pending.entrySet().stream()
!                                             .filter(e -> !e.getValue().isPresent() || !active.contains(e.getValue().get()))
                                              .map(Map.Entry::getKey)
                                              .collect(Collectors.toList());
  
                  // Try the candidates against the current active set
                  for (var candidate : candidateItems) {
                      boolean maySubmit = true;
!                     for (var activeItem : active) {
                          if (!activeItem.concurrentWith(candidate)) {
                              // Still can't run this candidate, leave it pending
                              log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
                              maySubmit = false;
                              break;
--- 88,18 ---
                  scratchPaths.addLast(scratchPath);
                  active.remove(item);
  
                  // Some of the pending items may now be eligible for execution
                  var candidateItems = pending.entrySet().stream()
!                                             .filter(e -> e.getValue().isEmpty() || !active.containsKey(e.getValue().get()))
                                              .map(Map.Entry::getKey)
                                              .collect(Collectors.toList());
  
                  // Try the candidates against the current active set
                  for (var candidate : candidateItems) {
                      boolean maySubmit = true;
!                     for (var activeItem : active.keySet()) {
                          if (!activeItem.concurrentWith(candidate)) {
                              // Still can't run this candidate, leave it pending
                              log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
                              maySubmit = false;
                              break;

*** 108,27 ***
                      }
  
                      if (maySubmit) {
                          pending.remove(candidate);
                          executor.submit(new RunnableWorkItem(candidate));
!                         active.add(candidate);
                          log.finer("Submitting candidate: " + candidate);
                      }
                  }
              }
- 
          }
      }
  
      private final Map<WorkItem, Optional<WorkItem>> pending;
!     private final Set<WorkItem> active;
      private final Deque<Path> scratchPaths;
  
      private void submitOrSchedule(WorkItem item) {
- 
          synchronized (executor) {
!             for (var activeItem : active) {
                  if (!activeItem.concurrentWith(item)) {
  
                      for (var pendingItem : pending.entrySet()) {
                          // If there are pending items of the same type that we cannot run concurrently with, replace them.
                          if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {
--- 107,25 ---
                      }
  
                      if (maySubmit) {
                          pending.remove(candidate);
                          executor.submit(new RunnableWorkItem(candidate));
!                         active.put(candidate, Instant.now());
                          log.finer("Submitting candidate: " + candidate);
                      }
                  }
              }
          }
      }
  
      private final Map<WorkItem, Optional<WorkItem>> pending;
!     private final Map<WorkItem, Instant> active;
      private final Deque<Path> scratchPaths;
  
      private void submitOrSchedule(WorkItem item) {
          synchronized (executor) {
!             for (var activeItem : active.keySet()) {
                  if (!activeItem.concurrentWith(item)) {
  
                      for (var pendingItem : pending.entrySet()) {
                          // If there are pending items of the same type that we cannot run concurrently with, replace them.
                          if (pendingItem.getKey().getClass().equals(item.getClass()) && !pendingItem.getKey().concurrentWith(item)) {

*** 144,16 ***
                      return;
                  }
              }
  
              executor.submit(new RunnableWorkItem(item));
!             active.add(item);
          }
      }
  
      private void drain(Duration timeout) throws TimeoutException {
- 
          Instant start = Instant.now();
  
          while (Instant.now().isBefore(start.plus(timeout))) {
              while (true) {
                  var head = (ScheduledFuture<?>) executor.getQueue().peek();
--- 141,15 ---
                      return;
                  }
              }
  
              executor.submit(new RunnableWorkItem(item));
!             active.put(item, Instant.now());
          }
      }
  
      private void drain(Duration timeout) throws TimeoutException {
          Instant start = Instant.now();
  
          while (Instant.now().isBefore(start.plus(timeout))) {
              while (true) {
                  var head = (ScheduledFuture<?>) executor.getQueue().peek();

*** 179,10 ***
--- 175,11 ---
                      log.finest("Waiting for flighted tasks");
                  }
              }
              try {
                  Thread.sleep(1);
+                 watchdog();
              } catch (InterruptedException e) {
                  log.warning("Exception during queue drain");
                  log.throwing("BotRunner", "drain", e);
              }
          }

*** 198,11 ***
      public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
          this.config = config;
          this.bots = bots;
  
          pending = new HashMap<>();
!         active = new HashSet<>();
          scratchPaths = new LinkedList<>();
  
          for (int i = 0; i < config.concurrency(); ++i) {
              var folder = config.scratchFolder().resolve("scratch-" + i);
              scratchPaths.addLast(folder);
--- 195,11 ---
      public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
          this.config = config;
          this.bots = bots;
  
          pending = new HashMap<>();
!         active = new HashMap<>();
          scratchPaths = new LinkedList<>();
  
          for (int i = 0; i < config.concurrency(); ++i) {
              var folder = config.scratchFolder().resolve("scratch-" + i);
              scratchPaths.addLast(folder);

*** 227,10 ***
--- 224,24 ---
          } finally {
              log.log(Level.FINE, "Done checking periodic items", TaskPhases.END);
          }
      }
  
+     private void watchdog() { // Logs a severe message for every active item whose recorded timestamp is older than config.watchdogTimeout().
+         synchronized (executor) { // same monitor submitOrSchedule holds while it inserts into 'active'
+             for (var activeItem : active.entrySet()) {
+                 var activeDuration = Duration.between(activeItem.getValue(), Instant.now()); // elapsed time since the timestamp stored for this item
+                 if (activeDuration.compareTo(config.watchdogTimeout()) > 0) { // strictly longer than the configured timeout
+                     log.severe("Item " + activeItem.getKey() + " has been active more than " + activeDuration +
+                                        " - this may be an error!");
+                     // Reset the counter to avoid continuous reporting - once every watchdogTimeout is enough
+                     activeItem.setValue(Instant.now()); // value-only write through the entry; after a reset the reported duration counts from the last report, not from submission
+                 }
+             }
+         }
+     }
+ 
      private void processRestRequest(JSONValue request) {
          log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
          log.fine("Request: " + request);
          try {
              for (var bot : bots) {

*** 261,10 ***
--- 272,12 ---
                  log.warning("Failed to create RestReceiver");
                  log.throwing("BotRunner", "run", e);
              }
          }
  
+         executor.scheduleAtFixedRate(this::watchdog, 0,
+                                      config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
          executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
                                       config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
  
          try {
              executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
< prev index next >