Skip to content
This repository was archived by the owner on Mar 11, 2019. It is now read-only.

Commit ee3dc34

Browse files
committed
fix(LinuxHelper): fix a bug in the LinuxHelper
The application crashed when the getProcesses method was called with a non-existing application fix(PowerSpy): fix a problem with Process/Application targets The idle power was added to the dynamic power for these targets, it was not correct refactoring(targets): add missing toString method Add a toString method to each target refactoring(PowerChannel): modify the AggregatePowerReport Given the fact the AggregatePowerReport aggregates data from several targets and sources, all these informations are kept in order to de displayed to the end-user.
1 parent 9603790 commit ee3dc34

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+215
-330
lines changed

powerapi-core/src/main/scala/org/powerapi/PowerMeter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,5 +239,5 @@ trait PowerDisplay {
239239
/**
240240
* Displays data from power reports.
241241
*/
242-
def display(timestamp: Long, target: Target, device: String, power: Power)
242+
def display(timestamp: Long, targets: Set[Target], devices: Set[String], power: Power)
243243
}

powerapi-core/src/main/scala/org/powerapi/core/MonitorActors.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,12 @@ import akka.event.LoggingReceive
2828
import java.util.UUID
2929
import org.powerapi.core.ClockChannel.ClockTick
3030
import org.powerapi.core.power._
31-
import org.powerapi.module.PowerChannel.PowerReport
3231
import org.powerapi.core.target.Target
3332
import org.powerapi.{PowerDisplay, PowerMonitoring}
3433
import org.powerapi.core.ClockChannel.{startClock, stopClock, subscribeClockTick, unsubscribeClockTick}
3534
import org.powerapi.core.MonitorChannel.{MonitorAggFunction, MonitorStart, MonitorStarted, MonitorStop, MonitorStopAll, formatMonitorChildName, subscribeMonitorsChannel}
3635
import org.powerapi.core.MonitorChannel.{publishMonitorTick, setAggFunction, stopMonitor}
37-
import org.powerapi.module.PowerChannel.{AggregateReport, render, subscribePowerReport, unsubscribePowerReport, subscribeAggPowerReport}
36+
import org.powerapi.module.PowerChannel.{AggregatePowerReport, RawPowerReport, render, subscribeRawPowerReport, unsubscribeRawPowerReport, subscribeAggPowerReport}
3837
import org.powerapi.module.SensorChannel.{monitorAllStopped, monitorStopped}
3938
import org.powerapi.reporter.ReporterComponent
4039
import scala.concurrent.duration.FiniteDuration
@@ -58,10 +57,10 @@ class MonitorChild(eventBus: MessageBus,
5857
/**
5958
* Running state.
6059
*/
61-
def running(aggR: AggregateReport, aggFunction: Seq[Power] => Power): Actor.Receive = LoggingReceive {
60+
def running(aggR: AggregatePowerReport, aggFunction: Seq[Power] => Power): Actor.Receive = LoggingReceive {
6261
case tick: ClockTick => produceMessages(tick)
6362
case MonitorAggFunction(_, id, aggF) if muid == id => setAggregatingFunction(aggR, aggF)
64-
case powerReport: PowerReport => aggregate(aggR, powerReport, aggFunction)
63+
case powerReport: RawPowerReport => aggregate(aggR, powerReport, aggFunction)
6564
case MonitorStop(_, id) if muid == id => stop()
6665
case _: MonitorStopAll => stop()
6766
} orElse default
@@ -73,15 +72,15 @@ class MonitorChild(eventBus: MessageBus,
7372
def start(): Unit = {
7473
startClock(frequency)(eventBus)
7574
subscribeClockTick(frequency)(eventBus)(self)
76-
subscribePowerReport(muid)(eventBus)(self)
75+
subscribeRawPowerReport(muid)(eventBus)(self)
7776
log.info("monitor is started, muid: {}", muid)
78-
context.become(running(AggregateReport(muid, SUM), SUM))
77+
context.become(running(AggregatePowerReport(muid, SUM), SUM))
7978
}
8079

8180
/**
8281
* Change the aggregating function for this monitor
8382
*/
84-
def setAggregatingFunction(aggR: AggregateReport, aggF: Seq[Power] => Power): Unit = {
83+
def setAggregatingFunction(aggR: AggregatePowerReport, aggF: Seq[Power] => Power): Unit = {
8584
log.info("aggregating function is changed")
8685
context.become(running(aggR, aggF))
8786
}
@@ -97,11 +96,11 @@ class MonitorChild(eventBus: MessageBus,
9796
* Wait to retrieve power reports of all targets from a same monitor to aggregate them
9897
* into once power report.
9998
*/
100-
def aggregate(aggR: AggregateReport, powerReport: PowerReport, aggF: Seq[Power] => Power): Unit = {
99+
def aggregate(aggR: AggregatePowerReport, powerReport: RawPowerReport, aggF: Seq[Power] => Power): Unit = {
101100
aggR += powerReport
102101
if (aggR.size >= targets.size) {
103102
render(aggR)(eventBus)
104-
context.become(running(AggregateReport(muid, aggF), aggF))
103+
context.become(running(AggregatePowerReport(muid, aggF), aggF))
105104
}
106105
else
107106
context.become(running(aggR, aggF))
@@ -114,7 +113,7 @@ class MonitorChild(eventBus: MessageBus,
114113
def stop(): Unit = {
115114
stopClock(frequency)(eventBus)
116115
unsubscribeClockTick(frequency)(eventBus)(self)
117-
unsubscribePowerReport(muid)(eventBus)(self)
116+
unsubscribeRawPowerReport(muid)(eventBus)(self)
118117
log.info("monitor is stopped, muid: {}", muid)
119118
self ! PoisonPill
120119
}

powerapi-core/src/main/scala/org/powerapi/core/OSHelper.scala

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,21 +66,21 @@ trait OSHelper {
6666
/**
6767
* Get the list of frequencies available on the CPU.
6868
*/
69-
def getCPUFrequencies(topology: Map[Int, Iterable[Int]]): Iterable[Long]
69+
def getCPUFrequencies: Set[Long]
7070

7171
/**
7272
* Get the list of processes behind an Application.
7373
*
7474
* @param application: targeted application.
7575
*/
76-
def getProcesses(application: Application): Iterable[Process]
76+
def getProcesses(application: Application): Set[Process]
7777

7878
/**
7979
* Get the list of thread behind a Process.
8080
*
8181
* @param process: targeted process.
8282
*/
83-
def getThreads(process: Process): Iterable[Thread]
83+
def getThreads(process: Process): Set[Thread]
8484

8585
/**
8686
* Get the process execution time on the cpu.
@@ -128,7 +128,7 @@ trait OSHelper {
128128
class LinuxHelper extends OSHelper with Configuration {
129129
private val log = LogManager.getLogger
130130

131-
private val PSFormat = """^\s*(\d+)""".r
131+
private val PSFormat = """^\s*(\d+)\s*""".r
132132
private val GlobalStatFormat = """cpu\s+([\d\s]+)""".r
133133
private val TimeInStateFormat = """(\d+)\s+(\d+)""".r
134134

@@ -177,44 +177,44 @@ class LinuxHelper extends OSHelper with Configuration {
177177
/**
178178
* CPU's topology.
179179
*/
180-
lazy val topology: Map[Int, Iterable[Int]] = load { conf =>
180+
lazy val topology: Map[Int, Set[Int]] = load { conf =>
181181
(for (item: Config <- conf.getConfigList("powerapi.cpu.topology"))
182-
yield (item.getInt("core"), item.getDoubleList("indexes").map(_.toInt).toList)).toMap
182+
yield (item.getInt("core"), item.getDoubleList("indexes").map(_.toInt).toSet)).toMap
183183
} match {
184184
case ConfigValue(values) => values
185185
case _ => Map()
186186
}
187187

188-
def getCPUFrequencies(topology: Map[Int, Iterable[Int]]): Iterable[Long] = {
188+
def getCPUFrequencies: Set[Long] = {
189189
(for(index <- topology.values.flatten) yield {
190190
try {
191191
using(frequenciesPath.replace("%?core", s"$index"))(source => {
192192
log.debug("using {} as a frequencies path", frequenciesPath)
193-
source.getLines.toIndexedSeq(0).split("\\s").toList.map(_.toLong)
193+
source.getLines.toIndexedSeq(0).split("\\s").map(_.toLong).toSet
194194
})
195195
}
196196
catch {
197-
case ioe: IOException => log.warn("i/o exception: {}", ioe.getMessage); List()
197+
case ioe: IOException => log.warn("i/o exception: {}", ioe.getMessage); Set[Long]()
198198
}
199199
}).flatten.toSet
200200
}
201201

202-
def getProcesses(application: Application): Iterable[Process] = {
203-
Seq("ps", "-C", application.name, "-o", "pid", "--no-headers").!!.split("\n").map {
202+
def getProcesses(application: Application): Set[Process] = {
203+
Seq("ps", "-C", application.name, "-o", "pid", "--no-headers").lineStream_!.map {
204204
case PSFormat(pid) => Process(pid.toInt)
205-
}
205+
}.toSet
206206
}
207207

208-
def getThreads(process: Process): Iterable[Thread] = {
208+
def getThreads(process: Process): Set[Thread] = {
209209
val pidDirectory = new File(taskPath.replace("%?pid", s"${process.pid}"))
210210

211211
if (pidDirectory.exists && pidDirectory.isDirectory) {
212212
/**
213213
* The pid is removed because it corresponds to the main thread.
214214
*/
215-
pidDirectory.listFiles.filter(dir => dir.isDirectory && dir.getName != s"${process.pid}").map(dir => Thread(dir.getName.toInt))
215+
pidDirectory.listFiles.filter(dir => dir.isDirectory && dir.getName != s"${process.pid}").map(dir => Thread(dir.getName.toInt)).toSet
216216
}
217-
else Iterable[Thread]()
217+
else Set[Thread]()
218218
}
219219

220220
def getProcessCpuTime(process: Process): Option[Long] = {

powerapi-core/src/main/scala/org/powerapi/core/target/Target.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ trait Target
3838
* @author <a href="mailto:romain.rouvoy@univ-lille1.fr">Romain Rouvoy</a>
3939
* @author <a href="mailto:maxime.colmant@gmail.com">Maxime Colmant</a>
4040
*/
41-
case class Process(pid: Int) extends Target
41+
case class Process(pid: Int) extends Target {
42+
override def toString(): String = s"$pid"
43+
}
4244

4345
/**
4446
* Monitoring target for a specific application.
@@ -48,7 +50,9 @@ case class Process(pid: Int) extends Target
4850
* @author <a href="mailto:romain.rouvoy@univ-lille1.fr">Romain Rouvoy</a>
4951
* @author <a href="mailto:maxime.colmant@gmail.com">Maxime Colmant</a>
5052
*/
51-
case class Application(name: String) extends Target
53+
case class Application(name: String) extends Target {
54+
override def toString(): String = name
55+
}
5256

5357
/**
5458
* Target usage ratio.

powerapi-core/src/main/scala/org/powerapi/module/PowerChannel.scala

Lines changed: 21 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import java.util.UUID
2727
import org.powerapi.core.{Message, MessageBus, Channel}
2828
import org.powerapi.core.ClockChannel.ClockTick
2929
import org.powerapi.core.power._
30-
import org.powerapi.core.target.{intToProcess, Target}
30+
import org.powerapi.core.target.Target
3131
import scala.concurrent.duration.DurationInt
3232

3333
/**
@@ -41,18 +41,12 @@ object PowerChannel extends Channel {
4141

4242
type M = PowerReport
4343

44-
case object PowerReport
45-
4644
/**
4745
* Base trait for each power report
4846
*/
4947
trait PowerReport extends Message {
5048
def muid: UUID
51-
def target: Target
52-
def power: Power
53-
def device: String
5449
def tick: ClockTick
55-
override def toString() = s"timestamp=${tick.timestamp};target=$target;device=$device;value=$power"
5650
}
5751

5852
/**
@@ -73,46 +67,37 @@ object PowerChannel extends Channel {
7367
tick: ClockTick) extends PowerReport
7468

7569
/**
76-
* Used to represent an aggregated PowerReport.
77-
*
78-
* @param muid: monitor unique identifier (MUID), which is at the origin of the report flow.
79-
* @param aggFunction: aggregate power estimation for a specific sample of power reports.
70+
* AggregatePowerReport is represented as a dedicated type of message.
8071
*/
81-
case class AggregateReport(muid: UUID, aggFunction: Seq[Power] => Power) extends PowerReport {
82-
private val values: collection.mutable.Buffer[Power] = collection.mutable.Buffer.empty
83-
private lazy val agg = aggFunction(values.seq)
84-
private var lastPowerReport: PowerReport = RawPowerReport(aggPowerReportTopic(muid),
85-
muid,
86-
-1,
87-
0.W,
88-
"none",
89-
ClockTick("none", 0.milliseconds))
72+
case class AggregatePowerReport(muid: UUID, aggFunction: Seq[Power] => Power) extends PowerReport {
73+
private val reports = collection.mutable.Buffer[RawPowerReport]()
74+
private lazy val agg = aggFunction(for(report <- reports) yield report.power)
9075

91-
def size: Int = values.size
92-
def +=(value: PowerReport):AggregateReport = {
93-
values append value.power
94-
lastPowerReport = value
76+
def size: Int = reports.size
77+
78+
def +=(report: RawPowerReport): AggregatePowerReport = {
79+
reports += report
9580
this
9681
}
9782

9883
val topic: String = aggPowerReportTopic(muid)
99-
def target: Target = lastPowerReport.target
84+
def targets: Set[Target] = (for(report <- reports) yield report.target).toSet
10085
def power: Power = agg
101-
def device: String = lastPowerReport.device
102-
def tick: ClockTick = lastPowerReport.tick
86+
def devices: Set[String] = (for(report <- reports) yield report.device).toSet
87+
def tick: ClockTick = if(reports.nonEmpty) reports.last.tick else ClockTick("", 0.seconds)
10388
}
10489

10590
/**
106-
* Publish a power report in the event bus.
91+
* Publish a raw power report in the event bus.
10792
*/
108-
def publishPowerReport(muid: UUID, target: Target, power: Power, device: String, tick: ClockTick): MessageBus => Unit = {
109-
publish(RawPowerReport(powerReportMuid(muid), muid, target, power, device, tick))
93+
def publishRawPowerReport(muid: UUID, target: Target, power: Power, device: String, tick: ClockTick): MessageBus => Unit = {
94+
publish(RawPowerReport(rawPowerReportMuid(muid), muid, target, power, device, tick))
11095
}
11196

11297
/**
11398
* Publish an aggregated power report in the event bus.
11499
*/
115-
def render(aggR: AggregateReport): MessageBus => Unit = {
100+
def render(aggR: AggregatePowerReport): MessageBus => Unit = {
116101
publish(aggR)
117102
}
118103

@@ -130,18 +115,18 @@ object PowerChannel extends Channel {
130115
/**
131116
* External method used by the MonitorChild actors for interacting with the bus.
132117
*/
133-
def subscribePowerReport(muid: UUID): MessageBus => ActorRef => Unit = {
134-
subscribe(powerReportMuid(muid))
118+
def subscribeRawPowerReport(muid: UUID): MessageBus => ActorRef => Unit = {
119+
subscribe(rawPowerReportMuid(muid))
135120
}
136121

137-
def unsubscribePowerReport(muid: UUID): MessageBus => ActorRef => Unit = {
138-
unsubscribe(powerReportMuid(muid))
122+
def unsubscribeRawPowerReport(muid: UUID): MessageBus => ActorRef => Unit = {
123+
unsubscribe(rawPowerReportMuid(muid))
139124
}
140125

141126
/**
142127
* Use to format a MUID to an associated topic.
143128
*/
144-
private def powerReportMuid(muid: UUID): String = {
129+
private def rawPowerReportMuid(muid: UUID): String = {
145130
s"power:$muid"
146131
}
147132

powerapi-core/src/main/scala/org/powerapi/module/cpu/dvfs/CpuFormula.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ package org.powerapi.module.cpu.dvfs
2525
import org.powerapi.core.MessageBus
2626
import org.powerapi.core.power._
2727
import org.powerapi.module.cpu.UsageMetricsChannel.{subscribeDvfsUsageReport, UsageReport}
28-
import org.powerapi.module.PowerChannel.publishPowerReport
28+
import org.powerapi.module.PowerChannel.publishRawPowerReport
2929

3030
/**
3131
* CPU formula component giving CPU energy of a given process in computing the ratio between
@@ -71,6 +71,6 @@ class CpuFormula(eventBus: MessageBus, tdp: Double, tdpFactor: Double, frequenci
7171
case _ => 0d.W
7272
}
7373

74-
publishPowerReport(sensorReport.muid, sensorReport.target, p, "cpu", sensorReport.tick)(eventBus)
74+
publishRawPowerReport(sensorReport.muid, sensorReport.target, p, "cpu", sensorReport.tick)(eventBus)
7575
}
7676
}

powerapi-core/src/main/scala/org/powerapi/module/cpu/simple/CpuFormula.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.powerapi.core.{Configuration, MessageBus}
2626
import org.powerapi.core.power._
2727
import org.powerapi.module.FormulaComponent
2828
import org.powerapi.module.cpu.UsageMetricsChannel.{subscribeSimpleUsageReport, UsageReport}
29-
import org.powerapi.module.PowerChannel.publishPowerReport
29+
import org.powerapi.module.PowerChannel.publishRawPowerReport
3030

3131

3232
/**
@@ -45,6 +45,6 @@ class CpuFormula(eventBus: MessageBus, tdp: Double, tdpFactor: Double) extends F
4545

4646
def compute(sensorReport: UsageReport): Unit = {
4747
lazy val power = ((tdp * tdpFactor) * sensorReport.targetRatio.ratio).W
48-
publishPowerReport(sensorReport.muid, sensorReport.target, power, "cpu", sensorReport.tick)(eventBus)
48+
publishRawPowerReport(sensorReport.muid, sensorReport.target, power, "cpu", sensorReport.tick)(eventBus)
4949
}
5050
}

powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreModule.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.powerapi.module.libpfm.cycles.{LibpfmCoreCyclesFormulaConfiguration,
2828
import scala.collection.BitSet
2929
import scala.concurrent.duration.FiniteDuration
3030

31-
class LibpfmCoreModule(timeout: Timeout, topology: Map[Int, Iterable[Int]], configuration: BitSet, events: List[String],
31+
class LibpfmCoreModule(timeout: Timeout, topology: Map[Int, Set[Int]], configuration: BitSet, events: Set[String],
3232
cyclesThreadName: String, cyclesRefName: String, formulae: Map[Double, List[Double]], samplingInterval: FiniteDuration) extends PowerModule {
3333

3434
lazy val underlyingSensorsClasses = Seq((classOf[LibpfmCoreSensor], Seq(timeout, topology, configuration, events)))
@@ -37,11 +37,11 @@ class LibpfmCoreModule(timeout: Timeout, topology: Map[Int, Iterable[Int]], conf
3737

3838
object LibpfmCoreModule extends LibpfmCoreSensorConfiguration with LibpfmCoreCyclesFormulaConfiguration {
3939
def apply(): LibpfmCoreModule = {
40-
new LibpfmCoreModule(timeout, topology, configuration, events, cyclesThreadName, cyclesThreadName, formulae, samplingInterval)
40+
new LibpfmCoreModule(timeout, topology, configuration, events, cyclesThreadName, cyclesRefName, formulae, samplingInterval)
4141
}
4242
}
4343

44-
class LibpfmCoreSensorModule(timeout: Timeout, topology: Map[Int, Iterable[Int]], configuration: BitSet, events: List[String]) extends PowerModule {
44+
class LibpfmCoreSensorModule(timeout: Timeout, topology: Map[Int, Set[Int]], configuration: BitSet, events: Set[String]) extends PowerModule {
4545
lazy val underlyingSensorsClasses = Seq((classOf[LibpfmCoreSensor], Seq(timeout, topology, configuration, events)))
4646
lazy val underlyingFormulaeClasses = Seq()
4747
}
@@ -51,7 +51,7 @@ object LibpfmCoreSensorModule extends LibpfmCoreSensorConfiguration {
5151
new LibpfmCoreSensorModule(timeout, topology, configuration, events)
5252
}
5353

54-
def apply(events: List[String]): LibpfmCoreSensorModule = {
54+
def apply(events: Set[String]): LibpfmCoreSensorModule = {
5555
new LibpfmCoreSensorModule(timeout, topology, configuration, events)
5656
}
5757
}

powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreProcessModule.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.powerapi.module.libpfm.cycles.{LibpfmCoreCyclesFormulaConfiguration,
2929
import scala.collection.BitSet
3030
import scala.concurrent.duration.FiniteDuration
3131

32-
class LibpfmCoreProcessModule(timeout: Timeout, topology: Map[Int, Iterable[Int]], configuration: BitSet, events: List[String], inDepth: Boolean,
32+
class LibpfmCoreProcessModule(timeout: Timeout, topology: Map[Int, Set[Int]], configuration: BitSet, events: Set[String], inDepth: Boolean,
3333
cyclesThreadName: String, cyclesRefName: String, formulae: Map[Double, List[Double]], samplingInterval: FiniteDuration) extends PowerModule {
3434

3535
lazy val underlyingSensorsClasses = Seq((classOf[LibpfmCoreProcessSensor], Seq(new LinuxHelper, timeout, topology, configuration, events, inDepth)))

powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreProcessSensor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import scala.reflect.ClassTag
4141
*
4242
* @author <a href="mailto:maxime.colmant@gmail.com">Maxime Colmant</a>
4343
*/
44-
class LibpfmCoreProcessSensor(eventBus: MessageBus, osHelper: OSHelper, timeout: Timeout, topology: Map[Int, Iterable[Int]], configuration: BitSet, events: List[String], inDepth: Boolean) extends APIComponent {
44+
class LibpfmCoreProcessSensor(eventBus: MessageBus, osHelper: OSHelper, timeout: Timeout, topology: Map[Int, Set[Int]], configuration: BitSet, events: Set[String], inDepth: Boolean) extends APIComponent {
4545
val processClaz = implicitly[ClassTag[Process]].runtimeClass
4646
val appClaz = implicitly[ClassTag[Application]].runtimeClass
4747

0 commit comments

Comments
 (0)