Instructions
Requirements and Specifications
Screenshots
Source Code
BINARY HEAP
# Bradley N. Miller, David L. Ranum
# Introduction to Data Structures and Algorithms in Python
# Copyright 2005
#
import unittest
# this heap takes key value pairs, we will assume that the keys are integers
class BinHeap:
    """Array-backed binary min-heap.

    Items live in ``heapList`` starting at index 1; index 0 holds an unused
    placeholder so that the children of the item at index ``i`` sit at
    ``2 * i`` and ``2 * i + 1`` and its parent at ``i // 2``.  Items only
    need to support ``<`` and ``>`` against each other.
    """

    def __init__(self):
        """Create an empty heap."""
        self.heapList = [0]
        self.currentSize = 0

    def buildHeap(self, alist):
        """Replace the heap's contents with the items of ``alist``.

        ``alist`` is copied, never modified.  Nothing is printed: the
        progress traces that used to be emitted here were debugging residue.
        """
        items = list(alist)
        self.currentSize = len(items)
        self.heapList = [0] + items
        # Sift down every node that has at least one child, walking from the
        # last such node back to the root.
        for i in range(self.currentSize // 2, 0, -1):
            self.percDown(i)

    def percDown(self, i):
        """Walk down from index ``i`` via the smaller child, swapping when
        the parent is larger than that child."""
        while i * 2 <= self.currentSize:
            mc = self.minChild(i)
            if self.heapList[i] > self.heapList[mc]:
                self.heapList[i], self.heapList[mc] = (
                    self.heapList[mc], self.heapList[i])
            i = mc

    def minChild(self, i):
        """Return the index of the smaller child of the item at index ``i``.

        The caller must ensure that the left child (``2 * i``) exists.
        """
        left, right = i * 2, i * 2 + 1
        if right > self.currentSize:
            return left  # only a left child exists
        if self.heapList[left] < self.heapList[right]:
            return left
        return right

    def percUp(self, i):
        """Walk up from index ``i`` to the root, swapping when the child is
        smaller than its parent."""
        while i // 2 > 0:
            parent = i // 2
            if self.heapList[i] < self.heapList[parent]:
                self.heapList[i], self.heapList[parent] = (
                    self.heapList[parent], self.heapList[i])
            i = parent

    def insert(self, k):
        """Add item ``k`` to the heap."""
        self.heapList.append(k)
        self.currentSize += 1
        self.percUp(self.currentSize)

    def delMin(self):
        """Remove and return the smallest item.

        Raises:
            IndexError: if the heap is empty.
        """
        if self.currentSize == 0:
            raise IndexError("delMin from an empty heap")
        retval = self.heapList[1]
        # Move the last item to the root, shrink the array, then sift the
        # moved item down to its place.
        self.heapList[1] = self.heapList[self.currentSize]
        self.currentSize -= 1
        self.heapList.pop()
        self.percDown(1)
        return retval

    def peekMin(self):
        """Return the smallest item without removing it.

        Raises:
            IndexError: if the heap is empty.
        """
        if self.currentSize == 0:
            raise IndexError("peekMin from an empty heap")
        return self.heapList[1]

    def isEmpty(self):
        """Return True when the heap holds no items."""
        return self.currentSize == 0

    def size(self):
        """Return the number of items in the heap."""
        return self.currentSize

    def __str__(self):
        """Return the items in internal array order (not sorted order)."""
        return str(self.heapList[1:])
class FooThing:
    """Key/value pair that orders and hashes by its key.

    Serves as a sample heap item: comparisons look only at ``key`` and the
    payload ``val`` is carried along untouched.
    """

    def __init__(self, x, y):
        """Store ``x`` as the key and ``y`` as the value."""
        self.key, self.val = x, y

    def getKey(self):
        """Return the key."""
        return self.key

    def getValue(self):
        """Return the stored value."""
        return self.val

    def setValue(self, newValue):
        """Replace the stored value with ``newValue``."""
        self.val = newValue

    def __lt__(self, other):
        """Return True when this key is smaller than ``other``'s key."""
        return self.key < other.key

    def __gt__(self, other):
        """Return True when this key is larger than ``other``'s key."""
        return self.key > other.key

    def __hash__(self):
        """Return the key itself as the hash (so the key must be an int)."""
        return self.key
class TestBinHeap(unittest.TestCase):
    """Unit tests for BinHeap.

    Uses the TestCase assertion methods instead of bare ``assert`` so the
    checks still run under ``python -O`` and failures report both values.
    """

    def setUp(self):
        """Build a heap of five FooThing items with keys 5, 9, 1, 2, 3."""
        self.theHeap = BinHeap()
        self.theHeap.insert(FooThing(5, 'a'))
        self.theHeap.insert(FooThing(9, 'd'))
        self.theHeap.insert(FooThing(1, 'x'))
        self.theHeap.insert(FooThing(2, 'y'))
        self.theHeap.insert(FooThing(3, 'z'))

    def testInsert(self):
        """Every insert in setUp is counted."""
        self.assertEqual(self.theHeap.currentSize, 5)

    def testDelmin(self):
        """Items come out in ascending key order."""
        for expected in ('x', 'y', 'z', 'a'):
            self.assertEqual(self.theHeap.delMin().getValue(), expected)

    def testMixed(self):
        """Interleaved inserts and deletes still yield the minimum."""
        myHeap = BinHeap()
        myHeap.insert(9)
        myHeap.insert(1)
        myHeap.insert(5)
        self.assertEqual(myHeap.delMin(), 1)
        myHeap.insert(2)
        myHeap.insert(7)
        self.assertEqual(myHeap.delMin(), 2)
        self.assertEqual(myHeap.delMin(), 5)

    def testDupes(self):
        """Duplicate items are stored and returned individually."""
        myHeap = BinHeap()
        myHeap.insert(9)
        myHeap.insert(1)
        myHeap.insert(8)
        myHeap.insert(1)
        self.assertEqual(myHeap.currentSize, 4)
        self.assertEqual(myHeap.delMin(), 1)
        self.assertEqual(myHeap.delMin(), 1)
        self.assertEqual(myHeap.delMin(), 8)

    def testBuildHeap(self):
        """buildHeap turns an arbitrary list into a valid heap."""
        myHeap = BinHeap()
        myHeap.buildHeap([9, 5, 6, 2, 3])
        for expected in (2, 3, 5, 6, 9):
            self.assertEqual(myHeap.delMin(), expected)
if __name__ == '__main__':
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in 3.13;
    # the loader builds the same suite on every Python 3 version.
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestBinHeap)
    unittest.TextTestRunner().run(suite)
