Assignment P2 - Uniprocessor Scheduling Algorithms
Due: Friday, February 14, 2025, at 10:00pm
You may work alone or with a partner, but you must type up the code yourself. You may also discuss the assignment at a high level with other students. You should list any student with whom you discussed the assignment, and the manner of discussion (high-level, partner, etc.) in a readme.txt file that you include with your submission.
You should submit your assignment as a group of files on Gradescope. (See submission instructions, below.)
Starter code: p2.tar
In the first programming assignment, you did some verification of existing schedules. For this assignment, you’ll be constructing the schedules yourself for three scheduling algorithms: FIFO, EDF, and RM.
Representing a scheduling algorithm
Each scheduling algorithm you will implement is contained in a class that descends from the SchedulerAlgorithm class.

class SchedulerAlgorithm(object):
    def __init__(self, taskSet):
        self.taskSet = taskSet
        self.schedule = Schedule(None, taskSet)

In addition to the taskSet and schedule members (you’ll build the schedule by adding intervals to it), you will also have a priorityQueue member. The first step of building the schedule is to initialize the priority queue, which is done by calling SchedulerAlgorithm._buildPriorityQueue. (This has been done for you.) The corresponding priority queue descends from the PriorityQueue class.

class PriorityQueue(object):
    def __init__(self, jobReleaseDict):
        """
        Builds the priority queue of all jobs.
        This will need to be sorted to match the scheduling algorithm.
        """
        self.jobs = []
        releaseTimes = sorted(jobReleaseDict.keys())
        for time in releaseTimes:
            for job in jobReleaseDict[time]:
                self.jobs.append(job)
        self._sortQueue()

    ...

    def popFirst(self, t):
        """
        Removes and returns the job with the highest priority at time t,
        if one exists.
        """
        index = self._findFirst(t)
        if index >= 0:
            return self.jobs.pop(index)

    def _sortQueue(self):
        raise NotImplementedError

    def _findFirst(self, t):
        raise NotImplementedError

    def popNextJob(self, t):
        raise NotImplementedError

    def popPreemptingJob(self, t, job):
        raise NotImplementedError

Some of the PriorityQueue methods are implemented for you. Others must be implemented by all child classes. You can find more information about these in the docstrings for the methods in the child classes.
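The split between implemented and unimplemented methods can be seen in miniature in the sketch below. It is an illustration only: ToyJob, ToyQueue, and ReleaseOrderQueue are hypothetical stand-ins rather than classes from the starter code, and ordering by release time is just one possible rule. The parent class decides when sorting and searching happen; the child class decides how.

```python
from collections import namedtuple

# Hypothetical stand-in for a job; the real Job class has more fields.
ToyJob = namedtuple("ToyJob", ["name", "releaseTime"])


class ToyQueue(object):
    """Trimmed-down stand-in for a parent queue class that owns the control flow."""

    def __init__(self, jobs):
        self.jobs = list(jobs)
        self._sortQueue()            # hook supplied by the child class

    def popFirst(self, t):
        index = self._findFirst(t)   # hook supplied by the child class
        if index >= 0:
            return self.jobs.pop(index)
        return None

    def _sortQueue(self):
        raise NotImplementedError

    def _findFirst(self, t):
        raise NotImplementedError


class ReleaseOrderQueue(ToyQueue):
    """Hypothetical child class: earliest release time means highest priority."""

    def _sortQueue(self):
        self.jobs.sort(key=lambda job: job.releaseTime)

    def _findFirst(self, t):
        # The list is sorted by release time, so only index 0 can qualify.
        if self.jobs and self.jobs[0].releaseTime <= t:
            return 0
        return -1


queue = ReleaseOrderQueue([ToyJob("B", 3), ToyJob("A", 0)])
first = queue.popFirst(1)    # A has been released by time 1
second = queue.popFirst(1)   # B is not released until time 3, so nothing is ready
```

Here first is job A and second is None. A child class with a different ordering rule changes only the two hook methods, while popFirst stays untouched.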
Getting started
If you want to view schedules for this assignment, you will need Pygame. You should be able to install it via pip from the command line / terminal (for example, pip install pygame). Here are some instructions that might help you out.
Once you have Pygame, you can uncomment the lines at the bottom of each scheduler file that draw the resulting schedule. This is very helpful for debugging.

## display = SchedulingDisplay(width=800, height=480, fps=33, scheduleData=schedule)
## display.run()

The code you’ll write for this assignment builds on the code from Assignment P1. Make sure to copy over your functions from P1 to schedule.py before getting started.
Note: For this assignment, you are especially encouraged to share images of your current scheduling results on Slack. Feel free to suggest things, without going into too much detail. For example, if you recognize that a classmate isn’t handling the rest of a job correctly after a preemption, you could remind them to make sure they re-add the job to the priority queue after reducing its remaining WCET.
Part 1: FIFO
The work of each scheduler is broken up into two methods. The creation of the entire schedule is handled by the buildSchedule method. This method should call _makeSchedulingDecision to determine the start of the next scheduling interval, and the job that will execute during it, if any.
For the FIFO scheduler, buildSchedule is implemented for you, as is the FifoPriorityQueue class. You should fill in the implementation of _makeSchedulingDecision:

class FifoScheduler(SchedulerAlgorithm):
    ...

    def _makeSchedulingDecision(self, t, previousJob):
        """
        Makes a scheduling decision after time t. Assumes there is at least one job
        left in the priority queue.
        t: the beginning of the previous time interval, if one exists (or 0 otherwise)
        previousJob: the job that was previously executing, and will either complete or be preempted
        returns: (ScheduleInterval instance, Job instance of next job to execute)
        """
        # TODO: Make a scheduling decision at the next scheduling time point after time t.
        # Let's call this new time nextTime.
        #
        # If there was no previous job executing, the next job will come from the priority
        # queue at or after time t.
        # If there was a previous job executing, choose the highest-priority job from the
        # priority queue at nextTime.
        #
        # Note that if there was a previous job but when it finishes, no more jobs are ready
        # (released prior to that time), then there should be no job associated with the new
        # interval. The ScheduleInterval.initialize method will handle this, you just have
        # to provide it with None rather than a job.
        #
        # Once you have the next job to execute (if any), build the interval (which starts at
        # nextTime) and return it and the next job.
        return interval, nextJob # TODO: make these variables

Make sure to carefully read the buildSchedule method so that you understand what is provided to the _makeSchedulingDecision method, and how its output is used.
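To see how a driver loop and a decision function can cooperate, including the idle interval that appears when nothing is ready, here is a deliberately simplified, non-preemptive toy. Everything in it is an assumption made for illustration: ToyJob, decide, and build are hypothetical names, intervals are plain (start time, job name) tuples rather than ScheduleInterval objects, and the real buildSchedule may be organized differently.

```python
from collections import namedtuple

# Hypothetical stand-in for a job; not the starter code's Job class.
ToyJob = namedtuple("ToyJob", ["name", "release", "wcet"])


def decide(t, previous, queue):
    """Toy decision step: returns (start of the next interval, job or None)."""
    if previous is None:
        # The processor was idle, so jump ahead to the earliest remaining release.
        job = queue.pop(0)
        return max(t, job.release), job
    next_time = t + previous.wcet  # the previous job runs to completion
    if queue and queue[0].release <= next_time:
        return next_time, queue.pop(0)
    return next_time, None  # nothing is ready yet, so this interval is idle


def build(jobs):
    """Toy driver loop: asks for one decision per interval until the queue is empty."""
    queue = sorted(jobs, key=lambda job: job.release)
    intervals, t, previous = [], 0, None
    while queue:
        t, previous = decide(t, previous, queue)
        intervals.append((t, previous.name if previous else None))
    if previous is not None:
        # Close the schedule with a final idle interval.
        intervals.append((t + previous.wcet, None))
    return intervals


jobs = [ToyJob("A", 0, 3), ToyJob("B", 1, 2), ToyJob("C", 8, 1)]
timeline = build(jobs)
```

For these three jobs, timeline is [(0, 'A'), (3, 'B'), (5, None), (8, 'C'), (9, None)]: B waits for A to finish, the processor idles from 5 until C is released at 8, and a final idle interval marks the end of the schedule.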
There are some methods in the FifoPriorityQueue class (and its parent, PriorityQueue, as shown above) that might help you out:

class FifoPriorityQueue(PriorityQueue):
    ...

    def popNextJob(self, t):
        """
        Removes and returns the highest-priority job of those released at or after t,
        or None if no jobs are released at or after t.
        """
        if not self.isEmpty() and self.jobs[0].releaseTime >= t:
            return self.jobs.pop(0)
        else:
            return None

    def popPreemptingJob(self, t, job):
        """
        Removes and returns the job that will preempt job 'job' after time 't', or None
        if no such preemption will occur (i.e., if no higher-priority jobs
        are released before job 'job' will finish executing).
        t: the time after which a preemption may occur
        job: the job that is executing at time 't', and which may be preempted
        """
        return None

Also, be sure to take a look at how ScheduleInterval.initialize is used in the provided code, since you will need to call it in _makeSchedulingDecision:

# Add the final idle interval
finalInterval = ScheduleInterval()
finalInterval.initialize(time, None, False)
self.schedule.addInterval(finalInterval)

You can check your work against a variety of test cases, but I especially recommend checking p1_test5.json.

Part 2: EDF (preemptive)
For the EDF scheduler, the priority queue is still given to you (including the change in logic to handle prioritizing by absolute deadline rather than release time, and the more complex popPreemptingJob).
However, you will have to implement both the buildSchedule and _makeSchedulingDecision methods of the EdfScheduler class. Make sure to spend some time thinking about how these methods need to differ from FIFO. (Hint: They may not differ much!)

class EdfScheduler(SchedulerAlgorithm):
    def __init__(self, taskSet):
        SchedulerAlgorithm.__init__(self, taskSet)

    def buildSchedule(self, startTime, endTime):
        self._buildPriorityQueue(EdfPriorityQueue)
        # TODO: build a schedule using EDF, using _makeSchedulingDecision() to determine
        # the next decision to be made (think about how this differs from FIFO)
        return self.schedule

    def _makeSchedulingDecision(self, t, previousJob):
        """
        Makes a scheduling decision after time t. Assumes there is at least one job
        left in the priority queue.
        t: the beginning of the previous time interval, if one exists (or 0 otherwise)
        previousJob: the job that was previously executing, and will either complete or be preempted
        returns: (ScheduleInterval instance, Job instance of next job to execute)
        """
        # TODO: Make a scheduling decision at the next scheduling time point after time t.
        # Let's call this new time nextTime.
        #
        # If there was no previous job executing, the next job will come from the priority
        # queue at or after time t.
        # If there was a previous job executing, choose the highest-priority job from the
        # priority queue of those released at or before time nextTime.
        #
        # Note that if there was a previous job but when it finishes, no more jobs are ready
        # (released prior to that time), then there should be no job associated with the new
        # interval. The ScheduleInterval.initialize method will handle this, you just have
        # to provide it with None rather than a job.
        #
        # Once you have the next job to execute (if any), build the interval (which starts at
        # nextTime) and return it and the next job.
        interval = ScheduleInterval() # TODO: replace this line!
        nextJob = None # TODO: this one too!
        return interval, nextJob

There are a couple of methods in the Job class that might help you track the work of jobs that have been preempted:

class Job(object):
    def __init__(self, task, jobId, releaseTime):
        ...
        self.remainingTime = self.task.wcet

    def execute(self, time):
        executionTime = min(self.remainingTime, time)
        self.remainingTime -= executionTime
        return executionTime

    def executeToCompletion(self):
        return self.execute(self.remainingTime)

    def isCompleted(self):
        return self.remainingTime == 0

Note: Don’t forget to re-add a job to the priority queue if you remove it to execute part of it.
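The bookkeeping behind that note can be traced with a small stand-alone example. ToyTask and ToyJob are hypothetical stand-ins that copy only the three Job methods listed above, and a plain list plays the role of the priority queue; how a job is actually put back depends on the methods your queue class provides.

```python
from collections import namedtuple

# Hypothetical stand-in for a task; only the WCET matters here.
ToyTask = namedtuple("ToyTask", ["wcet"])


class ToyJob(object):
    """Stand-in that mirrors the execute-related methods of the Job class."""

    def __init__(self, task, releaseTime):
        self.task = task
        self.releaseTime = releaseTime
        self.remainingTime = self.task.wcet

    def execute(self, time):
        executionTime = min(self.remainingTime, time)
        self.remainingTime -= executionTime
        return executionTime

    def executeToCompletion(self):
        return self.execute(self.remainingTime)

    def isCompleted(self):
        return self.remainingTime == 0


pending = []                      # plain list standing in for the priority queue
job = ToyJob(ToyTask(wcet=5), releaseTime=0)

ran_first = job.execute(2)        # preempted after running for 2 time units
if not job.isCompleted():
    pending.append(job)           # put the unfinished job back so it can resume

resumed = pending.pop()
ran_second = resumed.executeToCompletion()   # runs the remaining 3 time units
```

After the first call the job has 3 units of work left, so it goes back into the pending list. When it resumes it runs for exactly those 3 units and isCompleted() becomes True. Skipping the re-add step would silently drop the rest of the job from the schedule.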
As before, you can check your work against a variety of tests. You should especially compare to the ones with names like w1_probX.json, as there are pictures in Assignment W1 you can compare to.
Part 3: RM
For the Rate Monotonic scheduler, you should implement both the priority queue methods and the two methods in the RmScheduler class. Make sure to think very carefully about what needs to change between EDF and RM, given that both are preemptive scheduling algorithms.
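One way to think about the difference is to compare the value each algorithm sorts by. The sketch below assumes the usual definitions (EDF prioritizes the earliest absolute deadline, RM the shortest period) and uses hypothetical ToyTask and ToyJob tuples rather than the starter code's classes; tie-breaking is ignored.

```python
from collections import namedtuple

# Hypothetical stand-ins; the starter code's Task and Job classes have more fields.
ToyTask = namedtuple("ToyTask", ["name", "period", "relative_deadline"])
ToyJob = namedtuple("ToyJob", ["task", "release"])


def edf_key(job):
    # EDF: an earlier absolute deadline means higher priority (varies per job).
    return job.release + job.task.relative_deadline


def rm_key(job):
    # RM: a shorter period means higher priority (fixed per task).
    return job.task.period


t1 = ToyTask("T1", period=4, relative_deadline=4)
t2 = ToyTask("T2", period=6, relative_deadline=6)

# Jobs competing at time 4: T1's second job and T2's unfinished first job.
ready = [ToyJob(t1, release=4), ToyJob(t2, release=0)]

edf_first = min(ready, key=edf_key).task.name
rm_first = min(ready, key=rm_key).task.name
```

At time 4, EDF picks T2 (absolute deadline 6 versus 8), while RM picks T1 (period 4 versus 6). Under RM a job of T1 always outranks a job of T2, no matter when either was released, which affects both the sort order and the question of which newly released jobs can cause a preemption.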

class RmScheduler(SchedulerAlgorithm):
    def __init__(self, taskSet):
        SchedulerAlgorithm.__init__(self, taskSet)

    def buildSchedule(self, startTime, endTime):
        self._buildPriorityQueue(RmPriorityQueue)
        # TODO: build a schedule using RM, using _makeSchedulingDecision() to determine
        # the next decision to be made (think about how this differs from FIFO/EDF)
        return self.schedule

    def _makeSchedulingDecision(self, t, previousJob):
        """
        Makes a scheduling decision after time t. Assumes there is at least one job
        left in the priority queue.
        t: the beginning of the previous time interval, if one exists (or 0 otherwise)
        previousJob: the job that was previously executing, and will either complete or be preempted
        returns: (ScheduleInterval instance, Job instance of next job to execute)
        """
        # TODO: Make a scheduling decision at the next scheduling time point after time t.
        # Let's call this new time nextTime.
        #
        # If there was no previous job executing, the next job will come from the priority
        # queue at or after time t.
        # If there was a previous job executing, choose the highest-priority job from the
        # priority queue of those released at or before time nextTime.
        #
        # Note that if there was a previous job but when it finishes, no more jobs are ready
        # (released prior to that time), then there should be no job associated with the new
        # interval. The ScheduleInterval.initialize method will handle this, you just have
        # to provide it with None rather than a job.
        #
        # Once you have the next job to execute (if any), build the interval (which starts at
        # nextTime) and return it and the next job.
        interval = ScheduleInterval() # TODO: replace this line!
        nextJob = None # TODO: this one too!
        return interval, nextJob

class RmPriorityQueue(PriorityQueue):
    def __init__(self, jobReleaseDict):
        """
        Creates a priority queue of jobs ordered by period.
        """
        PriorityQueue.__init__(self, jobReleaseDict)

    def _sortQueue(self):
        # RM orders by period
        # TODO: implement this!
        raise NotImplementedError

    def _findFirst(self, t):
        """
        Returns the index of the highest-priority job released at or before t,
        or -1 if the queue is empty or if all remaining jobs are released after t.
        """
        # TODO: implement this!
        raise NotImplementedError

    def popNextJob(self, t):
        """
        Removes and returns the highest-priority job of those released at or after t,
        or None if no jobs are released at or after t.
        """
        # TODO: implement this!
        raise NotImplementedError

    def popPreemptingJob(self, t, job):
        """
        Removes and returns the job that will preempt job 'job' after time 't', or None
        if no such preemption will occur (i.e., if no higher-priority jobs
        are released before job 'job' will finish executing).
        t: the time after which a preemption may occur
        job: the job that is executing at time 't', and which may be preempted
        """
        # TODO: implement this!
        raise NotImplementedError

Try making your own test cases!
Take a look at the file p2_test1.json. This task set is based on Figure 4.13 in the textbook. Now that you have seen the notation for periodic tasks, you should easily be able to create your own test cases as .json files.
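When designing a test case, it can help to list by hand when each job should be released before you look at the drawn schedule. The sketch below does this for periodic tasks. The PeriodicTask fields and the release_times helper are hypothetical names chosen for illustration; they are not the keys used in the .json files, so check p2_test1.json for the actual format.

```python
from collections import defaultdict, namedtuple

# Hypothetical description of a periodic task; not the .json schema.
PeriodicTask = namedtuple(
    "PeriodicTask", ["name", "phase", "period", "wcet", "relative_deadline"]
)


def release_times(tasks, end_time):
    """Maps each release time before end_time to the (task name, job number) pairs released then."""
    releases = defaultdict(list)
    for task in tasks:
        t = task.phase
        job_number = 1
        while t < end_time:
            releases[t].append((task.name, job_number))
            t += task.period
            job_number += 1
    return dict(releases)


tasks = [
    PeriodicTask("T1", phase=0, period=4, wcet=1, relative_deadline=4),
    PeriodicTask("T2", phase=0, period=6, wcet=2, relative_deadline=6),
]
by_time = release_times(tasks, end_time=12)
```

For these two tasks, releases occur at times 0, 4, 6, and 8, with both tasks releasing their first job at time 0. Comparing a list like this against your schedule is a quick way to spot a job that was started before its release or never scheduled at all.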
What you should submit
You should submit the following files to Gradescope:
- readme.txt (collaboration statement listing collaborators and form of collaboration)
- fifo.py
- edf.py
- rm.py
- any new .json files that you chose to create for testing, and want to share (optional)
An example readme.txt file might be the following:

Author: Lulu Amert
Collaboration statement:
* Cheddar - partner
* Marshmallow - discussed how to update the remaining WCET of a task after partial completion

If you did not collaborate with anyone, you should say so:

Author: Hobbes Amert
Collaboration statement:
For this assignment, I worked alone.