
Anomaly detection algorithm interface and structures

This software is pre-production and should not be deployed to production servers.

  • Counter metric type: a monotonically increasing counter,
  • Gauge metric type: a value that can arbitrarily go up and down,
  • Task: represents a single Mesos task, matching a single currently running Aurora job instance, running in a single Linux container (using the Mesos containerizer) or a single Kubernetes pod.

See Prometheus metric types for further reference.

We support both Mesos and Kubernetes.

As a reference configuration file, use the one located in the configs directory.

You can configure the system to detect and report anomalies in config.yaml in the following way:

runner: !DetectionRunner
  measurement_runner: !MeasurementRunner
    node: !MesosNode
  detector: !ExampleAnomalyDetector      # implementation of abstract AnomalyDetector class
    example_config_int: 1
    example_config_list: [1, 4]
  ...

where the ExampleAnomalyDetector class must implement the following interface:

class AnomalyDetector(ABC):

    @abstractmethod
    def detect(self,
               platform: Platform,
               tasks_data: TasksData
               ) -> Tuple[List[Anomaly], List[Metric]]:
        ...

Example implementation:

class ExampleAnomalyDetector:

    def __init__(self, example_config_int, example_config_list):
        self.example_config_int = example_config_int
        self.example_config_list = example_config_list

    def detect(self, platform: Platform,
               tasks_data: TasksData
               ) -> Tuple[List[Anomaly], List[Metric]]:
        return [], []

All config values provided under the detector key in the configuration are treated as simple types (including lists and dicts) and are passed to the constructor as keyword arguments.
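For illustration, the detector entry from the config.yaml example above maps directly to a constructor call with keyword arguments. This is a minimal sketch; the ExampleAnomalyDetector class below is only a stub standing in for the real implementation:

```python
class ExampleAnomalyDetector:
    """Stand-in stub for the example detector class shown earlier."""

    def __init__(self, example_config_int, example_config_list):
        # Every key under the "detector" node in config.yaml arrives here
        # as a keyword argument carrying a plain Python value.
        self.example_config_int = example_config_int
        self.example_config_list = example_config_list


# Equivalent of the YAML fragment:
#   detector: !ExampleAnomalyDetector
#     example_config_int: 1
#     example_config_list: [1, 4]
detector = ExampleAnomalyDetector(example_config_int=1, example_config_list=[1, 4])
```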

To use an externally provided implementation, it is necessary to register the external component on the command line like this:

wca --config some_mesos_config.yaml --component external_package.external_module:ContentionAnomalyDetector --level debug

After that, you can instantiate this class using the configuration file.

In the example above, ContentionAnomalyDetector implements all required methods of AnomalyDetector.

AnomalyDetector defines an interface where the Platform class represents capacity and utilization information covering the whole system, and TasksData represents individual measurements for the specific Mesos tasks running on this node. Each task's resources field represents the initial resource assignment as defined in the orchestration software API (e.g. Mesos/Aurora).

The implementation of AnomalyDetector is responsible for returning new immutable instances of Anomaly; in the specific case of "resource contention", it should return the subclass ContentionAnomaly with extended context. Additionally, for debugging purposes, it can return any metrics, which will be stored in persistent storage (e.g. Kafka).

The detect function is called periodically, with the period (delay) specified in the configuration file.

Note that most measurements provided to the detection algorithm are raw counters (monotonically increasing), and AnomalyDetector is responsible for calculating the derivative (difference) over the delay to obtain the rate of increase (e.g. instructions per second, bytes per second, and so on).
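As a sketch of that derivative calculation (counter_rate is a hypothetical helper, not part of the WCA API), the rate between two consecutive detect() calls can be computed like this:

```python
def counter_rate(previous: int, current: int, delay_s: float) -> float:
    """Rate of increase of a monotonic counter sampled delay_s seconds apart."""
    return (current - previous) / delay_s


# e.g. an "instructions" counter sampled 10 seconds apart:
rate = counter_rate(12343141, 12353141, 10.0)
print(rate)  # 1000.0 instructions per second
```

A real detector would also need to keep the previous sample between detect() calls and handle counter resets (e.g. after a task restart).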

# Helper types
CpuId = int  # 0-based logical processor number (matches the value of "processor" in /proc/cpuinfo)

@dataclass
class Platform:

    # Topology:
    sockets: int  # number of sockets
    cores: int    # number of physical cores in total (sum over all sockets)
    cpus: int     # number of logical processors, equal to the output of the "nproc" Linux command

    # Additional information about the platform:
    cpu_model: str  # e.g. based on the "model name" field in /proc/cpuinfo

    # Utilization (usage):
    cpus_usage: Dict[CpuId, int]  # counter-like, sum of all modes based on the /proc/stat "cpu" lines, with 10ms resolution, expressed in [ms]
    total_memory_used: int        # [bytes] based on /proc/meminfo (gauge-like): difference between MemTotal and MemAvail (or MemFree)

    timestamp: float  # [unix timestamp] taken just after all necessary data was collected for the platform object (time.time())
This is an example of what a Platform instance looks like on a two-socket "Intel(R) Xeon(R) CPU E5-2660 v4" system with 377 GB RAM:

platform = Platform(

    # Topology
    sockets = 2,
    cores = 28,
    cpus = 56,

    # Additional information about the platform.
    cpu_model = "Intel(R) Xeon(R) CPU E5-2660 v4",

    # Utilization
    cpus_usage = {
        0: 4412451,
        1: 4747332,
        # ...
        7: 3469724,
    },
    total_memory_used = 6759489536,  # in bytes (about 6 GB)
)
MetricValue = Union[float, int]

class MetricName(str, Enum):
    INSTRUCTIONS = 'instructions'
    CYCLES = 'cycles'
    CACHE_MISSES = 'cache_misses'
    CACHE_REFERENCES = 'cache_references'
    CPU_USAGE_PER_CPU = 'cpu_usage_per_cpu'
    CPU_USAGE_PER_TASK = 'cpu_usage_per_task'
    MEM_BW = 'memory_bandwidth'
    LLC_OCCUPANCY = 'llc_occupancy'
    MEM_USAGE = 'memory_usage'
    MEMSTALL = 'stalls_mem_load'
    SCALING_FACTOR_AVG = 'scaling_factor_avg'
    SCALING_FACTOR_MAX = 'scaling_factor_max'

class MetricType(str, Enum):
    GAUGE = 'gauge'      # arbitrary value (can go up and down)
    COUNTER = 'counter'  # monotonically increasing counter

@dataclass
class Metric:
    name: Union[str, MetricName]
    value: MetricValue
    labels: Dict[str, str]
    type: MetricType = None
    help: str = None

Measurements = Dict[MetricName, MetricValue]

TasksData is a mapping from task_id to TaskData. TaskData inherits from Task and provides measurements and allocations mappings.

TaskMeasurements = Measurements
TaskAllocations = Dict[str, str]


@dataclass
class TaskData(Task):
    measurements: TaskMeasurements = field(default_factory=dict)
    allocations: Optional[TaskAllocations] = None


TasksData = Dict[TaskId, TaskData]

# Example:
tasks_data['user-devel-cassandra-0'] = TaskData(
    name='cassandra-0',
    task_id='user-devel-cassandra-0',
    cgroup_path='/user-devel-cassandra-0',
    subcgroups_path=[],
    labels={'foo': 'bar'},
    resources={
        'cpus': 8.0,
        'mems': 2000.0,
        'disk': 8000.0,
    },
    measurements={
        MetricName.INSTRUCTIONS: 12343141,
        MetricName.CYCLES: 2310124321,
        MetricName.CACHE_MISSES: 21212312,
        MetricName.CPU_USAGE_PER_TASK: 21212312,
        MetricName.MEM_BW: 21212312,
    })

# and an example call of the detect function
anomalies, detection_metrics = anomaly_detector.detect(platform, tasks_data)

Anomaly represents an instance of an abnormal situation. Every anomaly derives a unique identifier representing the combination of tasks involved, and holds context about where and when (timestamp) the situation occurred.

In the special case where task ids aren't provided, the uuid is empty.
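One way such an identifier can be derived (a hedged sketch only; derive_uuid is a hypothetical helper, not necessarily the exact scheme WCA uses) is to hash the sorted set of involved task ids, so the same combination of tasks always maps to the same uuid:

```python
import hashlib
from typing import List

def derive_uuid(task_ids: List[str]) -> str:
    """Derive a stable identifier from the set of involved task ids.

    Hypothetical sketch: sorting makes the result independent of task
    order, and an empty input yields an empty uuid, matching the special
    case described above.
    """
    if not task_ids:
        return ''
    joined = ','.join(sorted(task_ids))
    return hashlib.sha256(joined.encode()).hexdigest()[:16]
```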

The context depends on the type of anomaly. The only supported subtype is ContentionAnomaly, with the following structure.

class ContendedResource(str, Enum):
    MEMORY_BW = 'memory bandwidth'
    LLC = 'cache'
    CPUS = 'cpus'
    TDP = 'thermal design power'
    UNKN = 'unknown resource'


@dataclass
class ContentionAnomaly(Anomaly):
    resource: ContendedResource
    contended_task_id: TaskId
    contending_task_ids: List[TaskId]

    # List of metrics describing context of contention
    metrics: List[Metric]

    # Type of anomaly (will be used to label anomaly metrics)
    anomaly_type = 'contention'

Example detection function returning one instance of Anomaly:

def detect(platform, tasks_data):

    anomalies = []

    all_tasks_ids = list(tasks_data.keys())

    if platform.total_memory_used > 0.8 * platform.total_memory:
        anomalies.append(
            ContentionAnomaly(
                contended_task_id=all_tasks_ids[0],
                contending_task_ids=all_tasks_ids[1:],
                resource=ContendedResource.MEMORY_BW,
                metrics=[Metric(name="memory_usage_threshold",
                                value=0.8 * platform.total_memory,
                                type="gauge")],
            )
        )

    return anomalies, []

All stored information is labeled with platform information such as host, number of cores, number of sockets, and so on. Additionally, a single anomaly object is serialized as multiple metrics that can be grouped by the anomaly.uuid field to find correlated tasks. If an anomaly object contains any additional related metrics, they will be marked with an additional label type="anomaly" and a uuid pointing to the original contention instance.

Example message stored in Kafka using Prometheus exposition format:

# HELP instructions The total number of instructions executed by task.
# TYPE instructions counter
instructions{task_id="user-devel-memacache-0-sasd-cccc",sockets="2",cores="8",host="igk-016"} 123123123 1395066363000
instructions{task_id="user-devel-cassandra-2-aaaa-bbbb",sockets="2",cores="8",host="igk-016"} 123123123 1395066363000
...

# HELP cycles The total number of cycles executed by task.
# TYPE cycles counter
cycles{task_id="user-devel-memacache-0-sasd-cccc",sockets="2",cores="8",host="igk-016"} 329331431 1395066363000
cycles{task_id="user-devel-cassandra-2-aaaa-bbbb",sockets="2",cores="8",host="igk-016"} 329331431 1395066363000
...

# HELP llc_misses The total number of LLC misses by task.
# TYPE llc_misses counter
llc_misses{task_id="user-devel-memacache-0-sasd-cccc",sockets="2",cores="8",host="igk-016"} 1329331431 1395066363000
llc_misses{task_id="user-devel-cassandra-2-aaaa-bbbb",sockets="2",cores="8",host="igk-016"} 3293314311 1395066363000
...


# HELP platform_total_memory_usage_bytes The total usage of RAM in bytes.
# TYPE platform_total_memory_usage_bytes gauge
platform_total_memory_usage_bytes{host="igk-016"} 6759489536 1395066363000

# HELP platform_llc_misses Number of misses system-wide.
# TYPE platform_llc_misses counter
platform_llc_misses{host="igk-016"} 1231231231 1395066363000

# HELP platform_core_usage_ms Number of ms that given cpu was running (in all modes: kernel, user, irq handling and so on...)
# TYPE platform_core_usage_ms counter
platform_core_usage_ms{host="igk-016",cpu="0"} 4412451 1395066363000
platform_core_usage_ms{host="igk-016",cpu="1"} 4747332 1395066363000

# HELP platform_memory_bw Number of bytes transferred to and from the socket and memory.
# TYPE platform_memory_bw counter
platform_memory_bw{host="igk-016",socket="0"} 23525923348480 1395066363000
platform_memory_bw{host="igk-016",socket="1"} 13237177459112 1395066363000



# HELP anomaly The total number of anomalies detected on host.
# TYPE anomaly counter
anomaly{type="contention", contended_task_id="task1", contending_task_id="task2",  resource="memory bandwidth", uuid="1234"} 1
anomaly{type="contention", contended_task_id="task1", contending_task_id="task3", resource="memory bandwidth", uuid="1234"} 1
memory_usage_threshold{contended_task_id="task1", uuid="1234", type="anomaly"} 10

Note that, for readability, not all labels and comments are shown.

A helper functionality of the WCA agent is generating additional labels for a task based on any data contained in that task object (e.g. based on the value of another task label). These new labels will be attached to the task's metrics and stored.

For that purpose, a task_label_generators field can be defined in classes derived from MeasurementRunner. It is a dictionary where each key defines the name of a new label, and the value for that key is an object of a class derived from TaskLabelGenerator.

In the example below, the class used to generate labels is TaskLabelRegexGenerator, which uses the re.sub function to extract the needed information from another label's value (for the list of available task labels, see Task's metrics labels for Mesos and Task's metrics labels for Kubernetes).

In the example below, if the label task_name (the source parameter) has the value root/staging/my_important_task, the following new labels will be attached to the task metrics:

  • application with the value "my_important_task",
  • application_version_name with an empty string value.
runner: !DetectionRunner
  ...
  measurement_runner: !MeasurementRunner
    task_label_generators:
      application: !TaskLabelRegexGenerator
        pattern: '.*\/.*\/(.*)'
        repl: '\1'  # first match group
        source: 'task_name' #default
      application_version_name: !TaskLabelRegexGenerator
        pattern: '.*'
        repl: '' # empty
        source: 'task_name' #default
  ...
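The two generators above can be sketched in plain Python to show which values they produce. This is only an illustration of the underlying re.sub calls, under the assumption that TaskLabelRegexGenerator applies pattern/repl directly to the source label value:

```python
import re

# Source label value, as in the example above.
task_name = 'root/staging/my_important_task'

# application: keep everything after the second slash (first match group).
application = re.sub(r'.*\/.*\/(.*)', r'\1', task_name)

# application_version_name: replace the whole value with an empty string.
application_version_name = re.sub(r'.*', '', task_name)

print(application)               # my_important_task
print(application_version_name)  # (empty)
```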