Problem Statement
Design an in-memory concurrent Job Scheduler (similar to the core engine of Quartz Scheduler or Java's ScheduledThreadPoolExecutor). The scheduler must accept task submissions with scheduling parameters (one-time execution after a delay, or periodic recurring executions), execute jobs concurrently using a thread pool, and maintain task schedules in a thread-safe priority structure.
Functional Requirements
- Support scheduling one-time tasks with a specific execution delay.
- Support scheduling recurring periodic tasks that reschedule themselves after execution.
- Maintain tasks ordered by their next execution time using a priority structure.
- Utilize a thread pool to execute scheduled tasks concurrently, ensuring slow-running tasks do not block other jobs.
- Handle thread safety during task submissions, cancellations, and state evaluations.
Objects Required
ScheduledJob(Encapsulation class matching Runnable payload, execution time, and recurrence rules)JobScheduler(Main controller driving priority queues, worker pools, and scheduling threads)
ScheduledJob Class
The ScheduledJob class wraps task logic (a Runnable) and keeps track of execution intervals and next execution times. It implements the Comparable interface to order tasks by execution time.
public class ScheduledJob implements Runnable, Comparable<ScheduledJob> {
private final String id;
private final Runnable task;
private long nextRunTime;
private final long periodMs;
public ScheduledJob(String id, Runnable task, long delayMs, long periodMs) {
this.id = id;
this.task = task;
this.nextRunTime = System.currentTimeMillis() + delayMs;
this.periodMs = periodMs;
}
@Override
public void run() {
task.run();
}
public String getId() { return id; }
public long getNextRunTime() { return nextRunTime; }
public long getPeriodMs() { return periodMs; }
public boolean isRecurring() {
return periodMs > 0;
}
public void updateNextRunTime() {
if (isRecurring()) {
this.nextRunTime = System.currentTimeMillis() + periodMs;
}
}
@Override
public int compareTo(ScheduledJob other) {
return Long.compare(this.nextRunTime, other.nextRunTime);
}
}
The constructor calculates the absolute epoch execution time. isRecurring() determines if the task is a one-time or periodic job, while updateNextRunTime() shifts the target window forward for rescheduled execution blocks.
JobScheduler Class
The JobScheduler class manages scheduling. It utilizes a thread-safe PriorityBlockingQueue to track task times and runs a background loop to trigger executions.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
public class JobScheduler {
private final PriorityBlockingQueue<ScheduledJob> taskQueue;
private final ExecutorService threadPool;
private final Thread schedulerThread;
private volatile boolean running;
public JobScheduler(int poolSize) {
this.taskQueue = new PriorityBlockingQueue<>();
this.threadPool = Executors.newFixedThreadPool(poolSize);
this.schedulerThread = new Thread(this::schedulerLoop);
this.schedulerThread.setName("Job-Scheduler-Daemon");
this.running = false;
}
public synchronized void start() {
if (running) return;
running = true;
schedulerThread.start();
System.out.println("Job Scheduler engine started.");
}
public synchronized void stop() {
running = false;
schedulerThread.interrupt();
threadPool.shutdown();
System.out.println("Job Scheduler engine stopped.");
}
public void schedule(String id, Runnable task, long delayMs, long periodMs) {
ScheduledJob job = new ScheduledJob(id, task, delayMs, periodMs);
taskQueue.add(job);
// Force the scheduler thread to wake up and re-evaluate queue ordering
synchronized (taskQueue) {
taskQueue.notifyAll();
}
}
private void schedulerLoop() {
try {
while (running) {
synchronized (taskQueue) {
while (taskQueue.isEmpty() && running) {
taskQueue.wait();
}
}
if (!running) break;
ScheduledJob nextJob = taskQueue.peek();
long now = System.currentTimeMillis();
if (nextJob != null && nextJob.getNextRunTime() <= now) {
// Poll tasks safely and submit to execution threads
taskQueue.poll();
threadPool.submit(() -> {
try {
nextJob.run();
} finally {
if (nextJob.isRecurring()) {
nextJob.updateNextRunTime();
taskQueue.add(nextJob);
synchronized (taskQueue) {
taskQueue.notifyAll();
}
}
}
});
} else if (nextJob != null) {
// Sleep until the next job is ready to run
long sleepTime = nextJob.getNextRunTime() - now;
synchronized (taskQueue) {
taskQueue.wait(sleepTime);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Here is an explanation of the core operations in the JobScheduler class:
- The constructor configures concurrency pools and initializes a background daemon monitoring thread.
start()andstop()manage the lifecycle, shutting down workers and interrupting loops cleanly.schedule()adds a task to the queue and notifies wait blocks to re-evaluate sorting constraints.schedulerLoop()monitors the queue. If empty, it waits for notifications. If a task is ready, it submits the job to the thread pool and reschedules periodic tasks. If no task is ready, it waits for the duration of the delay.
Main Driver Class
This class tests our concurrent job scheduler. It submits one-time and periodic tasks, monitors parallel output logs, and handles clean terminations.
public class Main {
public static void main(String[] args) throws InterruptedException {
// Start scheduler with 3 execution threads
JobScheduler scheduler = new JobScheduler(3);
scheduler.start();
System.out.println("Scheduling tasks...");
// Task A: Run once after 1 second
scheduler.schedule("TaskA", () -> {
System.out.println("[" + Thread.currentThread().getName() + "] Task A executed (One-time, expected 1s delay)");
}, 1000, 0);
// Task B: Run periodically every 2 seconds, starting immediately
scheduler.schedule("TaskB", () -> {
System.out.println("[" + Thread.currentThread().getName() + "] Task B executed (Periodic, runs every 2s)");
}, 0, 2000);
// Task C: Run periodically every 3 seconds, starting after a 1 second delay
scheduler.schedule("TaskC", () -> {
System.out.println("[" + Thread.currentThread().getName() + "] Task C executed (Periodic, runs every 3s)");
}, 1000, 3000);
// Allow execution to run for 7 seconds
Thread.sleep(7000);
// Stop the scheduler engine
System.out.println("\nStopping scheduler...");
scheduler.stop();
}
}
The main() driver configures the environment, registers task routines, runs concurrent worker blocks, and triggers safe shutdowns.
Comments
Post a Comment