+1 (315) 557-6473 

Practice Discrete Event Simulation Using FIFO And Priority Queues in Python Assignment Solution.


Instructions

Objective
Write a python homework to practice discrete event simulation using FIFO and priority queues.

Requirements and Specifications

program to practice discrete event simulation using FIFO and priority queues in python
program to practice discrete event simulation using FIFO and priority queues in python 1

Screenshots

program to practice discrete event simulation using FIFO and priority queues in python 2

Source Code

BINARY HEAP

# Bradley N. Miller, David L. Ranum

# Introduction to Data Structures and Algorithms in Python

# Copyright 2005

#

import unittest

# this heap takes key value pairs, we will assume that the keys are integers

class BinHeap:

    def __init__(self):

        self.heapList = [0]

        self.currentSize = 0

    def buildHeap(self,alist):

        i = len(alist) // 2

        self.currentSize = len(alist)

        self.heapList = [0] + alist[:]

        print(len(self.heapList), i)

        while (i > 0):

            print(self.heapList, i)

            self.percDown(i)

            i = i - 1

        print(self.heapList,i)

    def percDown(self,i):

        while (i * 2) <= self.currentSize:

            mc = self.minChild(i)

            if self.heapList[i] > self.heapList[mc]:

                tmp = self.heapList[i]

                self.heapList[i] = self.heapList[mc]

                self.heapList[mc] = tmp

            i = mc

    def minChild(self,i):

        if i * 2 + 1 > self.currentSize:

            return i * 2

        else:

            if self.heapList[i * 2] < self.heapList[i * 2 + 1]:

                return i * 2

            else:

                return i * 2 + 1

    def percUp(self,i):

        while i // 2 > 0:

            if self.heapList[i] < self.heapList[i//2]:

               tmp = self.heapList[i // 2]

               self.heapList[i // 2] = self.heapList[i]

               self.heapList[i] = tmp

            i = i // 2

    def insert(self,k):

        self.heapList.append(k)

        self.currentSize = self.currentSize + 1

        self.percUp(self.currentSize)

    def delMin(self):

        retval = self.heapList[1]

        self.heapList[1] = self.heapList[self.currentSize]

        self.currentSize = self.currentSize - 1

        self.heapList.pop()

        self.percDown(1)

        return retval

    def peekMin(self):

        return self.heapList[1]

    def isEmpty(self):

        if self.currentSize == 0:

            return True

        else:

            return False

    def size(self):

        return self.currentSize

    def __str__(self):

        return str(self.heapList[1:])

class FooThing:

    def __init__(self,x,y):

        self.key = x

        self.val = y

    def getKey(self):

        return self.key

    def getValue(self):

        return self.val

    def setValue(self, newValue):

        self.val = newValue

    def __lt__(self,other):

        if self.key < other.key:

            return True

        else:

            return False

    def __gt__(self,other):

        if self.key > other.key:

            return True

        else:

            return False

    def __hash__(self):

        return self.key

class TestBinHeap(unittest.TestCase):

    def setUp(self):

        self.theHeap = BinHeap()

        self.theHeap.insert(FooThing(5,'a'))

        self.theHeap.insert(FooThing(9,'d'))

        self.theHeap.insert(FooThing(1,'x'))

        self.theHeap.insert(FooThing(2,'y'))

        self.theHeap.insert(FooThing(3,'z'))

    def testInsert(self):

        assert self.theHeap.currentSize == 5

    def testDelmin(self):

        assert self.theHeap.delMin().getValue() == 'x'

        assert self.theHeap.delMin().getValue() == 'y'

        assert self.theHeap.delMin().getValue() == 'z'

        assert self.theHeap.delMin().getValue() == 'a'

    def testMixed(self):

        myHeap = BinHeap()

        myHeap.insert(9)

        myHeap.insert(1)

        myHeap.insert(5)

        assert myHeap.delMin() == 1

        myHeap.insert(2)

        myHeap.insert(7)

        assert myHeap.delMin() == 2

        assert myHeap.delMin() == 5

    def testDupes(self):

        myHeap = BinHeap()

        myHeap.insert(9)

        myHeap.insert(1)

        myHeap.insert(8)

        myHeap.insert(1)

        assert myHeap.currentSize == 4

        assert myHeap.delMin() == 1

        assert myHeap.delMin() == 1

        assert myHeap.delMin() == 8

    def testBuildHeap(self):

        myHeap = BinHeap()

        myHeap.buildHeap([9,5,6,2,3])

        f = myHeap.delMin()

        print("f = ", f)

        assert f == 2

        assert myHeap.delMin() == 3

        assert myHeap.delMin() == 5

        assert myHeap.delMin() == 6

        assert myHeap.delMin() == 9

if __name__ == '__main__':

## d = {}

## d[FooThing(1,'z')] = 10

## unittest.main()

    suite = unittest.makeSuite(TestBinHeap)

    unittest.TextTestRunner().run(suite)

PRIORITY QUEUE

### File: priority_queue.py

from binheap import BinHeap

class PriorityQueue:

    def __init__(self):

        self._heap = BinHeap()

    def isEmpty(self):

        return self._heap.isEmpty()

    def enqueue(self, item):

        self._heap.insert(item)

    def dequeue(self):

        return self._heap.delMin()

    def peek(self):

        return self._heap.peekMin()

    def size(self):

        return self._heap.size()

    def __str__(self):

        return str(self._heap)

class PriorityQueueEntry:

    def __init__(self,priority,value):

        self.priority = priority

        self.val = value

    def getPriority(self):

        return self.priority

    def getValue(self):

        return self.val

    def setValue(self, newValue):

        self.val = newValue

    def __lt__(self,other):

        if self.priority < other.priority:

            return True

        else:

            return False

    def __gt__(self,other):

        if self.priority > other.priority:

            return True

        else:

            return False

    def __hash__(self):

        return self.key

import unittest

class TestPriorityQueue(unittest.TestCase):

    def setUp(self):

        self.pq = PriorityQueue()

        self.pq.enqueue(PriorityQueueEntry(5,'a'))

        self.pq.enqueue(PriorityQueueEntry(9,'d'))

        self.pq.enqueue(PriorityQueueEntry(1,'x'))

        self.pq.enqueue(PriorityQueueEntry(2,'y'))

        self.pq.enqueue(PriorityQueueEntry(3,'z'))

    def testInsert(self):

        assert self.pq.size() == 5

    def testPeek(self):

        assert self.pq.peek().getValue() == 'x'

        assert self.pq.peek().getValue() == 'x'

        assert self.pq.peek().getValue() == 'x'

        assert self.pq.peek().getValue() == 'x'

    def testDelmin(self):

        assert self.pq.dequeue().getValue() == 'x'

        assert self.pq.dequeue().getValue() == 'y'

        assert self.pq.dequeue().getValue() == 'z'

        assert self.pq.dequeue().getValue() == 'a'

    def testMixed(self):

        myPQ = PriorityQueue()

        myPQ.enqueue(9)

        myPQ.enqueue(1)

        myPQ.enqueue(5)

        assert myPQ.dequeue() == 1

        myPQ.enqueue(2)

        myPQ.enqueue(7)

        assert myPQ.dequeue() == 2

        assert myPQ.dequeue() == 5

    def testDupes(self):

        myPQ = PriorityQueue()

        myPQ.enqueue(9)

        myPQ.enqueue(1)

        myPQ.enqueue(8)

        myPQ.enqueue(1)

        assert myPQ.size() == 4

        assert myPQ.dequeue() == 1

        assert myPQ.dequeue() == 1

        assert myPQ.dequeue() == 8

## def testBuildHeap(self):

## myPQ = priorityQueue()

## myPQ.buildHeap([9,5,6,2,3])

## f = myPQ.dequeue()

## print("f = ", f)

## assert f == 2

## assert myPQ.dequeue() == 3

## assert myPQ.dequeue() == 5

## assert myPQ.dequeue() == 6

## assert myPQ.dequeue() == 9

if __name__ == '__main__':

## d = {}

## d[PriorityQueueEntry(1,'z')] = 10

## unittest.main()

    suite = unittest.makeSuite(TestPriorityQueue)

    unittest.TextTestRunner().run(suite)

PROJECT 3

""" File: project3.py

    Description: Project 3. Discrete-Event Simulation of multiple check-out lines.

"""

from priority_queue import PriorityQueue

from priority_queue import PriorityQueueEntry

from queue_text import QueueText

import random

def main():

    # Gather simulation parameters from the user

    totalNumberCashiers = int(input("Enter total # of cashiers: "))

    numberCashiers = int(input("Enter # of open cashiers: "))

    lineLengthThreshold = int(input("Enter limit of queue length for opening new cashier: "))

    simulationLength = int(input("Enter simulation length (in minutes): "))

    probabilityOfArrival = float(input("Enter probability of customer arrival each minute (0 to 1): "))

    averageCustomerTime = int(input("Enter average # minutes for a customer: "))

    # Set-up data structures needed by the simulation

    # holds cashier-idle events "sorted" by event time containing value of cashier # going idle

    cashierIdleEventQueue = PriorityQueue()

    # parallel lists (i.e., index 0 of each list comprises state of cashier 0, etc.) to

    # store the state of all cashiers:

    customersServedByCashierList = [] # number of customers served by each cashier

    cashierIdleList = [] # whether the cashier is idle (True) or busy (False)

    checkOutLines = [] # FIFO queue to hold waiting customers in cashier's

                                       # check-out line. Holds a customer's check-out duration

    cashierLoad = [] # current total load of cashier

    totalWaiting = [] # total waiting time for all customers of cashier

    # for each cashier set-up their initial state

    for cashier in range(numberCashiers):

        checkOutLines.append(QueueText()) # create empty queue for each cashier

        cashierIdleList.append(True) # each cashiers initially idle

        customersServedByCashierList.append(0) # tracks # customers served per cashier

        cashierLoad.append(0)

        totalWaiting.append(0)

    # run simulation for each minute

    for clock in range(simulationLength):

        # see if a customer arrives and add them to the shortest line if they do

        if customerArrived(probabilityOfArrival):

            if addCustomerToShortestLineWithTime(checkOutLines, cashierLoad, averageCustomerTime, clock, lineLengthThreshold):

                if numberCashiers < totalNumberCashiers:

                    checkOutLines.append(QueueText())

                    cashierIdleList.append(True)

                    customersServedByCashierList.append(0)

                    cashierLoad.append(0)

                    totalWaiting.append(0)

                    for line in range(numberCashiers):

                        if not checkOutLines[line].isEmpty():

                            entry = checkOutLines[line].removeRear()

                            checkOutLines[numberCashiers].enqueue(entry)

                            cashierLoad[numberCashiers] += entry[0]

                    numberCashiers += 1

        # check to see if any cashiers become idle

        updateCashiersState(clock, cashierIdleList, cashierIdleEventQueue,

                           customersServedByCashierList)

        # start next customers if cashier is idle

        startCustomersAtIdleCashiers(clock, checkOutLines, cashierIdleList,

                                    cashierIdleEventQueue, totalWaiting)

        for c in range(numberCashiers):

            if cashierLoad[c] > 0:

                cashierLoad[c] -= 1

        # print('clock:', clock )

        # print(cashierLoad)

        # print(cashierIdleList)

        # print(customersServedByCashierList)

        # for c in range(numberCashiers):

        # print(c, ': ', checkOutLines[c])

        # print()

    printSimulationSummary(checkOutLines, customersServedByCashierList, totalWaiting)

def customerArrived(probabilityOfArrival):

    """ Returns a Boolean indicating whether a customer arrives at check-out lines

        within the current minute randomly based on the probabilityOfArrival. """

    randomValue = random.random() # random float in range [0,1)

    if randomValue < probabilityOfArrival:

        return True

    else:

        return False

def addCustomerToShortestLineWithTime(checkOutLines, cashierLoad, averageCustomerTime, clock, lineLengthThreshold):

    """ Determines the duration of the arriving customer's check-out time, then

        adds the customer to the rear of the shortest check-out line. """

    checkOutDuration = random.randint(1, round(2*averageCustomerTime))

    smallestLine = determineSmallestLine(cashierLoad)

    cashierLoad[smallestLine] += checkOutDuration

    checkOutLines[smallestLine].enqueue([checkOutDuration, clock])

    if checkOutLines[smallestLine].size() >= lineLengthThreshold:

        return True

    else:

        return False

def determineSmallestLine(cashierLoad):

    """ Returns the cashier # with the fewest load in their check-out line. """

    smallestLine = 0

    for line in range(1, len(cashierLoad)):

        if cashierLoad[line] < cashierLoad[smallestLine]:

            smallestLine = line

    return smallestLine

def updateCashiersState(clock, cashierIdleList, cashierIdleEventQueue,

                        customersServedByCashierList):

    """ Check whether cashiers become idle in this clock tick, and update their

        state if they do. """

    while not cashierIdleEventQueue.isEmpty() and cashierIdleEventQueue.peek().getPriority() <= clock:

        cashierGoingIdle = cashierIdleEventQueue.dequeue().getValue()

        cashierIdleList[cashierGoingIdle] = True

        customersServedByCashierList[cashierGoingIdle] += 1

        #print("Cashier", cashierGoingIdle, "going idle at time", clock)

def startCustomersAtIdleCashiers(clock, checkOutLines, cashierIdleList, cashierIdleEventQueue, totalWaiting):

    """ Start the next customer at cashiers which are idle and have someone in their

        check-out line. """

    for cashier in range(len(cashierIdleList)):

        if cashierIdleList[cashier] and not checkOutLines[cashier].isEmpty():

            [customerServeTime, startTime] = checkOutLines[cashier].dequeue()

            totalWaiting[cashier] += clock - startTime

            cashierIdleList[cashier] = False

            # schedule future event for when cashier will go idle

            cashierIdleEventQueue.enqueue(PriorityQueueEntry(clock+customerServeTime+1,cashier))

def printSimulationSummary(checkOutLines, customersServedByCashierList, totalWaiting):

    """ For each cashier, prints the # of customers served and the # of customers

        still waiting in their check-out line when the simulation ends. """

    for cashier in range(len(checkOutLines)):

        waitingTime = 0

        if customersServedByCashierList[cashier] > 0:

            waitingTime = totalWaiting[cashier] / customersServedByCashierList[cashier]

        print("\nCashier", cashier, "checked out", customersServedByCashierList[cashier],

              "customers with", checkOutLines[cashier].size(),

              "customers in their line at end of simulation. Average waiting time:", waitingTime)

main() # start simulation running