2024-06-06 16:02:04 +02:00
|
|
|
package event
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"slices"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
// HandlerFunc is the callback signature invoked for every event delivered
// to a subscription. It receives a context and the emitted message.
type HandlerFunc func(ctx context.Context, message any)
|
|
|
|
|
|
|
|
// Emit and event to the given topic
|
|
|
|
func Emit(topic string, event any) {
|
|
|
|
stream.emit(topic, event)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Subscribe a HandlerFunc to the given topic.
|
|
|
|
// A Subscription is being returned that can be used
|
|
|
|
// to unsubscribe from the topic.
|
|
|
|
func Subscribe(topic string, h HandlerFunc) Subscription {
|
|
|
|
return stream.subscribe(topic, h)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Unsubscribe unsubribes the given Subscription from its topic.
|
|
|
|
func Unsubscribe(sub Subscription) {
|
|
|
|
stream.unsubscribe(sub)
|
2024-06-06 16:02:04 +02:00
|
|
|
}
|
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
// Stop stops the event stream, cleaning up its resources.
|
|
|
|
func Stop() {
|
|
|
|
stream.stop()
|
|
|
|
}
|
2024-06-07 09:23:06 +02:00
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
// stream is the package-level event stream that Emit, Subscribe,
// Unsubscribe and Stop delegate to. It is assigned in init.
var stream *eventStream
|
2024-06-07 09:23:06 +02:00
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
// event is the internal envelope queued on the stream: the message
// together with the topic it was emitted to.
type event struct {
	// topic selects which subscriptions receive the message.
	topic string
	// message is the payload handed to each handler.
	message any
}
|
2024-06-06 16:02:04 +02:00
|
|
|
|
|
|
|
// Subscription represents a handler subscribed to a specific topic.
|
|
|
|
type Subscription struct {
|
|
|
|
Topic string
|
|
|
|
CreatedAt int64
|
2024-06-07 11:03:09 +02:00
|
|
|
Fn HandlerFunc
|
2024-06-06 16:02:04 +02:00
|
|
|
}
|
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
type eventStream struct {
|
2024-06-06 16:02:04 +02:00
|
|
|
mu sync.RWMutex
|
|
|
|
subs map[string][]Subscription
|
|
|
|
eventch chan event
|
|
|
|
quitch chan struct{}
|
|
|
|
}
|
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
func newStream() *eventStream {
|
|
|
|
e := &eventStream{
|
2024-06-06 16:02:04 +02:00
|
|
|
subs: make(map[string][]Subscription),
|
|
|
|
eventch: make(chan event, 128),
|
|
|
|
quitch: make(chan struct{}),
|
|
|
|
}
|
|
|
|
go e.start()
|
|
|
|
return e
|
|
|
|
}
|
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
func (e *eventStream) start() {
|
2024-06-06 16:02:04 +02:00
|
|
|
ctx := context.Background()
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-e.quitch:
|
|
|
|
return
|
|
|
|
case evt := <-e.eventch:
|
|
|
|
if handlers, ok := e.subs[evt.topic]; ok {
|
|
|
|
for _, sub := range handlers {
|
2024-06-07 11:03:09 +02:00
|
|
|
go sub.Fn(ctx, evt.message)
|
2024-06-06 16:02:04 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
func (e *eventStream) stop() {
|
2024-06-06 16:02:04 +02:00
|
|
|
e.quitch <- struct{}{}
|
|
|
|
}
|
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
func (e *eventStream) emit(topic string, v any) {
|
2024-06-06 16:02:04 +02:00
|
|
|
e.eventch <- event{
|
|
|
|
topic: topic,
|
|
|
|
message: v,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
func (e *eventStream) subscribe(topic string, h HandlerFunc) Subscription {
|
2024-06-06 16:02:04 +02:00
|
|
|
e.mu.RLock()
|
|
|
|
defer e.mu.RUnlock()
|
|
|
|
|
|
|
|
sub := Subscription{
|
|
|
|
CreatedAt: time.Now().UnixNano(),
|
|
|
|
Topic: topic,
|
2024-06-07 11:03:09 +02:00
|
|
|
Fn: h,
|
2024-06-06 16:02:04 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
if _, ok := e.subs[topic]; !ok {
|
|
|
|
e.subs[topic] = []Subscription{}
|
|
|
|
}
|
|
|
|
|
|
|
|
e.subs[topic] = append(e.subs[topic], sub)
|
|
|
|
|
|
|
|
return sub
|
|
|
|
}
|
|
|
|
|
2024-06-07 11:03:09 +02:00
|
|
|
func (e *eventStream) unsubscribe(sub Subscription) {
|
2024-06-06 16:02:04 +02:00
|
|
|
e.mu.RLock()
|
|
|
|
defer e.mu.RUnlock()
|
|
|
|
|
|
|
|
if _, ok := e.subs[sub.Topic]; ok {
|
|
|
|
e.subs[sub.Topic] = slices.DeleteFunc(e.subs[sub.Topic], func(e Subscription) bool {
|
|
|
|
return sub.CreatedAt == e.CreatedAt
|
|
|
|
})
|
|
|
|
}
|
2024-06-07 11:03:09 +02:00
|
|
|
if len(e.subs[sub.Topic]) == 0 {
|
|
|
|
delete(e.subs, sub.Topic)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
stream = newStream()
|
2024-06-06 16:02:04 +02:00
|
|
|
}
|