PRIORITY QUEUE
### File: priority_queue.py
from binheap import BinHeap
class PriorityQueue:
    """Priority queue facade that forwards every operation to a BinHeap.

    The backing heap is a min-heap, so the item that compares smallest is
    the one returned by ``peek`` and ``dequeue``.
    """

    def __init__(self):
        """Create an empty queue backed by a fresh BinHeap."""
        self._heap = BinHeap()

    def isEmpty(self):
        """Return the backing heap's emptiness flag."""
        heap = self._heap
        return heap.isEmpty()

    def enqueue(self, item):
        """Insert ``item`` into the backing heap."""
        heap = self._heap
        heap.insert(item)

    def dequeue(self):
        """Remove and return the backing heap's minimum item."""
        heap = self._heap
        return heap.delMin()

    def peek(self):
        """Return the backing heap's minimum item without removing it."""
        heap = self._heap
        return heap.peekMin()

    def size(self):
        """Return the number of items the backing heap reports."""
        heap = self._heap
        return heap.size()

    def __str__(self):
        """Return the backing heap's string form."""
        heap = self._heap
        return str(heap)
class PriorityQueueEntry:
    """Priority/value pair that orders and hashes by its priority.

    Comparisons look only at ``priority``; the payload ``val`` is carried
    along untouched.
    """

    def __init__(self, priority, value):
        """Store ``priority`` (the ordering key) and ``value`` (the payload)."""
        self.priority = priority
        self.val = value

    def getPriority(self):
        """Return the priority."""
        return self.priority

    def getValue(self):
        """Return the stored value."""
        return self.val

    def setValue(self, newValue):
        """Replace the stored value with ``newValue``."""
        self.val = newValue

    def __lt__(self, other):
        """Return True when this priority is lower than ``other``'s."""
        return self.priority < other.priority

    def __gt__(self, other):
        """Return True when this priority is higher than ``other``'s."""
        return self.priority > other.priority

    def __hash__(self):
        """Return a hash derived from the priority.

        This used to read ``self.key``, an attribute this class never sets,
        so hashing an entry raised AttributeError.
        """
        return hash(self.priority)
