Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Wabbit - Go AMQP Mocking Library

[![GoDoc](https://godoc.org/github.com/NeowayLabs/wabbit?status.svg)](https://godoc.org/github.com/NeowayLabs/wabbit)
[![Go Report Card](https://goreportcard.com/badge/github.com/NeowayLabs/wabbit)](https://goreportcard.com/report/github.com/NeowayLabs/wabbit)
[![GoDoc](https://godoc.org/github.com/c-brooks/wabbit?status.svg)](https://godoc.org/github.com/c-brooks/wabbit)
[![Go Report Card](https://goreportcard.com/badge/github.com/c-brooks/wabbit)](https://goreportcard.com/report/github.com/c-brooks/wabbit)

> Elmer Fudd: Shhh. Be vewy vewy quiet, I'm hunting wabbits

Expand Down Expand Up @@ -35,9 +35,9 @@ package main

import (
"testing"
"github.com/NeowayLabs/wabbit/amqptest"
"github.com/NeowayLabs/wabbit/amqptest/server"
"github.com/NeowayLabs/wabbit/amqp"
"github.com/c-brooks/wabbit/amqptest"
"github.com/c-brooks/wabbit/amqptest/server"
"github.com/c-brooks/wabbit/amqp"
)


Expand Down
4 changes: 2 additions & 2 deletions _examples/publisher/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package main

import (
"flag"
"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqp"
"github.com/c-brooks/wabbit"
"github.com/c-brooks/wabbit/amqp"
"log"
)

Expand Down
4 changes: 2 additions & 2 deletions _examples/publisher/worker_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package main

import (
"flag"
"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqp"
"github.com/c-brooks/wabbit"
"github.com/c-brooks/wabbit/amqp"
"log"
)

Expand Down
4 changes: 2 additions & 2 deletions _examples/simple-consumer/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"log"
"time"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqp"
"github.com/c-brooks/wabbit"
"github.com/c-brooks/wabbit/amqp"
)

var (
Expand Down
1 change: 1 addition & 0 deletions amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type (
Cancel(consumer string, noWait bool) error
ExchangeDeclare(name, kind string, opt Option) error
ExchangeDeclarePassive(name, kind string, opt Option) error
QueueInspect(name string) (Queue, error)
QueueDeclare(name string, args Option) (Queue, error)
QueueDeclarePassive(name string, args Option) (Queue, error)
QueueDelete(name string, args Option) (int, error)
Expand Down
9 changes: 7 additions & 2 deletions amqp/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package amqp
import (
"errors"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/utils"
"github.com/c-brooks/wabbit"
"github.com/c-brooks/wabbit/utils"
"github.com/streadway/amqp"
)

Expand Down Expand Up @@ -174,6 +174,11 @@ func (ch *Channel) exchangeDeclare(name, kind string, passive bool, opt wabbit.O
return ch.Channel.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}

func (ch *Channel) QueueInspect(name string) (wabbit.Queue, error) {
q, err := ch.Channel.QueueInspect(name)
return &Queue{&q}, err
}

func (ch *Channel) QueueUnbind(name, route, exchange string, _ wabbit.Option) error {
return ch.Channel.QueueUnbind(name, route, exchange, nil)
}
Expand Down
2 changes: 1 addition & 1 deletion amqp/delivery.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package amqp

import (
"github.com/NeowayLabs/wabbit"
"github.com/c-brooks/wabbit"
"github.com/streadway/amqp"
)

Expand Down
4 changes: 2 additions & 2 deletions amqp/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package amqp
import (
"time"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/utils"
"github.com/c-brooks/wabbit"
"github.com/c-brooks/wabbit/utils"
"github.com/streadway/amqp"
)

Expand Down
4 changes: 2 additions & 2 deletions amqp/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"testing"
"time"

"github.com/NeowayLabs/wabbit"
"github.com/fsouza/go-dockerclient"
"github.com/c-brooks/wabbit"
docker "github.com/fsouza/go-dockerclient"
"github.com/tiago4orion/conjure"
)

Expand Down
2 changes: 1 addition & 1 deletion amqp/publisher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package amqp

import "github.com/NeowayLabs/wabbit"
import "github.com/c-brooks/wabbit"

type Publisher struct {
conn wabbit.Conn
Expand Down
6 changes: 3 additions & 3 deletions amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"testing"
"time"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqptest"
"github.com/NeowayLabs/wabbit/amqptest/server"
"github.com/c-brooks/wabbit"
"github.com/c-brooks/wabbit/amqptest"
"github.com/c-brooks/wabbit/amqptest/server"
)

func TestBasicUsage(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions amqptest/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"sync"
"time"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqptest/server"
"github.com/NeowayLabs/wabbit/utils"
"github.com/c-brooks/wabbit"
"github.com/c-brooks/wabbit/amqptest/server"
"github.com/c-brooks/wabbit/utils"
"github.com/pborman/uuid"
)

Expand Down
4 changes: 2 additions & 2 deletions amqptest/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"testing"
"time"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqptest/server"
"github.com/c-brooks/wabbit"
"github.com/c-brooks/wabbit/amqptest/server"
"github.com/pborman/uuid"
)

Expand Down
2 changes: 1 addition & 1 deletion amqptest/publisher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package amqptest

import "github.com/NeowayLabs/wabbit"
import "github.com/c-brooks/wabbit"

type Publisher struct {
channel wabbit.Publisher
Expand Down
4 changes: 2 additions & 2 deletions amqptest/server/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"sync"
"sync/atomic"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/utils"
"github.com/c-brooks/wabbit"
"github.com/c-brooks/wabbit/utils"
"github.com/streadway/amqp"
)

Expand Down
2 changes: 1 addition & 1 deletion amqptest/server/delivery.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package server

import "github.com/NeowayLabs/wabbit"
import "github.com/c-brooks/wabbit"

type (
// Delivery is an interface to delivered messages
Expand Down
30 changes: 30 additions & 0 deletions amqptest/server/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,33 @@ func (d *DirectExchange) route(route string, delivery *Delivery) error {
return fmt.Errorf("No bindings to route: %s", route)

}

// FanoutExchange routes messages to all of the queues that are
// bound to it and the routing key is ignored.
type FanoutExchange struct {
name string
bindings map[string]*Queue
}

func NewFanoutExchange(name string) *FanoutExchange {
return &FanoutExchange{
name: name,
bindings: make(map[string]*Queue),
}
}

func (t *FanoutExchange) addBinding(route string, q *Queue) {
t.bindings[q.name] = q
}

func (t *FanoutExchange) delBinding(route string) {
delete(t.bindings, route)
}

func (t *FanoutExchange) route(route string, d *Delivery) error {
for _, q := range t.bindings {
q.data <- d
}

return nil
}
2 changes: 1 addition & 1 deletion amqptest/server/queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package server

import "github.com/NeowayLabs/wabbit"
import "github.com/c-brooks/wabbit"

const (
QueueMaxLen = 2 << 8
Expand Down
4 changes: 2 additions & 2 deletions amqptest/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"sync"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/utils"
"github.com/c-brooks/wabbit"
"github.com/c-brooks/wabbit/utils"
)

const (
Expand Down
14 changes: 13 additions & 1 deletion amqptest/server/vhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package server

import (
"fmt"
"sync"

"github.com/NeowayLabs/wabbit"
"github.com/c-brooks/wabbit"
)

// VHost is a fake AMQP virtual host
type VHost struct {
name string
exchanges map[string]Exchange
queues map[string]*Queue
mu sync.Mutex
}

// NewVHost create a new fake AMQP Virtual Host
Expand All @@ -29,6 +31,7 @@ func (v *VHost) createDefaultExchanges() {
exchs := make(map[string]Exchange)
exchs["amq.topic"] = NewTopicExchange("amq.topic")
exchs["amq.direct"] = NewDirectExchange("amq.direct")
exchs["amq.fanout"] = NewFanoutExchange("amq.fanout")
exchs["topic"] = NewTopicExchange("topic")
exchs["direct"] = NewDirectExchange("direct")
exchs[""] = NewDirectExchange("amq.direct")
Expand Down Expand Up @@ -74,12 +77,18 @@ func (v *VHost) exchangeDeclare(name, kind string, passive bool, opt wabbit.Opti
v.exchanges[name] = NewTopicExchange(name)
case "direct":
v.exchanges[name] = NewDirectExchange(name)
case "fanout":
v.exchanges[name] = NewFanoutExchange(name)

default:
return fmt.Errorf("Invalid exchange type: %s", kind)
}

return nil
}
func (v *VHost) QueueInspect(name string) (wabbit.Queue, error) {
return v.QueueInspect(name)
}

func (v *VHost) QueueDeclare(name string, args wabbit.Option) (wabbit.Queue, error) {
return v.queueDeclare(name, false, args)
Expand All @@ -90,6 +99,9 @@ func (v *VHost) QueueDeclarePassive(name string, args wabbit.Option) (wabbit.Que
}

func (v *VHost) queueDeclare(name string, passive bool, args wabbit.Option) (wabbit.Queue, error) {
v.mu.Lock()
defer v.mu.Unlock()

if q, ok := v.queues[name]; ok {
return q, nil
}
Expand Down
8 changes: 4 additions & 4 deletions amqptest/server/vhost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package server
import (
"testing"

"github.com/NeowayLabs/wabbit"
"github.com/c-brooks/wabbit"
)

func TestVHostWithDefaults(t *testing.T) {
Expand All @@ -13,8 +13,8 @@ func TestVHostWithDefaults(t *testing.T) {
t.Errorf("Invalid broker name: %s", vh.name)
}

if len(vh.exchanges) < 5 || vh.exchanges[""] == nil ||
vh.exchanges["amq.direct"] == nil || vh.exchanges["amq.topic"] == nil {
if len(vh.exchanges) < 6 || vh.exchanges[""] == nil ||
vh.exchanges["amq.direct"] == nil || vh.exchanges["amq.topic"] == nil || vh.exchanges["amq.fanout"] == nil {
t.Errorf("VHost created without the required exchanges specified by amqp 0.9.1")
}
}
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestBasicExchangeDeclare(t *testing.T) {
return
}

if len(vh.exchanges) != 6 {
if len(vh.exchanges) != 7 {
t.Errorf("Exchange not properly created: %d", len(vh.exchanges))
return
}
Expand Down
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/c-brooks/wabbit

go 1.16

require (
github.com/fsouza/go-dockerclient v1.7.2
github.com/pborman/uuid v1.2.1
github.com/streadway/amqp v1.0.0
github.com/tiago4orion/conjure v0.0.0-20150908101743-93cb30b9d218
)
Loading