Utility package: higher-level RabbitMQ AMQP message queuing
I made a higher-level, type-driven, simplified message-queuing API that wraps, hides and streamlines RabbitMQ and streadway/amqp under the hood; the source code is here.
High-Level API Workflow:
- make a Context (later, when done, Close it)
- for simple messaging, use it to declare a named Queue, then
  - Queue.Publish(anything)
  - Queue.SubscribeTo<Thing>s(myOnThingHandlers...)
- to publish to multiple subscribers,
  - use the Context and an unnamed Queue to declare an Exchange,
  - then Exchange.Publish(anything)
- for multiple worker instances, set 2 bools, as below
Example Scenarios
"Line-of-business object" types used here, BizEvent and BizFoo, are included
for demo purposes, to showcase how easily one may "type-safe-ish"ly broadcast
and subscribe-to any kind of custom, in-house struct type.
(Pseudo-code below ignores all the errors returned that it should in reality check)
Simple publishing via Queue:
ctx := ezmq.NewLocalContext() // guest:guest@localhost:5672
defer ctx.Close()
var qcfg *ezmq.QueueConfig = nil // nil = use 'prudent' defaults
qe, _ := ctx.Queue("myevents", qcfg)
qe.Publish(ezmq.NewBizEvent("evt1", "DisEvent"))
qf, _ := ctx.Queue("myfoos", qcfg)
qf.Publish(&ezmq.BizFoo{}) // field values omitted
// some more for good measure:
qe.Publish(ezmq.NewBizEvent("evt2", "DatEvent"))
qf.Publish(&ezmq.BizFoo{})
qe.Publish(ezmq.NewBizEvent("evt3", "SomeEvent"))
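Since the snippets here drop the returned errors, here is a minimal sketch of what checking them might look like. To run without a broker it uses a hypothetical `publisher` interface and a `fakeQueue` stand-in; neither is part of ezmq, and only the `Publish(obj) error` shape is taken from the API listing further down.

```go
package main

import (
	"errors"
	"fmt"
)

// publisher is a hypothetical stand-in for anything with a Publish method
// shaped like the one documented for Queue and Exchange.
type publisher interface {
	Publish(obj interface{}) error
}

// fakeQueue is NOT part of ezmq: it records published objects and fails
// on the call whose 1-based number equals failOn (0 = never fail).
type fakeQueue struct {
	sent   []interface{}
	calls  int
	failOn int
}

func (q *fakeQueue) Publish(obj interface{}) error {
	q.calls++
	if q.calls == q.failOn {
		return errors.New("simulated broker failure")
	}
	q.sent = append(q.sent, obj)
	return nil
}

// publishAll publishes objs in order and stops at the first error,
// reporting which object failed.
func publishAll(p publisher, objs ...interface{}) error {
	for i, obj := range objs {
		if err := p.Publish(obj); err != nil {
			return fmt.Errorf("publishing object #%d: %v", i+1, err)
		}
	}
	return nil
}

func main() {
	q := &fakeQueue{failOn: 2}
	if err := publishAll(q, "evt1", "evt2", "evt3"); err != nil {
		fmt.Println("publish failed:", err)
	}
	fmt.Println("delivered:", len(q.sent))
}
```

With a real Queue, the same loop would simply receive the value returned by `ctx.Queue(...)` instead of the fake.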
Simple subscribing via Queue:
onBizEvent := func(evt *ezmq.BizEvent) { /* handle evt */ }
qe.SubscribeToBizEvents(onBizEvent)
qf.SubscribeToBizFoos(func(foo *ezmq.BizFoo) { /* handle foo */ })
for true { /* keep the process alive */ }
Multiple subscribers via Exchange:
qm, _ := ctx.Queue("", qcfg) // name MUST be empty
var xcfg *ezmq.ExchangeConfig = nil // as above, nil = defaults
ex, _ := ctx.Exchange("mybroadcast", xcfg, qm) // only pass `Queue`s that were declared with empty `name`
ex.Publish(ezmq.NewBizEvent("evt1", "DisEvent")) // publish via `Exchange`, not via `Queue`, same API
ex.Publish(&ezmq.BizFoo{}) // field values omitted
ex.Publish(ezmq.NewBizEvent("evt2", "DatEvent"))
ex.Publish(&ezmq.BizFoo{})
Enabling multiple worker instances:
// Pub/Sub: pass &qcfg to ctx.Queue:
qcfg := ezmq.ConfigDefaultsQueue
qcfg.Pub.Persistent = true
qcfg.QosMultipleWorkerInstances = true // optionally, also handle `Queue.Config.Sub.OnAckError`
// Multi-Subs Pub: pass &xcfg to ctx.Exchange --- WITH an (unnamed) `Queue` as per above:
xcfg := ezmq.ConfigDefaultsExchange
xcfg.Pub.Persistent = true
// Rest as usual
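What the two bools do on the broker side isn't spelled out here. Presumably the goal is the usual work-sharing setup, where several worker instances consume from one queue and each message is handled by exactly one of them. The following is only an in-memory analogy of that idea using a Go channel; `distribute` is a hypothetical helper and involves neither ezmq nor RabbitMQ.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute hands each job to exactly one of n workers, loosely mimicking
// how several worker instances share one queue. It returns, per worker,
// the jobs that worker handled.
func distribute(jobs []string, n int) [][]string {
	ch := make(chan string)
	handled := make([][]string, n)
	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for job := range ch {
				// Each worker only writes to its own slot, so no lock is needed.
				handled[w] = append(handled[w], job)
			}
		}(w)
	}
	for _, job := range jobs {
		ch <- job
	}
	close(ch)
	wg.Wait()
	return handled
}

func main() {
	handled := distribute([]string{"evt1", "evt2", "evt3", "evt4"}, 2)
	total := 0
	for w, jobs := range handled {
		fmt.Printf("worker %d handled %d job(s)\n", w, len(jobs))
		total += len(jobs)
	}
	fmt.Println("total handled:", total)
}
```

Which worker gets which job varies between runs; only the total is fixed.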
Usage
var (
// Can be modified. Initially contains prudent defaults quite sensible
// during prototyping, until you *know* what few things you need to tweak and why.
// Used by `Context.Exchange()` if it is passed `nil` for its `cfg` arg.
ConfigDefaultsExchange = ExchangeConfig
)
var (
// Can be modified. Initially contains prudent defaults quite sensible
// during prototyping, until you *know* what few things you need to tweak and why.
// Used by `Context.Queue()` if it is passed `nil` for its `cfg` arg.
ConfigDefaultsQueue = QueueConfig
)
type BizEvent
type BizEvent struct
Just a minimal demo "line-of-business object" type that ezmq can pub/sub. This type as well as BizFoo showcase just how easily one would extend one's in-house / company-specific ezMQ wrapper library by additional custom message types as desired.
func NewBizEvent
func NewBizEvent(id string, name string) *BizEvent
func NewBizEventAt
func NewBizEventAt(id string, name string, date time.Time) *BizEvent
type BizFoo
type BizFoo struct
Just a minimal demo "line-of-business object" type that ezmq can pub/sub. This type as well as BizEvent showcase just how easily one would extend one's in-house / company-specific ezMQ wrapper library by additional custom message types as desired.
type Context
type Context struct
Provides access to the backing message-queue implementation, encapsulating the
underlying connection/channel primitives. Set the fields (as fits the project
context / local setup) before declaring the Queues or Exchanges needed to
publish and subscribe, as those calls will connect if the Context isn't
already connected. Subsequent field mutations are of course ignored as the
connection is kept alive. For clean-up or manual / pooled connection strategies,
Context provides the Close method.
func NewLocalContext
func NewLocalContext() Context
A convenient Context for local-machine based prototyping/testing:
guest:guest@localhost:5672
func (*Context) Close
func (*Context) Close() (chanCloseErr, connCloseErr error)
Be SURE to call this when done with ezmq, to cleanly dispose of underlying resource primitives.
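Because Close hands back two separate errors, a plain `defer ctx.Close()` silently drops both. A small sketch of folding them into one error: `twoErrCloser`, `fakeContext`, `errSimulated` and `closeAll` are hypothetical names for illustration, and only the two-error return shape comes from the signature above.

```go
package main

import (
	"errors"
	"fmt"
)

// errSimulated is a made-up error used by the stand-in below.
var errSimulated = errors.New("simulated failure")

// twoErrCloser is a hypothetical interface matching the documented shape
// of Context.Close: one error for the channel, one for the connection.
type twoErrCloser interface {
	Close() (chanCloseErr, connCloseErr error)
}

// fakeContext is NOT ezmq's Context; it just returns preset errors.
type fakeContext struct {
	chanErr, connErr error
}

func (c *fakeContext) Close() (error, error) { return c.chanErr, c.connErr }

// closeAll calls Close and folds both results into a single error (or nil).
func closeAll(c twoErrCloser) error {
	chanErr, connErr := c.Close()
	switch {
	case chanErr != nil && connErr != nil:
		return fmt.Errorf("channel close: %v; connection close: %v", chanErr, connErr)
	case chanErr != nil:
		return fmt.Errorf("channel close: %v", chanErr)
	case connErr != nil:
		return fmt.Errorf("connection close: %v", connErr)
	}
	return nil
}

func main() {
	ctx := &fakeContext{connErr: errSimulated}
	defer func() {
		if err := closeAll(ctx); err != nil {
			fmt.Println("clean-up problem:", err)
		}
	}()
	fmt.Println("working with the context...")
}
```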
func (*Context) Exchange
func (*Context) Exchange(name string, cfg *ExchangeConfig, bindTo *Queue) (ex *Exchange, err error)
Declares an "exchange" for publishing to multiple subscribers via the specified
Queue that MUST have been created with an empty name. (NB. if
multiple-subscribers need not be supported, then no need for an Exchange: just
use a simple Queue only.) If cfg is nil, a copy of the current
ConfigDefaultsExchange is used for ex.Config, else a copy of cfg. For
name, see Exchange.Name.
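To picture the difference from a plain Queue, here is a tiny in-memory model. It assumes the exchange behaves like a fan-out where each subscriber has its own unnamed queue bound to it and receives its own copy of every message; the exchange type isn't stated in this document, so treat that as an assumption. `memQueue` and `memExchange` are hypothetical stand-ins, not ezmq types.

```go
package main

import "fmt"

// memQueue is an in-memory stand-in for one subscriber's (unnamed) queue.
type memQueue struct{ msgs []string }

// memExchange is an in-memory stand-in that copies every published
// message into each bound queue (fan-out). NOT ezmq's Exchange.
type memExchange struct{ bound []*memQueue }

// bind attaches another subscriber queue to the exchange.
func (ex *memExchange) bind(q *memQueue) { ex.bound = append(ex.bound, q) }

// publish delivers msg to every bound queue.
func (ex *memExchange) publish(msg string) {
	for _, q := range ex.bound {
		q.msgs = append(q.msgs, msg)
	}
}

func main() {
	ex := &memExchange{}
	sub1, sub2 := &memQueue{}, &memQueue{}
	ex.bind(sub1)
	ex.bind(sub2)
	ex.publish("evt1")
	ex.publish("evt2")
	fmt.Println(len(sub1.msgs), len(sub2.msgs)) // prints "2 2"
}
```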
func (*Context) Queue
func (*Context) Queue(name string, cfg *QueueConfig) (q *Queue, err error)
Declares a queue with the specified name for publishing and subscribing. If
cfg is nil, a copy of the current ConfigDefaultsQueue is used for
q.Config, else a copy of cfg. For name, DO refer to Queue.Name.
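The "copy of the defaults, else copy of cfg" rule can be sketched like this. The lowercase types are stand-ins that model only the two fields the multiple-worker example touches (`Pub.Persistent` and `QosMultipleWorkerInstances`); `declareQueue` and `configDefaults` are hypothetical names, and the real structs are not reproduced.

```go
package main

import "fmt"

// Stand-in types: only the two fields used in this document's examples
// are modeled here.
type tweakPub struct{ Persistent bool }

type queueConfig struct {
	Pub                        tweakPub
	QosMultipleWorkerInstances bool
}

// configDefaults plays the role of a mutable package-level defaults value.
var configDefaults = queueConfig{}

type queue struct {
	Name   string
	Config queueConfig
}

// declareQueue copies either *cfg or, when cfg is nil, the current defaults.
func declareQueue(name string, cfg *queueConfig) *queue {
	if cfg == nil {
		cfg = &configDefaults
	}
	return &queue{Name: name, Config: *cfg} // value copy, not a shared pointer
}

func main() {
	q1 := declareQueue("myevents", nil)

	cfg := configDefaults // copy the defaults, then tweak the copy
	cfg.Pub.Persistent = true
	cfg.QosMultipleWorkerInstances = true
	q2 := declareQueue("myfoos", &cfg)

	cfg.Pub.Persistent = false // later mutation does not reach q2
	fmt.Println(q1.Config.Pub.Persistent, q2.Config.Pub.Persistent) // prints "false true"
}
```

Copying by value is what makes it safe to reuse and keep tweaking one config variable across several declarations.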
type Exchange
type Exchange struct
Used in-place-of / in-conjunction-with a Queue when needing to publish to
multiple subscribers. ONLY to be constructed via Context.Exchange(), and
fields are not to be mutated thereafter! It remains associated with that
Context for all its Publish calls.
func (*Exchange) Publish
func (*Exchange) Publish(obj interface{}) error
Serializes the specified obj and publishes it to this exchange.
type ExchangeConfig
type ExchangeConfig struct
Specialist tweaks for declaring an Exchange to the backing message-queue. If
you don't know their meaning, you're best off keeping our defaults until
admins/dev-ops/unexpected-results suggest otherwise.
type Queue
type Queue struct
Used to publish and to subscribe. ONLY to be constructed via Context.Queue(),
and fields are not to be mutated thereafter! It remains associated with that
Context for all its Publish and SubscribeTo calls.
func (*Queue) Publish
func (*Queue) Publish(obj interface{}) error
Serializes the specified obj and publishes it to this queue.
func (*Queue) SubscribeTo
func (*Queue) SubscribeTo(makeEmptyObjForDeserialization func() interface{}, onMsg func(interface{})) (err error)
Generic subscription mechanism used by the more convenient well-typed wrapper
functions such as SubscribeToBizEvents and SubscribeToBizFoos:
Subscribe to messages only of the Type returned by the specified
makeEmptyObjForDeserialization constructor function used to allocate a new
empty/zeroed non-pointer struct value whenever attempting to deserialize a
message received from this Queue. If that succeeds, a pointer to that value is
passed to the specified onMsg event handler: this will always be passed a
non-nil pointer to the value (now populated with data) returned by
makeEmptyObjForDeserialization, therefore safe to cast back to the Type.
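The flow described above, for a single raw message, might look roughly like the sketch below. Two loud assumptions: JSON as the wire format (the serialization ezmq actually uses isn't stated here) and reflection to turn the non-pointer value into a pointer. `order` and `dispatch` are hypothetical names, and how messages of other types get filtered out is not covered.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// order is a hypothetical custom message type.
type order struct {
	ID  string `json:"id"`
	Qty int    `json:"qty"`
}

// dispatch handles one raw message: it asks makeEmpty for a (non-nil,
// non-pointer) struct value, deserializes into a pointer to a copy of it
// and, only on success, passes that pointer to onMsg.
// JSON is an assumption made for this sketch.
func dispatch(raw []byte, makeEmpty func() interface{}, onMsg func(interface{})) error {
	empty := makeEmpty()
	ptr := reflect.New(reflect.TypeOf(empty)) // *T pointing at a fresh T
	ptr.Elem().Set(reflect.ValueOf(empty))    // start from the value makeEmpty returned
	if err := json.Unmarshal(raw, ptr.Interface()); err != nil {
		return err
	}
	onMsg(ptr.Interface())
	return nil
}

func main() {
	makeOrder := func() interface{} { return order{} }
	onOrder := func(obj interface{}) {
		o := obj.(*order) // safe: obj is always a pointer to makeOrder's type
		fmt.Println(o.ID, o.Qty)
	}
	_ = dispatch([]byte(`{"id":"o1","qty":3}`), makeOrder, onOrder)
	if err := dispatch([]byte(`not json`), makeOrder, onOrder); err != nil {
		fmt.Println("skipped undecodable message")
	}
}
```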
func (*Queue) SubscribeToBizEvents
func (*Queue) SubscribeToBizEvents(subscribers ...func(*BizEvent)) (err error)
A well-typed (to BizEvent) wrapper around Queue.SubscribeTo.
func (*Queue) SubscribeToBizFoos
func (*Queue) SubscribeToBizFoos(subscribers ...func(*BizFoo)) (err error)
A well-typed (to BizFoo) wrapper around Queue.SubscribeTo.
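For your own message types, a wrapper in the same spirit could look like the sketch below. `invoice`, `genericSubscriber`, `subscribeToInvoices` and `fakeQueue` are all hypothetical; only the shape of SubscribeTo and the variadic `subscribers` parameter are taken from the signatures above, and the fake skips the broker and deserialization entirely.

```go
package main

import "fmt"

// invoice is a hypothetical in-house message type.
type invoice struct{ Number string }

// genericSubscriber captures the documented shape of Queue.SubscribeTo.
type genericSubscriber interface {
	SubscribeTo(makeEmptyObjForDeserialization func() interface{}, onMsg func(interface{})) error
}

// subscribeToInvoices is a well-typed wrapper: it hides the interface{}
// plumbing and forwards each received invoice to every subscriber.
func subscribeToInvoices(q genericSubscriber, subscribers ...func(*invoice)) error {
	makeEmpty := func() interface{} { return invoice{} }
	return q.SubscribeTo(makeEmpty, func(obj interface{}) {
		inv := obj.(*invoice)
		for _, onInvoice := range subscribers {
			onInvoice(inv)
		}
	})
}

// fakeQueue is NOT ezmq's Queue: it immediately "receives" one object
// with a preset field and hands a pointer to it to onMsg.
type fakeQueue struct{}

func (fakeQueue) SubscribeTo(makeEmpty func() interface{}, onMsg func(interface{})) error {
	if inv, ok := makeEmpty().(invoice); ok {
		inv.Number = "INV-1" // pretend this came off the wire
		onMsg(&inv)
	}
	return nil
}

func main() {
	count := 0
	err := subscribeToInvoices(fakeQueue{},
		func(inv *invoice) { fmt.Println("first handler:", inv.Number) },
		func(inv *invoice) { count++ },
	)
	fmt.Println(count, err)
}
```

Handlers then work with `*invoice` directly and never see an `interface{}`.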
type QueueConfig
type QueueConfig struct
Specialist tweaks for declaring a Queue to the backing message-queue. If you
don't know their meaning, you're best off keeping our defaults until
admins/dev-ops/unexpected-results suggest otherwise.
type TweakPub
type TweakPub struct
Specialist tweaks for Publishing via a Queue or an Exchange. If you don't
know their meaning, you're best off keeping our defaults until
admins/dev-ops/unexpected-results suggest otherwise.
type TweakSub
type TweakSub struct
Specialist tweaks used from within Queue.SubscribeTo. If you don't know their
meaning, you're best off keeping our defaults until
admins/dev-ops/unexpected-results suggest otherwise.