import unittest
class TestPriorityQueue(unittest.TestCase):
    """Unit tests for PriorityQueue.

    Uses the TestCase assertion methods instead of bare ``assert`` so the
    checks still run under ``python -O`` and failures report both values.
    """

    def setUp(self):
        """Build a queue of five entries with priorities 5, 9, 1, 2, 3."""
        self.pq = PriorityQueue()
        self.pq.enqueue(PriorityQueueEntry(5, 'a'))
        self.pq.enqueue(PriorityQueueEntry(9, 'd'))
        self.pq.enqueue(PriorityQueueEntry(1, 'x'))
        self.pq.enqueue(PriorityQueueEntry(2, 'y'))
        self.pq.enqueue(PriorityQueueEntry(3, 'z'))

    def testInsert(self):
        """Every enqueue in setUp is counted."""
        self.assertEqual(self.pq.size(), 5)

    def testPeek(self):
        """Repeated peeks keep returning the same minimum entry."""
        for _ in range(4):
            self.assertEqual(self.pq.peek().getValue(), 'x')

    def testDelmin(self):
        """Entries come out in ascending priority order."""
        for expected in ('x', 'y', 'z', 'a'):
            self.assertEqual(self.pq.dequeue().getValue(), expected)

    def testMixed(self):
        """Interleaved enqueues and dequeues still yield the minimum."""
        myPQ = PriorityQueue()
        myPQ.enqueue(9)
        myPQ.enqueue(1)
        myPQ.enqueue(5)
        self.assertEqual(myPQ.dequeue(), 1)
        myPQ.enqueue(2)
        myPQ.enqueue(7)
        self.assertEqual(myPQ.dequeue(), 2)
        self.assertEqual(myPQ.dequeue(), 5)

    def testDupes(self):
        """Duplicate items are stored and returned individually."""
        myPQ = PriorityQueue()
        myPQ.enqueue(9)
        myPQ.enqueue(1)
        myPQ.enqueue(8)
        myPQ.enqueue(1)
        self.assertEqual(myPQ.size(), 4)
        self.assertEqual(myPQ.dequeue(), 1)
        self.assertEqual(myPQ.dequeue(), 1)
        self.assertEqual(myPQ.dequeue(), 8)
## def testBuildHeap(self):
## myPQ = priorityQueue()
## myPQ.buildHeap([9,5,6,2,3])
## f = myPQ.dequeue()
## print("f = ", f)
## assert f == 2
## assert myPQ.dequeue() == 3
## assert myPQ.dequeue() == 5
## assert myPQ.dequeue() == 6
## assert myPQ.dequeue() == 9
if __name__ == '__main__':
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in 3.13;
    # the loader builds the same suite on every Python 3 version.
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestPriorityQueue)
    unittest.TextTestRunner().run(suite)
PROJECT 3
""" File: project3.py
Description: Project 3. Discrete-Event Simulation of multiple check-out lines.
"""
from priority_queue import PriorityQueue
from priority_queue import PriorityQueueEntry
from queue_text import QueueText
import random
def main():
    """Run the check-out line simulation with parameters read from stdin.

    Each simulated minute: a customer may arrive and join the least-loaded
    line; if that line has reached the length threshold and closed cashiers
    remain, one more cashier opens and takes the rear customer of every
    non-empty line; cashiers whose idle event is due are freed; idle
    cashiers start their next customer; and every positive load estimate
    drops by one minute.  A per-cashier summary is printed at the end.
    """
    # Gather simulation parameters from the user
    totalNumberCashiers = int(input("Enter total # of cashiers: "))
    numberCashiers = int(input("Enter # of open cashiers: "))
    lineLengthThreshold = int(input("Enter limit of queue length for opening new cashier: "))
    simulationLength = int(input("Enter simulation length (in minutes): "))
    probabilityOfArrival = float(input("Enter probability of customer arrival each minute (0 to 1): "))
    averageCustomerTime = int(input("Enter average # minutes for a customer: "))

    # Cashier-idle events ordered by event time; each entry's value is the
    # number of the cashier that goes idle.
    cashierIdleEventQueue = PriorityQueue()

    # Parallel lists: index i of every list describes cashier i.
    customersServedByCashierList = []  # customers served so far
    cashierIdleList = []               # True while idle, False while busy
    checkOutLines = []                 # waiting customers as [duration, arrival clock]
    cashierLoad = []                   # estimated minutes of work still assigned
    totalWaiting = []                  # summed waiting time of customers started

    def openCashier():
        """Append the initial state of one newly opened cashier to every list."""
        checkOutLines.append(QueueText())
        cashierIdleList.append(True)
        customersServedByCashierList.append(0)
        cashierLoad.append(0)
        totalWaiting.append(0)

    for _ in range(numberCashiers):
        openCashier()

    # Run the simulation one minute at a time.
    for clock in range(simulationLength):
        if customerArrived(probabilityOfArrival):
            lineAtThreshold = addCustomerToShortestLineWithTime(
                checkOutLines, cashierLoad, averageCustomerTime, clock,
                lineLengthThreshold)
            if lineAtThreshold and numberCashiers < totalNumberCashiers:
                openCashier()
                newCashier = numberCashiers  # index of the state just appended
                for line in range(numberCashiers):
                    if not checkOutLines[line].isEmpty():
                        # NOTE(review): removeRear() presumably takes the most
                        # recently enqueued customer -- confirm in QueueText.
                        entry = checkOutLines[line].removeRear()
                        checkOutLines[newCashier].enqueue(entry)
                        cashierLoad[newCashier] += entry[0]
                        # The moved customer's work leaves the old line as
                        # well; previously it stayed counted there.  Clamp at
                        # zero because the estimate may already have ticked
                        # down below this customer's duration.
                        cashierLoad[line] = max(0, cashierLoad[line] - entry[0])
                numberCashiers += 1

        # Free every cashier whose idle event is due.
        updateCashiersState(clock, cashierIdleList, cashierIdleEventQueue,
                            customersServedByCashierList)
        # Idle cashiers start the next customer in their line.
        startCustomersAtIdleCashiers(clock, checkOutLines, cashierIdleList,
                                     cashierIdleEventQueue, totalWaiting)
        # One minute has passed: reduce every positive load estimate by one.
        for c in range(numberCashiers):
            if cashierLoad[c] > 0:
                cashierLoad[c] -= 1

    printSimulationSummary(checkOutLines, customersServedByCashierList, totalWaiting)
