Go语言EventBus
EventBus是GoLang的小型轻量级事件总线,具有异步兼容性。
类似于观察者模式和发布订阅模式。
GitHub地址:https://github.com/asaskevich/EventBus
官方文档:https://pkg.go.dev/github.com/asaskevich/EventBus
1、安装使用
1.1 安装
go get github.com/asaskevich/EventBus
1.2 使用
import (
evbus "github.com/asaskevich/EventBus"
)
2、简单例子
package main
import (
"fmt"
"github.com/asaskevich/EventBus"
)
// calculator prints the sum of a and b followed by a newline.
func calculator(a int, b int) {
	sum := a + b
	fmt.Printf("%d\n", sum)
}
// main demonstrates the basic EventBus workflow: subscribe a handler to a
// topic, publish an event carrying two arguments, then unsubscribe.
func main() {
	bus := EventBus.New()
	// Subscribe returns an error when the handler is not a function.
	if err := bus.Subscribe("main:calculator", calculator); err != nil {
		fmt.Printf("subscribe failed: %v\n", err)
		return
	}
	// The extra arguments are passed on to the callbacks of the topic.
	bus.Publish("main:calculator", 20, 40)
	// Unsubscribe returns an error when the topic has no subscribed callback.
	if err := bus.Unsubscribe("main:calculator", calculator); err != nil {
		fmt.Printf("unsubscribe failed: %v\n", err)
	}
}
# 程序输出
60
进行封装:
package main
import (
"fmt"
"github.com/asaskevich/EventBus"
)
// Bus wraps an EventBus.Bus so that the subscribe, publish and unsubscribe
// steps of the example can be exposed as methods.
type Bus struct {
	// EventBus is the underlying bus the methods delegate to.
	EventBus EventBus.Bus
}
// calculator prints "a + b = <sum>" followed by a newline.
func calculator(a int, b int) {
	sum := a + b
	fmt.Printf("a + b = %d\n", sum)
}
// Subscribe registers calculator as a handler for the "main:calculator"
// topic. A failure is printed to standard output together with its cause;
// the method has no return value, so the caller is not notified.
func (b *Bus) Subscribe() {
	if err := b.EventBus.Subscribe("main:calculator", calculator); err != nil {
		fmt.Printf("Subscribe Error: %v\n", err)
	}
}
// UnSubscribe removes calculator from the "main:calculator" topic.
// A failure is printed to standard output together with its cause;
// the method has no return value, so the caller is not notified.
func (b *Bus) UnSubscribe() {
	if err := b.EventBus.Unsubscribe("main:calculator", calculator); err != nil {
		fmt.Printf("UnSubscribe Error: %v\n", err)
	}
}
// Publish emits an event on the "main:calculator" topic with the fixed
// arguments 33 and 60.
func (b *Bus) Publish() {
	const topic = "main:calculator"
	b.EventBus.Publish(topic, 33, 60)
}
func main() {
eventBus := EventBus.New()
bus := &Bus{EventBus: eventBus}
// 订阅一个
// Subscribe
bus.Subscribe()
// Publish
bus.Publish()
// UnSubscribe
bus.UnSubscribe()
}
# 程序输出
a + b = 93
订阅多个:
package main
import (
"fmt"
"github.com/asaskevich/EventBus"
)
// Bus wraps an EventBus.Bus so that the subscribe, publish and unsubscribe
// steps of the example can be exposed as methods.
type Bus struct {
	// EventBus is the underlying bus the methods delegate to.
	EventBus EventBus.Bus
}
// calculator1 prints "a + b = <sum>" followed by a newline.
func calculator1(a int, b int) {
	sum := a + b
	fmt.Printf("a + b = %d\n", sum)
}
// calculator2 prints "a - b = <difference>" followed by a newline.
func calculator2(a int, b int) {
	diff := a - b
	fmt.Printf("a - b = %d\n", diff)
}
// Subscribe registers calculator1 and calculator2, in that order, as
// handlers for the "main:calculator" topic. Each failure is printed to
// standard output together with its cause; the method has no return value,
// so the caller is not notified.
func (b *Bus) Subscribe() {
	for _, handler := range []func(int, int){calculator1, calculator2} {
		if err := b.EventBus.Subscribe("main:calculator", handler); err != nil {
			fmt.Printf("Subscribe Error: %v\n", err)
		}
	}
}
// UnSubscribe removes calculator1 and calculator2, in that order, from the
// "main:calculator" topic. Each failure is printed to standard output
// together with its cause; the method has no return value, so the caller is
// not notified.
func (b *Bus) UnSubscribe() {
	for _, handler := range []func(int, int){calculator1, calculator2} {
		if err := b.EventBus.Unsubscribe("main:calculator", handler); err != nil {
			fmt.Printf("UnSubscribe Error: %v\n", err)
		}
	}
}
// Publish emits an event on the "main:calculator" topic with the fixed
// arguments 33 and 60.
func (b *Bus) Publish() {
	const topic = "main:calculator"
	b.EventBus.Publish(topic, 33, 60)
}
func main() {
eventBus := EventBus.New()
bus := &Bus{EventBus: eventBus}
// 订阅多个
// Subscribe
bus.Subscribe()
// Publish
bus.Publish()
// UnSubscribe
bus.UnSubscribe()
}
# 程序输出
a + b = 93
a - b = -27
Subscribe 可以放在 init 函数中:
package main
import (
"fmt"
"github.com/asaskevich/EventBus"
)
// Bus wraps an EventBus.Bus so that the subscribe, publish and unsubscribe
// steps of the example can be exposed as methods.
type Bus struct {
	// EventBus is the underlying bus the methods delegate to.
	EventBus EventBus.Bus
}
// bus is the package-level Bus wrapper; it is assigned in init and used by main.
var bus *Bus
// calculator1 prints "a + b = <sum>" followed by a newline.
func calculator1(a int, b int) {
	sum := a + b
	fmt.Printf("a + b = %d\n", sum)
}
// calculator2 prints "a - b = <difference>" followed by a newline.
func calculator2(a int, b int) {
	diff := a - b
	fmt.Printf("a - b = %d\n", diff)
}
// Subscribe registers calculator1 and calculator2, in that order, as
// handlers for the "main:calculator" topic. Each failure is printed to
// standard output together with its cause; the method has no return value,
// so the caller is not notified.
//
// The receiver is named b so that it does not shadow the package-level bus.
func (b *Bus) Subscribe() {
	for _, handler := range []func(int, int){calculator1, calculator2} {
		if err := b.EventBus.Subscribe("main:calculator", handler); err != nil {
			fmt.Printf("Subscribe Error: %v\n", err)
		}
	}
}
// UnSubscribe removes calculator1 and calculator2, in that order, from the
// "main:calculator" topic. Each failure is printed to standard output
// together with its cause; the method has no return value, so the caller is
// not notified.
//
// The receiver is named b so that it does not shadow the package-level bus.
func (b *Bus) UnSubscribe() {
	for _, handler := range []func(int, int){calculator1, calculator2} {
		if err := b.EventBus.Unsubscribe("main:calculator", handler); err != nil {
			fmt.Printf("UnSubscribe Error: %v\n", err)
		}
	}
}
// Publish emits an event on the "main:calculator" topic with the fixed
// arguments 33 and 60.
func (b *Bus) Publish() {
	const topic = "main:calculator"
	b.EventBus.Publish(topic, 33, 60)
}
func init(){
eventBus := EventBus.New()
bus = &Bus{EventBus: eventBus}
// 订阅多个
// Subscribe
bus.Subscribe()
}
// main publishes one event through the package-level bus and then
// unsubscribes the handlers; the subscriptions themselves are made in init.
func main() {
	// Fire the event.
	bus.Publish()

	// Remove the handlers again.
	bus.UnSubscribe()
}
# 程序输出
a + b = 93
a - b = -27
3、实现的方法
- New()
- Subscribe()
- SubscribeOnce()
- HasCallback()
- Unsubscribe()
- Publish()
- SubscribeAsync()
- SubscribeOnceAsync()
- WaitAsync()
3.1 New()
New() 返回具有空处理程序的新EventBus。
bus := EventBus.New();
3.2 Subscribe()
Subscribe(topic string, fn interface{}) error
订阅主题,如果 fn 不是函数,则返回 error。
func Handler() { ... }
...
bus.Subscribe("topic:handler", Handler)
3.3 SubscribeOnce
SubscribeOnce(topic string, fn interface{}) error
订阅一个主题一次,执行后将删除处理程序。如果fn不是函数,则返回error。
func HelloWorld() { ... }
...
bus.SubscribeOnce("topic:handler", HelloWorld)
3.4 Unsubscribe
Unsubscribe(topic string, fn interface{}) error
删除为主题定义的回调,如果没有订阅主题的回调,则返回错误。
bus.Unsubscribe("topic:handler", HelloWorld)
3.5 HasCallback
HasCallback(topic string) bool
如果存在订阅该主题的任何回调,则返回true。
bus.HasCallback("topic:handler")
3.6 Publish
Publish(topic string, args ...interface{})
Publish 会执行为该主题定义的回调,任何额外的参数都将被传递给回调。
func Handler(str string) { ... }
...
bus.Subscribe("topic:handler", Handler)
...
bus.Publish("topic:handler", "Hello, World!");
3.7 SubscribeAsync
SubscribeAsync(topic string, fn interface{}, transactional bool) error
订阅具有异步回调的主题,如果fn不是函数,则返回error。
// slowCalculator simulates a slow handler: it sleeps for three seconds and
// then prints the sum of a and b followed by a newline.
func slowCalculator(a, b int) {
	const delay = 3 * time.Second
	time.Sleep(delay)
	fmt.Printf("%d\n", a+b)
}
// Create a bus and register slowCalculator as an asynchronous handler.
// Passing false for transactional lets the callbacks of the topic run
// concurrently instead of serially.
bus := EventBus.New()
bus.SubscribeAsync("main:slow_calculator", slowCalculator, false)
// Publish the event; 20 and 60 are the arguments passed to the handler.
bus.Publish("main:slow_calculator", 20, 60)
fmt.Println("start: do some stuff while waiting for a result")
fmt.Println("end: do some stuff while waiting for a result")
bus.WaitAsync() // wait for all async callbacks to complete
fmt.Println("do some stuff after waiting for result")
transactional 参数确定主题的后续回调是串行运行(true)还是并发运行(false)。
3.8 SubscribeOnceAsync
SubscribeOnceAsync(topic string, fn interface{}) error
SubscribeOnceAsync的工作方式与SubscribeOnce类似,只是异步执行回调。
3.9 WaitAsync()
WaitAsync 等待所有异步回调完成。
4、跨进程事件
可与两个rpc服务配合使用:
- 客户端服务:用于侦听服务器远程发布的事件
- 服务器服务:用于侦听客户端的订阅
4.1 server
// main starts a server bus on ":2010" under the path "/_server_bus_",
// publishes a "main:calculator" event through the server's event bus and
// stops the server when it returns.
func main() {
	server := NewServer(":2010", "/_server_bus_", New())
	// Start returns an error; do not go on publishing if the server
	// failed to start.
	if err := server.Start(); err != nil {
		panic(err)
	}
	// Stop the server on every exit path below.
	defer server.Stop()
	// ...
	server.EventBus().Publish("main:calculator", 4, 6)
	// ...
}
4.2 client
// main starts a client bus on ":2015" under the path "/_client_bus_",
// subscribes calculator to the "main:calculator" topic of the server bus at
// ":2010" / "/_server_bus_", and stops the client when it returns.
func main() {
	client := NewClient(":2015", "/_client_bus_", New())
	// Start returns an error; do not go on subscribing if the client
	// failed to start.
	if err := client.Start(); err != nil {
		panic(err)
	}
	// Stop the client on every exit path below.
	defer client.Stop()
	client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_")
	// ...
}
到了这里,关于Go语言EventBus的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!