From 1b5fc6bda9a4ce582c34343528ec6185b55e3194 Mon Sep 17 00:00:00 2001 From: anthdm Date: Fri, 7 Jun 2024 11:03:09 +0200 Subject: [PATCH] fixed installer --- bootstrap/app/events/foo_event.go | 4 +- event/event.go | 69 ++++++++++++++++++++----------- event/event_test.go | 30 +++++++++++++- install.go | 20 +++++++-- 4 files changed, 92 insertions(+), 31 deletions(-) diff --git a/bootstrap/app/events/foo_event.go b/bootstrap/app/events/foo_event.go index d8fb211..be003e4 100644 --- a/bootstrap/app/events/foo_event.go +++ b/bootstrap/app/events/foo_event.go @@ -2,6 +2,4 @@ package events import "context" -func HandleFooEvent(ctx context.Context, event any) { - -} +func HandleFooEvent(ctx context.Context, event any) {} diff --git a/event/event.go b/event/event.go index 00b0539..ee256b1 100644 --- a/event/event.go +++ b/event/event.go @@ -7,36 +7,54 @@ import ( "time" ) +// HandlerFunc is the function being called when receiving an event. +type HandlerFunc func(context.Context, any) + +// Emit and event to the given topic +func Emit(topic string, event any) { + stream.emit(topic, event) +} + +// Subscribe a HandlerFunc to the given topic. +// A Subscription is being returned that can be used +// to unsubscribe from the topic. +func Subscribe(topic string, h HandlerFunc) Subscription { + return stream.subscribe(topic, h) +} + +// Unsubscribe unsubribes the given Subscription from its topic. +func Unsubscribe(sub Subscription) { + stream.unsubscribe(sub) +} + +// Stop stops the event stream, cleaning up its resources. +func Stop() { + stream.stop() +} + +var stream *eventStream + type event struct { topic string message any } -type CreateUserEvent struct{} - -func HandleCreateUserEvent(ctx context.Context, event CreateUserEvent) {} - -// Handler is the function being called when receiving an event. -type Handler func(context.Context, any) - // Subscription represents a handler subscribed to a specific topic. type Subscription struct { Topic string CreatedAt int64 - Handler Handler + Fn HandlerFunc } -// EventStream -type EventStream struct { +type eventStream struct { mu sync.RWMutex subs map[string][]Subscription eventch chan event quitch chan struct{} } -// New return a new EventStream -func New() *EventStream { - e := &EventStream{ +func newStream() *eventStream { + e := &eventStream{ subs: make(map[string][]Subscription), eventch: make(chan event, 128), quitch: make(chan struct{}), @@ -45,7 +63,7 @@ func New() *EventStream { return e } -func (e *EventStream) start() { +func (e *eventStream) start() { ctx := context.Background() for { select { @@ -54,35 +72,32 @@ func (e *EventStream) start() { case evt := <-e.eventch: if handlers, ok := e.subs[evt.topic]; ok { for _, sub := range handlers { - go sub.Handler(ctx, evt.message) + go sub.Fn(ctx, evt.message) } } } } } -// Stop stops the EventStream -func (e *EventStream) Stop() { +func (e *eventStream) stop() { e.quitch <- struct{}{} } -// Emit an event by specifying a topic and an arbitrary data type -func (e *EventStream) Emit(topic string, v any) { +func (e *eventStream) emit(topic string, v any) { e.eventch <- event{ topic: topic, message: v, } } -// Subscribe subscribes a handler to the given topic -func (e *EventStream) Subscribe(topic string, h Handler) Subscription { +func (e *eventStream) subscribe(topic string, h HandlerFunc) Subscription { e.mu.RLock() defer e.mu.RUnlock() sub := Subscription{ CreatedAt: time.Now().UnixNano(), Topic: topic, - Handler: h, + Fn: h, } if _, ok := e.subs[topic]; !ok { @@ -94,8 +109,7 @@ func (e *EventStream) Subscribe(topic string, h Handler) Subscription { return sub } -// Unsubscribe unsubscribes the given Subscription -func (e *EventStream) Unsubscribe(sub Subscription) { +func (e *eventStream) unsubscribe(sub Subscription) { e.mu.RLock() defer e.mu.RUnlock() @@ -104,4 +118,11 @@ func (e *EventStream) Unsubscribe(sub Subscription) { return sub.CreatedAt == e.CreatedAt }) } + if len(e.subs[sub.Topic]) == 0 { + delete(e.subs, sub.Topic) + } +} + +func init() { + stream = newStream() } diff --git a/event/event_test.go b/event/event_test.go index 620748e..1d14c75 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -1,8 +1,36 @@ package event import ( + "context" + "reflect" + "sync" "testing" ) -func TestEventStream(t *testing.T) { +func TestEventSubscribeEmit(t *testing.T) { + var ( + expect = 1 + wg = sync.WaitGroup{} + ) + wg.Add(1) + Subscribe("foo.bar", func(_ context.Context, event any) { + value, ok := event.(int) + if !ok { + t.Errorf("expected int got %v", reflect.TypeOf(event)) + } + if value != 1 { + t.Errorf("expected %d got %d", expect, value) + } + wg.Done() + }) + Emit("foo.bar", expect) + wg.Wait() +} + +func TestUnsubscribe(t *testing.T) { + sub := Subscribe("foo.bar", func(_ context.Context, _ any) {}) + Unsubscribe(sub) + if _, ok := stream.subs["foo.bar"]; ok { + t.Errorf("expected topic foo.bar to be deleted") + } } diff --git a/install.go b/install.go index 935d8f1..d72b05a 100644 --- a/install.go +++ b/install.go @@ -31,16 +31,30 @@ func main() { projectName := args[0] - clone := exec.Command("git clone " + reponame) + // check if gothkit folder already exists, if so, delete + fi, err := os.Stat("gothkit") + if err != nil { + log.Fatal(err) + } + if fi.IsDir() { + fmt.Println("-- deleting gothkit folder cause its already present") + if err := os.RemoveAll("gothkit"); err != nil { + log.Fatal(err) + } + } + + fmt.Println("-- cloning", reponame) + clone := exec.Command("git", "clone", reponame) if err := clone.Run(); err != nil { log.Fatal(err) } + fmt.Println("-- rename bootstrap to", projectName) if err := os.Rename(path.Join("gothkit", bootstrapFolderName), projectName); err != nil { log.Fatal(err) } - err := filepath.Walk(path.Join(projectName), func(fullPath string, info fs.FileInfo, err error) error { + err = filepath.Walk(path.Join(projectName), func(fullPath string, info fs.FileInfo, err error) error { if err != nil { return err } @@ -72,5 +86,5 @@ func main() { log.Fatal(err) } - fmt.Printf("project (%s) successfully installed!\n", projectName) + fmt.Printf("-- project (%s) successfully installed!\n", projectName) }