diff --git a/amqptest/server/exchange.go b/amqptest/server/exchange.go index 2683549..d62741b 100644 --- a/amqptest/server/exchange.go +++ b/amqptest/server/exchange.go @@ -92,3 +92,33 @@ func (d *DirectExchange) route(route string, delivery *Delivery) error { return fmt.Errorf("No bindings to route: %s", route) } + +// FanoutExchange routes messages to all of the queues that are +// bound to it and the routing key is ignored. +type FanoutExchange struct { + name string + bindings map[string]*Queue +} + +func NewFanoutExchange(name string) *FanoutExchange { + return &FanoutExchange{ + name: name, + bindings: make(map[string]*Queue), + } +} + +func (t *FanoutExchange) addBinding(route string, q *Queue) { + t.bindings[q.name] = q +} + +func (t *FanoutExchange) delBinding(route string) { + delete(t.bindings, route) +} + +func (t *FanoutExchange) route(route string, d *Delivery) error { + for _, q := range t.bindings { + q.data <- d + } + + return nil +} diff --git a/amqptest/server/vhost.go b/amqptest/server/vhost.go index ba95ef5..62482e3 100644 --- a/amqptest/server/vhost.go +++ b/amqptest/server/vhost.go @@ -31,6 +31,7 @@ func (v *VHost) createDefaultExchanges() { exchs := make(map[string]Exchange) exchs["amq.topic"] = NewTopicExchange("amq.topic") exchs["amq.direct"] = NewDirectExchange("amq.direct") + exchs["amq.fanout"] = NewFanoutExchange("amq.fanout") exchs["topic"] = NewTopicExchange("topic") exchs["direct"] = NewDirectExchange("direct") exchs[""] = NewDirectExchange("amq.direct") @@ -84,6 +85,8 @@ func (v *VHost) exchangeDeclare(name, kind string, passive bool, opt wabbit.Opti v.exchanges[name] = NewTopicExchange(name) case "direct": v.exchanges[name] = NewDirectExchange(name) + case "fanout": + v.exchanges[name] = NewFanoutExchange(name) default: return fmt.Errorf("Invalid exchange type: %s", kind) } diff --git a/amqptest/server/vhost_test.go b/amqptest/server/vhost_test.go index 7fae4e4..fa39f84 100644 --- a/amqptest/server/vhost_test.go +++ b/amqptest/server/vhost_test.go @@ -13,8 +13,8 @@ func TestVHostWithDefaults(t *testing.T) { t.Errorf("Invalid broker name: %s", vh.name) } - if len(vh.exchanges) < 5 || vh.exchanges[""] == nil || - vh.exchanges["amq.direct"] == nil || vh.exchanges["amq.topic"] == nil { + if len(vh.exchanges) < 6 || vh.exchanges[""] == nil || + vh.exchanges["amq.direct"] == nil || vh.exchanges["amq.topic"] == nil || vh.exchanges["amq.fanout"] == nil { t.Errorf("VHost created without the required exchanges specified by amqp 0.9.1") } } @@ -70,7 +70,7 @@ func TestBasicExchangeDeclare(t *testing.T) { return } - if len(vh.exchanges) != 6 { + if len(vh.exchanges) != 7 { t.Errorf("Exchange not properly created: %d", len(vh.exchanges)) return }