Practice Discrete-Event Simulation Using FIFO and Priority Queues in Python: Assignment Solution

## Instructions

Objective
Write a Python program to practice discrete-event simulation using FIFO and priority queues.

## Requirements and Specifications

Source Code

```BINARY HEAP # Bradley N. Miller, David L. Ranum # Introduction to Data Structures and Algorithms in Python # Copyright 2005 # import unittest # this heap takes key value pairs, we will assume that the keys are integers class BinHeap:     def __init__(self):         self.heapList = [0]         self.currentSize = 0     def buildHeap(self,alist):         i = len(alist) // 2         self.currentSize = len(alist)         self.heapList = [0] + alist[:]         print(len(self.heapList), i)         while (i > 0):             print(self.heapList, i)             self.percDown(i)             i = i - 1         print(self.heapList,i)     def percDown(self,i):         while (i * 2) <= self.currentSize:             mc = self.minChild(i)             if self.heapList[i] > self.heapList[mc]:                 tmp = self.heapList[i]                 self.heapList[i] = self.heapList[mc]                 self.heapList[mc] = tmp             i = mc     def minChild(self,i):         if i * 2 + 1 > self.currentSize:             return i * 2         else:             if self.heapList[i * 2] < self.heapList[i * 2 + 1]:                 return i * 2             else:                 return i * 2 + 1     def percUp(self,i):         while i // 2 > 0:             if self.heapList[i] < self.heapList[i//2]:                tmp = self.heapList[i // 2]                self.heapList[i // 2] = self.heapList[i]                self.heapList[i] = tmp             i = i // 2     def insert(self,k):         self.heapList.append(k)         self.currentSize = self.currentSize + 1         self.percUp(self.currentSize)     def delMin(self):         retval = self.heapList[1]         self.heapList[1] = self.heapList[self.currentSize]         self.currentSize = self.currentSize - 1         self.heapList.pop()         self.percDown(1)         return retval     def peekMin(self):         return self.heapList[1]     def isEmpty(self):         if self.currentSize == 0:             return True         else:             
return False     def size(self):         return self.currentSize     def __str__(self):         return str(self.heapList[1:]) class FooThing:     def __init__(self,x,y):         self.key = x         self.val = y     def getKey(self):         return self.key     def getValue(self):         return self.val     def setValue(self, newValue):         self.val = newValue     def __lt__(self,other):         if self.key < other.key:             return True         else:             return False     def __gt__(self,other):         if self.key > other.key:             return True         else:             return False     def __hash__(self):         return self.key class TestBinHeap(unittest.TestCase):     def setUp(self):         self.theHeap = BinHeap()         self.theHeap.insert(FooThing(5,'a'))         self.theHeap.insert(FooThing(9,'d'))         self.theHeap.insert(FooThing(1,'x'))         self.theHeap.insert(FooThing(2,'y'))         self.theHeap.insert(FooThing(3,'z'))     def testInsert(self):         assert self.theHeap.currentSize == 5     def testDelmin(self):         assert self.theHeap.delMin().getValue() == 'x'         assert self.theHeap.delMin().getValue() == 'y'         assert self.theHeap.delMin().getValue() == 'z'         assert self.theHeap.delMin().getValue() == 'a'     def testMixed(self):         myHeap = BinHeap()         myHeap.insert(9)         myHeap.insert(1)         myHeap.insert(5)         assert myHeap.delMin() == 1         myHeap.insert(2)         myHeap.insert(7)         assert myHeap.delMin() == 2         assert myHeap.delMin() == 5     def testDupes(self):         myHeap = BinHeap()         myHeap.insert(9)         myHeap.insert(1)         myHeap.insert(8)         myHeap.insert(1)         assert myHeap.currentSize == 4         assert myHeap.delMin() == 1         assert myHeap.delMin() == 1         assert myHeap.delMin() == 8     def testBuildHeap(self):         myHeap = BinHeap()         myHeap.buildHeap([9,5,6,2,3])         f = 
myHeap.delMin()         print("f = ", f)         assert f == 2         assert myHeap.delMin() == 3         assert myHeap.delMin() == 5         assert myHeap.delMin() == 6         assert myHeap.delMin() == 9 if __name__ == '__main__': ## d = {} ## d[FooThing(1,'z')] = 10 ## unittest.main()     suite = unittest.makeSuite(TestBinHeap)     unittest.TextTestRunner().run(suite) PRIORITY QUEUE ### File: priority_queue.py from binheap import BinHeap class PriorityQueue:     def __init__(self):         self._heap = BinHeap()     def isEmpty(self):         return self._heap.isEmpty()     def enqueue(self, item):         self._heap.insert(item)     def dequeue(self):         return self._heap.delMin()     def peek(self):         return self._heap.peekMin()     def size(self):         return self._heap.size()     def __str__(self):         return str(self._heap) class PriorityQueueEntry:     def __init__(self,priority,value):         self.priority = priority         self.val = value     def getPriority(self):         return self.priority     def getValue(self):         return self.val     def setValue(self, newValue):         self.val = newValue     def __lt__(self,other):         if self.priority < other.priority:             return True         else:             return False     def __gt__(self,other):         if self.priority > other.priority:             return True         else:             return False     def __hash__(self):         return self.key import unittest class TestPriorityQueue(unittest.TestCase):     def setUp(self):         self.pq = PriorityQueue()         self.pq.enqueue(PriorityQueueEntry(5,'a'))         self.pq.enqueue(PriorityQueueEntry(9,'d'))         self.pq.enqueue(PriorityQueueEntry(1,'x'))         self.pq.enqueue(PriorityQueueEntry(2,'y'))         self.pq.enqueue(PriorityQueueEntry(3,'z'))     def testInsert(self):         assert self.pq.size() == 5     def testPeek(self):         assert self.pq.peek().getValue() == 'x'         assert 
self.pq.peek().getValue() == 'x'         assert self.pq.peek().getValue() == 'x'         assert self.pq.peek().getValue() == 'x'     def testDelmin(self):         assert self.pq.dequeue().getValue() == 'x'         assert self.pq.dequeue().getValue() == 'y'         assert self.pq.dequeue().getValue() == 'z'         assert self.pq.dequeue().getValue() == 'a'     def testMixed(self):         myPQ = PriorityQueue()         myPQ.enqueue(9)         myPQ.enqueue(1)         myPQ.enqueue(5)         assert myPQ.dequeue() == 1         myPQ.enqueue(2)         myPQ.enqueue(7)         assert myPQ.dequeue() == 2         assert myPQ.dequeue() == 5     def testDupes(self):         myPQ = PriorityQueue()         myPQ.enqueue(9)         myPQ.enqueue(1)         myPQ.enqueue(8)         myPQ.enqueue(1)         assert myPQ.size() == 4         assert myPQ.dequeue() == 1         assert myPQ.dequeue() == 1         assert myPQ.dequeue() == 8 ## def testBuildHeap(self): ## myPQ = priorityQueue() ## myPQ.buildHeap([9,5,6,2,3]) ## f = myPQ.dequeue() ## print("f = ", f) ## assert f == 2 ## assert myPQ.dequeue() == 3 ## assert myPQ.dequeue() == 5 ## assert myPQ.dequeue() == 6 ## assert myPQ.dequeue() == 9 if __name__ == '__main__': ## d = {} ## d[PriorityQueueEntry(1,'z')] = 10 ## unittest.main()     suite = unittest.makeSuite(TestPriorityQueue)     unittest.TextTestRunner().run(suite) PROJECT 3 """ File: project3.py     Description: Project 3. Discrete-Event Simulation of multiple check-out lines. 
""" from priority_queue import PriorityQueue from priority_queue import PriorityQueueEntry from queue_text import QueueText import random def main():     # Gather simulation parameters from the user     totalNumberCashiers = int(input("Enter total # of cashiers: "))     numberCashiers = int(input("Enter # of open cashiers: "))     lineLengthThreshold = int(input("Enter limit of queue length for opening new cashier: "))     simulationLength = int(input("Enter simulation length (in minutes): "))     probabilityOfArrival = float(input("Enter probability of customer arrival each minute (0 to 1): "))     averageCustomerTime = int(input("Enter average # minutes for a customer: "))     # Set-up data structures needed by the simulation     # holds cashier-idle events "sorted" by event time containing value of cashier # going idle     cashierIdleEventQueue = PriorityQueue()     # parallel lists (i.e., index 0 of each list comprises state of cashier 0, etc.) to     # store the state of all cashiers:     customersServedByCashierList = [] # number of customers served by each cashier     cashierIdleList = [] # whether the cashier is idle (True) or busy (False)     checkOutLines = [] # FIFO queue to hold waiting customers in cashier's                                        # check-out line. 
Holds a customer's check-out duration     cashierLoad = [] # current total load of cashier     totalWaiting = [] # total waiting time for all customers of cashier     # for each cashier set-up their initial state     for cashier in range(numberCashiers):         checkOutLines.append(QueueText()) # create empty queue for each cashier         cashierIdleList.append(True) # each cashiers initially idle         customersServedByCashierList.append(0) # tracks # customers served per cashier         cashierLoad.append(0)         totalWaiting.append(0)     # run simulation for each minute     for clock in range(simulationLength):         # see if a customer arrives and add them to the shortest line if they do         if customerArrived(probabilityOfArrival):             if addCustomerToShortestLineWithTime(checkOutLines, cashierLoad, averageCustomerTime, clock, lineLengthThreshold):                 if numberCashiers < totalNumberCashiers:                     checkOutLines.append(QueueText())                     cashierIdleList.append(True)                     customersServedByCashierList.append(0)                     cashierLoad.append(0)                     totalWaiting.append(0)                     for line in range(numberCashiers):                         if not checkOutLines[line].isEmpty():                             entry = checkOutLines[line].removeRear()                             checkOutLines[numberCashiers].enqueue(entry)                             cashierLoad[numberCashiers] += entry[0]                     numberCashiers += 1         # check to see if any cashiers become idle         updateCashiersState(clock, cashierIdleList, cashierIdleEventQueue,                            customersServedByCashierList)         # start next customers if cashier is idle         startCustomersAtIdleCashiers(clock, checkOutLines, cashierIdleList,                                     cashierIdleEventQueue, totalWaiting)         for c in range(numberCashiers):             if 
cashierLoad[c] > 0:                 cashierLoad[c] -= 1         # print('clock:', clock )         # print(cashierLoad)         # print(cashierIdleList)         # print(customersServedByCashierList)         # for c in range(numberCashiers):         # print(c, ': ', checkOutLines[c])         # print()     printSimulationSummary(checkOutLines, customersServedByCashierList, totalWaiting) def customerArrived(probabilityOfArrival):     """ Returns a Boolean indicating whether a customer arrives at check-out lines         within the current minute randomly based on the probabilityOfArrival. """     randomValue = random.random() # random float in range [0,1)     if randomValue < probabilityOfArrival:         return True     else:         return False def addCustomerToShortestLineWithTime(checkOutLines, cashierLoad, averageCustomerTime, clock, lineLengthThreshold):     """ Determines the duration of the arriving customer's check-out time, then         adds the customer to the rear of the shortest check-out line. """     checkOutDuration = random.randint(1, round(2*averageCustomerTime))     smallestLine = determineSmallestLine(cashierLoad)     cashierLoad[smallestLine] += checkOutDuration     checkOutLines[smallestLine].enqueue([checkOutDuration, clock])     if checkOutLines[smallestLine].size() >= lineLengthThreshold:         return True     else:         return False def determineSmallestLine(cashierLoad):     """ Returns the cashier # with the fewest load in their check-out line. """     smallestLine = 0     for line in range(1, len(cashierLoad)):         if cashierLoad[line] < cashierLoad[smallestLine]:             smallestLine = line     return smallestLine def updateCashiersState(clock, cashierIdleList, cashierIdleEventQueue,                         customersServedByCashierList):     """ Check whether cashiers become idle in this clock tick, and update their         state if they do. 
"""     while not cashierIdleEventQueue.isEmpty() and cashierIdleEventQueue.peek().getPriority() <= clock:         cashierGoingIdle = cashierIdleEventQueue.dequeue().getValue()         cashierIdleList[cashierGoingIdle] = True         customersServedByCashierList[cashierGoingIdle] += 1         #print("Cashier", cashierGoingIdle, "going idle at time", clock) def startCustomersAtIdleCashiers(clock, checkOutLines, cashierIdleList, cashierIdleEventQueue, totalWaiting):     """ Start the next customer at cashiers which are idle and have someone in their         check-out line. """     for cashier in range(len(cashierIdleList)):         if cashierIdleList[cashier] and not checkOutLines[cashier].isEmpty():             [customerServeTime, startTime] = checkOutLines[cashier].dequeue()             totalWaiting[cashier] += clock - startTime             cashierIdleList[cashier] = False             # schedule future event for when cashier will go idle             cashierIdleEventQueue.enqueue(PriorityQueueEntry(clock+customerServeTime+1,cashier)) def printSimulationSummary(checkOutLines, customersServedByCashierList, totalWaiting):     """ For each cashier, prints the # of customers served and the # of customers         still waiting in their check-out line when the simulation ends. """     for cashier in range(len(checkOutLines)):         waitingTime = 0         if customersServedByCashierList[cashier] > 0:             waitingTime = totalWaiting[cashier] / customersServedByCashierList[cashier]         print("\nCashier", cashier, "checked out", customersServedByCashierList[cashier],               "customers with", checkOutLines[cashier].size(),               "customers in their line at end of simulation. Average waiting time:", waitingTime) main() # start simulation running```