Skip to content
This repository was archived by the owner on Mar 29, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions amqptest/server/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,33 @@ func (d *DirectExchange) route(route string, delivery *Delivery) error {
return fmt.Errorf("No bindings to route: %s", route)

}

// FanoutExchange routes messages to all of the queues that are
// bound to it and the routing key is ignored.
type FanoutExchange struct {
name string
bindings map[string]*Queue
}

func NewFanoutExchange(name string) *FanoutExchange {
return &FanoutExchange{
name: name,
bindings: make(map[string]*Queue),
}
}

func (t *FanoutExchange) addBinding(route string, q *Queue) {
t.bindings[q.name] = q
}

func (t *FanoutExchange) delBinding(route string) {
delete(t.bindings, route)
}

func (t *FanoutExchange) route(route string, d *Delivery) error {
for _, q := range t.bindings {
q.data <- d
}

return nil
}
3 changes: 3 additions & 0 deletions amqptest/server/vhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (v *VHost) createDefaultExchanges() {
exchs := make(map[string]Exchange)
exchs["amq.topic"] = NewTopicExchange("amq.topic")
exchs["amq.direct"] = NewDirectExchange("amq.direct")
exchs["amq.fanout"] = NewFanoutExchange("amq.fanout")
exchs["topic"] = NewTopicExchange("topic")
exchs["direct"] = NewDirectExchange("direct")
exchs[""] = NewDirectExchange("amq.direct")
Expand Down Expand Up @@ -84,6 +85,8 @@ func (v *VHost) exchangeDeclare(name, kind string, passive bool, opt wabbit.Opti
v.exchanges[name] = NewTopicExchange(name)
case "direct":
v.exchanges[name] = NewDirectExchange(name)
case "fanout":
v.exchanges[name] = NewFanoutExchange(name)
default:
return fmt.Errorf("Invalid exchange type: %s", kind)
}
Expand Down
6 changes: 3 additions & 3 deletions amqptest/server/vhost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ func TestVHostWithDefaults(t *testing.T) {
t.Errorf("Invalid broker name: %s", vh.name)
}

if len(vh.exchanges) < 5 || vh.exchanges[""] == nil ||
vh.exchanges["amq.direct"] == nil || vh.exchanges["amq.topic"] == nil {
if len(vh.exchanges) < 6 || vh.exchanges[""] == nil ||
vh.exchanges["amq.direct"] == nil || vh.exchanges["amq.topic"] == nil || vh.exchanges["amq.fanout"] == nil {
t.Errorf("VHost created without the required exchanges specified by amqp 0.9.1")
}
}
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestBasicExchangeDeclare(t *testing.T) {
return
}

if len(vh.exchanges) != 6 {
if len(vh.exchanges) != 7 {
t.Errorf("Exchange not properly created: %d", len(vh.exchanges))
return
}
Expand Down