def customerArrived(probabilityOfArrival):
    """Decide at random whether a customer shows up during this minute.

    Draws one value from random.random(), which lies in [0, 1), and reports
    an arrival when the draw falls below ``probabilityOfArrival``.
    """
    return random.random() < probabilityOfArrival
def addCustomerToShortestLineWithTime(checkOutLines, cashierLoad, averageCustomerTime, clock, lineLengthThreshold):
    """Queue a newly arrived customer at the least-loaded cashier.

    The customer's check-out duration is a random whole number of minutes
    from 1 to round(2 * averageCustomerTime).  That duration is added to the
    chosen cashier's load and ``[duration, clock]`` is enqueued on that
    cashier's line.

    Returns True when the chosen line now holds at least
    ``lineLengthThreshold`` customers, False otherwise.
    """
    longestDuration = round(2 * averageCustomerTime)
    duration = random.randint(1, longestDuration)
    target = determineSmallestLine(cashierLoad)
    cashierLoad[target] += duration
    chosenLine = checkOutLines[target]
    chosenLine.enqueue([duration, clock])
    return chosenLine.size() >= lineLengthThreshold
def determineSmallestLine(cashierLoad):
    """Return the index of the cashier with the smallest load.

    Ties go to the lowest index, and an empty list yields 0.
    """
    positions = range(len(cashierLoad))
    return min(positions, key=cashierLoad.__getitem__, default=0)
def updateCashiersState(clock, cashierIdleList, cashierIdleEventQueue,
                        customersServedByCashierList):
    """Process every cashier-idle event that is due at or before ``clock``.

    For each due event the cashier named by the event's value is marked
    idle and that cashier's served-customer count goes up by one.
    """
    while True:
        if cashierIdleEventQueue.isEmpty():
            break
        if cashierIdleEventQueue.peek().getPriority() > clock:
            break  # earliest remaining event is still in the future
        finishedCashier = cashierIdleEventQueue.dequeue().getValue()
        cashierIdleList[finishedCashier] = True
        customersServedByCashierList[finishedCashier] += 1
def startCustomersAtIdleCashiers(clock, checkOutLines, cashierIdleList, cashierIdleEventQueue, totalWaiting):
    """Begin serving the next customer at every idle cashier with a queue.

    For each such cashier the front customer is dequeued, the time the
    customer spent waiting is added to that cashier's total, the cashier is
    marked busy, and an idle event is scheduled for
    ``clock + serve time + 1``.
    """
    for cashier, isIdle in enumerate(cashierIdleList):
        if not isIdle:
            continue
        line = checkOutLines[cashier]
        if line.isEmpty():
            continue
        customerServeTime, startTime = line.dequeue()
        totalWaiting[cashier] += clock - startTime
        cashierIdleList[cashier] = False
        # Schedule the moment this cashier becomes idle again.
        idleAt = clock + customerServeTime + 1
        cashierIdleEventQueue.enqueue(PriorityQueueEntry(idleAt, cashier))
def printSimulationSummary(checkOutLines, customersServedByCashierList, totalWaiting):
    """Print one summary line per cashier.

    Each line reports how many customers the cashier checked out, how many
    are still in the cashier's line, and the average waiting time (total
    waiting time divided by customers served, or 0 when none were served).
    """
    for cashier, line in enumerate(checkOutLines):
        served = customersServedByCashierList[cashier]
        averageWait = totalWaiting[cashier] / served if served > 0 else 0
        print("\nCashier", cashier, "checked out", served,
              "customers with", line.size(),
              "customers in their line at end of simulation. Average waiting time:", averageWait)
main()  # start the simulation; NOTE: there is no __main__ guard, so this also runs when the file is imported