hrp\pkg\boomer\runner
reportStats
// reportStats collects the current report data from r.stats, adds the
// controller's current client number under "user_count" and the atomically
// loaded runner state under "state", then passes the result to
// r.outputOnEvent.
func (r *runner) reportStats() {
	report := r.stats.collectReportData()

	// Read the two extra values in the same order in which they are stored.
	clients := r.controller.getCurrentClientsNum()
	report["user_count"] = clients

	// r.state is read with an atomic load.
	state := atomic.LoadInt32(&r.state)
	report["state"] = state

	r.outputOnEvent(report)
}
spawnWorkers
// spawnWorkers repeatedly starts worker goroutines through r.goAttach while
// the runner reports isStarting() and r.controller.acquire() succeeds. Each
// worker loops, running tasks obtained from r.getTask() via r.safeRun, until
// one of its exit conditions below is met.
//
// Of the parameters, only quit is referenced in the lines visible here: a
// value received from it makes a worker exit. spawnCount, spawnRate and
// spawnCompleteFunc are not used in this excerpt.
//
// NOTE(review): this listing appears to have lines elided (the signature has
// no opening brace, workerLoop is not declared here, closing braces are
// missing, and statements follow the `continue`). Confirm against the full
// file before relying on these notes.
func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func())
for {
// Only start a new worker while the runner is starting and the controller
// grants a slot.
if r.isStarting() && r.controller.acquire() {
r.goAttach(func() {
for {
select {
case <-quit:
// quit signalled: record this worker as finished and exit.
r.controller.increaseFinishedCount()
return
default:
// workerLoop is declared outside the visible lines; when it is set and
// its acquire() fails, the worker is recorded as finished and exits.
if workerLoop != nil && !workerLoop.acquire() {
r.controller.increaseFinishedCount()
return
}
if r.rateLimitEnabled {
// With rate limiting on, a task is run only when Acquire() returns
// false; when it returns true this iteration runs no task.
blocked := r.rateLimiter.Acquire()
if !blocked {
task := r.getTask()
r.safeRun(task.Fn)
}
} else {
// No rate limiting: run one task per iteration.
task := r.getTask()
r.safeRun(task.Fn)
}
if workerLoop != nil {
// finished count of the run as a whole (r.loop)
r.loop.increaseFinishedCount()
// finished count of this single worker (workerLoop)
workerLoop.increaseFinishedCount()
if r.loop.isFinished() {
// r.stop() is started in its own goroutine, so this worker does not
// wait for it before recording itself as finished and returning.
go r.stop()
r.controller.increaseFinishedCount()
return
}
}
// The worker exits when r.controller.erase() returns true.
// NOTE(review): presumably erase() removes surplus workers after a
// rebalance — confirm against the controller implementation.
if r.controller.erase() {
return
}
}
}
})
continue
// Wait for a value on the controller's rebalance channel.
<-r.controller.getRebalanceChan()
if r.isStarting() {
// rebalance: hand the runner's current spawn count and rate to the controller
r.controller.setSpawn(r.getSpawnCount(), r.getSpawnRate())
}
getTask
// getTask picks the task to run next from r.tasks.
//
//   - With no tasks it logs an error and terminates the process via os.Exit(1).
//   - With exactly one task it returns that task.
//   - Otherwise it picks at random: uniformly when r.totalTaskWeight is not
//     positive, else weighted by each task's Weight.
//
// NOTE(review): the end of the function (whatever follows the weighted loop)
// is not visible in this excerpt.
func (r *runner) getTask() *Task {
tasksCount := len(r.tasks)
if tasksCount == 0 {
log.Error().Msg("no valid testcase found")
os.Exit(1)
} else if tasksCount == 1 {
// Fast path: a single task needs no random selection.
return r.tasks[0]
}
// NOTE(review): a new rand.Rand, seeded from the current time, is built on
// every call; a generator created once and reused may be preferable —
// confirm how hot this path is.
rs := rand.New(rand.NewSource(time.Now().UnixNano()))
totalWeight := r.totalTaskWeight
if totalWeight <= 0 {
// Total weight is not positive (presumably no task has a weight defined),
// so every task has the same chance to run.
randNum := rs.Intn(tasksCount)
return r.tasks[randNum]
}
// Weighted pick: draw randNum in [0, totalWeight) and return the first task
// whose cumulative weight exceeds it.
randNum := rs.Intn(totalWeight)
runningSum := 0
for _, task := range r.tasks {
runningSum += task.Weight
if runningSum > randNum {
return task
}
}
statsStart
// statsStart calls r.reportStats() on every tick of a ticker whose period is
// reportStatsInterval. After a report, if the runner is neither starting nor
// stopping, it closes r.reportedChan, logs a message and returns.
//
// NOTE(review): the remainder of the function is not visible in this excerpt,
// and no ticker.Stop() call appears in the visible lines — confirm the ticker
// is released on return.
func (r *runner) statsStart() {
ticker := time.NewTicker(reportStatsInterval)
for {
select {
case <-ticker.C:
r.reportStats()
// Once the runner is neither starting nor stopping, this report was the
// last one: close reportedChan and return.
if !r.isStarting() && !r.isStopping() {
close(r.reportedChan)
log.Info().Msg("Quitting statsStart")
return
}
stop
// stop shuts the runner down in three steps: it calls r.gracefulStop(), stops
// the rate limiter when rate limiting is enabled, and finally sets the runner
// state to StateStopped.
func (r *runner) stop() {
	// Runs first; the remaining steps only happen after it returns.
	r.gracefulStop()

	// The limiter is only touched when rate limiting was switched on.
	if enabled := r.rateLimitEnabled; enabled {
		r.rateLimiter.Stop()
	}

	r.updateState(StateStopped)
}
// gracefulStop either delivers a stop signal on r.stopChan and then waits for
// r.doneChan, or returns straight away when r.doneChan is already readable
// (a value is available or the channel is closed).
func (r *runner) gracefulStop() {
	select {
	case <-r.doneChan:
		// Completion was already observable; there is nothing to signal.
	case r.stopChan <- true:
		// The stop signal was accepted; wait for completion.
		<-r.doneChan
	}
}