Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
matrix:
os: [ "ubuntu-20.04" ]
# python-version: [ "3.6.2", "3.7", "3.8", "3.9" ]
python-version: [ "3.8" ]
python-version: [ "3.10" ]
runs-on: self-hosted
timeout-minutes: 40
steps:
Expand Down Expand Up @@ -138,17 +138,17 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPOSITORY_USERNAME: ${{ secrets.PYPI_USERNAME }}
REPOSITORY_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
#----------------------------------------------
# Download coverage and publish to CodeClimate
#----------------------------------------------
- name: Download artifacts
uses: actions/download-artifact@v4
with:
name: 'coverage'
path: .
- name: Publish code coverage to CodeClimate
uses: paambaati/codeclimate-action@v2.7.5
env:
CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }}
with:
debug: true
# #----------------------------------------------
# # Download coverage and publish to CodeClimate
# #----------------------------------------------
# - name: Download artifacts
# uses: actions/download-artifact@v4
# with:
# name: 'coverage'
# path: .
# - name: Publish code coverage to CodeClimate
# uses: paambaati/codeclimate-action@v2.7.5
# env:
# CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }}
# with:
# debug: true
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

ARG PARENT_IMAGE
FROM $PARENT_IMAGE
ARG PYTORCH_DEPS=cpuonly
ARG POETRY_VERSION=1.0.0
ARG PYTORCH_DEPS=cu113
ARG POETRY_VERSION=1.5.1
ARG ADD_SB=False
ARG PYTHON_VERSION=3.8

Expand Down Expand Up @@ -33,7 +33,6 @@ RUN curl -o ~/miniconda.sh https://repo.anaconda.com/miniconda/Miniconda3-latest
~/miniconda.sh -b -p /opt/conda && \
rm ~/miniconda.sh && \
/opt/conda/bin/conda install -y python=$PYTHON_VERSION && \
if [ ${ADD_SB} ] ; then /opt/conda/bin/conda install -y pytorch $PYTORCH_DEPS -c pytorch; fi && \
/opt/conda/bin/conda clean -ya
ENV PATH /opt/conda/bin:$PATH

Expand Down Expand Up @@ -64,7 +63,8 @@ RUN echo "source /opt/ros/noetic/setup.bash" >> /root/.bashrc
RUN echo "export ROSLAUNCH_SSH_UNKNOWN=1" >> /root/.bashrc

# Install eagerx-tutorials if ADD_SB, this will also install stable-baselines3
RUN if [ ${ADD_SB} ] ; then pip install eagerx-tutorials; fi
RUN if [ ${ADD_SB} ] ; then pip install eagerx-tutorials stable-baselines3==2.0.0; fi
RUN if [ ${ADD_SB} ] ; then pip3 install --upgrade torch==1.12 --extra-index-url https://download.pytorch.org/whl/${PYTORCH_DEPS}; fi

# Use headless opencv
RUN pip uninstall -y opencv-python && pip install opencv-python-headless && rm -rf $HOME/.cache/pip
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
[![codestyle](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Documentation Status](https://readthedocs.org/projects/eagerx/badge/?version=master)](https://eagerx.readthedocs.io/en/master/?badge=master)
[![Continuous Integration](https://github.com/eager-dev/eagerx/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/eager-dev/eagerx/actions/workflows/ci.yml)
[![Maintainability](https://api.codeclimate.com/v1/badges/3146dce3dd4c3537834c/maintainability)](https://codeclimate.com/github/eager-dev/eagerx/maintainability)
[![Test Coverage](https://api.codeclimate.com/v1/badges/3146dce3dd4c3537834c/test_coverage)](https://codeclimate.com/github/eager-dev/eagerx/test_coverage)
[![Test Coverage](coverage.svg)](https://github.com/eager-dev/eagerx/actions/workflows/ci.yml)
<!-- [![Maintainability](https://api.codeclimate.com/v1/badges/3146dce3dd4c3537834c/maintainability)](https://codeclimate.com/github/eager-dev/eagerx/maintainability)
[![Test Coverage](https://api.codeclimate.com/v1/badges/3146dce3dd4c3537834c/test_coverage)](https://codeclimate.com/github/eager-dev/eagerx/test_coverage) -->


What is EAGERx
Expand Down
26 changes: 26 additions & 0 deletions coverage.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 4 additions & 5 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ def setup(app):
# -- Extension configuration -------------------------------------------------

# Example configuration for intersphinx: refer to the Python standard library.
# intersphinx_mapping = {
# 'python': ('https://docs.python.org/3/', None),
# 'numpy': ('http://docs.scipy.org/doc/numpy/', None),
# 'torch': ('http://pytorch.org/docs/master/', None),
# }
intersphinx_mapping = {
'python': ('https://docs.python.org/3/', None),
'numpy': ('http://docs.scipy.org/doc/numpy/', None),
}
259 changes: 259 additions & 0 deletions eagerx/backends/ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
import copy
import numpy as np
import typing
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from eagerx.utils.utils import Header
from eagerx.core.pubsub import Publisher, Subscriber, ShutdownService
import eagerx
from eagerx.core.constants import (
WARN,
BackendException,
Unspecified,
)


# A singleton that is used to check if an argument was specified.
_unspecified = Unspecified()


def merge(a: typing.Dict, b: typing.Dict, path=None):
"""merges b into a"""
if path is None:
path = []
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
merge(a[key], b[key], path + [str(key)])
elif a[key] == b[key]:
pass # same leaf value
else:
a[key] = b[key]
else:
a[key] = b[key]
return a


def split(a: typing.Any):
if isinstance(a, dict):
for key in list(a):
value = a.pop(key)
value = split(value)
keys = [k for k in key.split("/") if len(k) > 0]
for kk in reversed(keys[1:]):
value = {kk: value}
if keys[0] not in a:
a[keys[0]] = value
else:
merge(a[keys[0]], value)
return a


class Ray(eagerx.Backend):

BACKEND = "RAY"
DISTRIBUTED_SUPPORT = True
MULTIPROCESSING_SUPPORT = True
COLAB_SUPPORT = False

MIN_THREADS = 10

@classmethod
def make(cls, log_level=WARN) -> eagerx.specs.BackendSpec:
spec = cls.get_specification()
spec.config.log_level = log_level if isinstance(log_level, str) else eagerx.get_log_level()
return spec

def initialize(self):
self._backend = self
self._pserver = dict()
self._topics = dict()
self._cond = threading.Condition()
self._tpool = ThreadPoolExecutor(max_workers=max(self.MIN_THREADS, cpu_count()))

def Publisher(self, address: str, dtype: str):
return _Publisher(self._backend, self._tpool, self._topics, self._cond, address, dtype)

def Subscriber(self, address: str, dtype: str, callback, header: bool = False, callback_args=tuple()):
return _Subscriber(
self._backend, self._topics, self._cond, address, dtype, callback, header, callback_args=callback_args
)

def register_environment(self, name: str, force_start: bool, fn: typing.Callable):
return _ShutdownService()

def delete_param(self, param: str, level: int = 1) -> None:
try:
keys = [k for k in param.split("/") if len(k) > 0]
val = self._pserver
for key in keys[:-1]:
val = val[key]
val.pop(keys[-1])
self.loginfo(f'Parameters under namespace "{param}" deleted.')
except KeyError as e:
if level == 0:
raise BackendException(e)
elif level == 1:
self.logwarn(e)
else:
pass

def upload_params(self, ns: str, values: typing.Dict, verbose: bool = False) -> None:
values = copy.deepcopy(values)
ns = [k for k in ns.split("/") if len(k) > 0]
ns_values = split(values)
for k in ns:
ns_values = {k: ns_values}
merge(self._pserver, ns_values)

def get_param(self, name: str, default: typing.Any = _unspecified):
try:
keys = [k for k in name.split("/") if len(k) > 0]
val = self._pserver
for key in keys:
val = val[key]
return val
except KeyError as e:
if not isinstance(default, Unspecified):
return default
else:
raise BackendException(e)

def spin(self):
raise NotImplementedError(f"Not implemented, because backend '{self.BACKEND}' does not support multiprocessing.")

def shutdown(self) -> None:
if not self._has_shutdown:
self.logdebug("Backend.shutdown() called.")
self._has_shutdown = True
self._tpool.shutdown(wait=True)


class _Publisher(Publisher):
def __init__(
self,
backend: eagerx.Backend,
tpool: ThreadPoolExecutor,
topics: typing.Dict,
cond: threading.Condition,
address: str,
dtype: str,
):
super().__init__(backend)
self._tpool = tpool
self._cond = cond
self._topics = topics
with self._cond:
if address not in topics:
self._topic = dict(pubs=0, subs=[], latched=None, dtype=dtype)
topics[address] = self._topic
else:
self._topic = topics[address]
assert self._topic["dtype"] == dtype, f"Dtypes do not match for topic {address}."

# Increase publisher count
self._topic["pubs"] += 1

self._address = address
self._dtype = dtype
self._name = f"{self._address}"
self._unregistered = False

def _publish(self, msg: typing.Union[float, bool, int, str, np.ndarray, np.number], header: Header) -> None:
if not self._unregistered:
# todo: check if dtype(msg) == self._dtype?
# Convert python native types to numpy arrays.
if isinstance(msg, float):
msg = np.array(msg, dtype="float32")
elif isinstance(msg, int) and not isinstance(msg, bool):
msg = np.array(msg, dtype="int64")

# Check if message complies with space
if not isinstance(msg, (np.ndarray, np.number, str, bool)):
self._backend.logerr(f"[publisher][{self._name}]: type(recv)={type(msg)}")
time.sleep(10000000)

# with self._cond: # todo: needed?
for cb in self._topic["subs"]:
self._tpool.submit(cb, msg, header)
self._topic["latched"] = msg, header

def unregister(self) -> None:
if not self._unregistered:
with self._cond:
self._unregistered = True
assert self._topic["pubs"] > 0, "According to the counter, there should be no publishers left for this topic."
self._topic["pubs"] -= 1

# If no other subscribers or publishers, remove topic.
if len(self._topic["subs"]) == 0 and self._topic["pubs"] == 0:
self._topics.pop(self._address)


class _Subscriber(Subscriber):
def __init__(
self,
backend: eagerx.Backend,
topics: typing.Dict,
cond: threading.Condition,
address: str,
dtype: str,
callback,
header: bool,
callback_args=tuple(),
):
super().__init__(backend, header)
self._cond = cond
self._topics = topics
with self._cond:
if address not in topics:
self._topic = dict(pubs=0, subs=[], latched=None, dtype=dtype)
topics[address] = self._topic
latched = None
else:
self._topic = topics[address]
assert self._topic["dtype"] == dtype, f"Dtypes do not match for topic {address}."
latched = self._topic["latched"]

self._unregistered = False
self._topic["subs"].append(self.callback)
self._address = address
self._dtype = dtype
self._cb_wrapped = callback
self._cb_args = callback_args
self._name = f"{self._address}"

if latched is not None:
self._backend.logdebug(f"LATCHED: {self._address}")
self.callback(*latched) # todo: inside cond?

def callback(self, msg, header):
# todo: pass header to callback in publisher
# todo: pass header to wrapped callback if self._header
if not self._unregistered:
# todo: check if dtype(msg) == self._dtype?
if not isinstance(msg, (np.ndarray, np.number, str, bool)):
self._backend.logerr(f"[subscriber][{self._name}]: type(recv)={type(msg)}")
time.sleep(10000000)
self._cb_wrapped(msg, header, *self._cb_args) if self._header else self._cb_wrapped(msg, *self._cb_args)

def unregister(self) -> None:
if not self._unregistered:
with self._cond:
self._unregistered = True

self._topic["subs"] = [cb for cb in self._topic["subs"] if not id(cb) == id(self.callback)]

# If no other subscribers or publishers, remove topic.
if len(self._topic["subs"]) == 0 and self._topic["pubs"] == 0:
self._topics.pop(self._address)


class _ShutdownService(ShutdownService):
def __init__(self):
pass

def unregister(self):
pass
Loading
Loading