Initial Commit

pull/11/head
Seven Du 11 months ago
commit 7aa26813e8
  1. 2
      .gitignore
  2. 9
      LICENSE
  3. 13
      Makefile
  4. 7
      README.md
  5. 58
      Tutorial
  6. 4
      core/.gitignore
  7. 363
      core/README.md
  8. 278
      core/app.go
  9. 12
      core/config/config.go
  10. 63
      core/config/constants.go
  11. 19
      core/core.go
  12. 5
      core/ctrl/Makefile
  13. 258
      core/ctrl/bus/bus.go
  14. 118
      core/ctrl/bus/queue.go
  15. 933
      core/ctrl/channel.go
  16. 80
      core/ctrl/channel_test.go
  17. 347
      core/ctrl/client.go
  18. 309
      core/ctrl/ctrl.go
  19. 92
      core/ctrl/errors.go
  20. 127
      core/ctrl/hub.go
  21. 35
      core/ctrl/nats/context.go
  22. 343
      core/ctrl/nats/nats.go
  23. 116
      core/ctrl/nats/nats_test.go
  24. 195
      core/ctrl/nats/options.go
  25. 57
      core/ctrl/node.go
  26. 81
      core/ctrl/proto.go
  27. 451
      core/ctrl/serve.go
  28. 37
      core/ctrl/store/config.go
  29. 45
      core/ctrl/store/register.go
  30. 24
      core/ctrl/store/route.go
  31. 137
      core/ctrl/store/session.go
  32. 84
      core/ctrl/store/store.go
  33. 31
      core/ctrl/store/store_test.go
  34. 32
      core/ctrl/store/template.go
  35. 72
      core/flag.go
  36. 88
      core/perm/options.go
  37. 20
      core/perm/perm.go
  38. 678
      core/proto/base/base.pb.go
  39. 76
      core/proto/base/base.proto
  40. 6664
      core/proto/xctrl/xctrl.pb.go
  41. 722
      core/proto/xctrl/xctrl.pb.stack.go
  42. 785
      core/proto/xctrl/xctrl.proto
  43. 19
      core/proto/xctrlext/xctrl.go
  44. 95
      core/snowflake/id.go
  45. 18
      core/snowflake/id_test.go
  46. 365
      core/snowflake/snowflake.go
  47. 476
      core/snowflake/snowflake_test.go
  48. 100
      core/tboy/README.md
  49. 1656
      core/tboy/tboy.go
  50. 515
      core/tboy/tboy_acd.go
  51. 74
      core/tboy/tboy_simple.go
  52. 13
      example/inbound/go.mod
  53. 745
      example/inbound/go.sum
  54. 80
      example/inbound/main.go
  55. 39
      go.mod
  56. 767
      go.sum
  57. 5
      main.go
  58. 2
      stack/.gitignore
  59. 191
      stack/LICENSE
  60. 48
      stack/README.md
  61. 36
      stack/README.zh-cn.md
  62. 187
      stack/api/api.go
  63. 152
      stack/api/api_test.go
  64. 123
      stack/api/handler/api/api.go
  65. 119
      stack/api/handler/api/util.go
  66. 46
      stack/api/handler/api/util_test.go
  67. 141
      stack/api/handler/event/event.go
  68. 14
      stack/api/handler/handler.go
  69. 100
      stack/api/handler/http/http.go
  70. 126
      stack/api/handler/http/http_test.go
  71. 70
      stack/api/handler/options.go
  72. 522
      stack/api/handler/rpc/rpc.go
  73. 112
      stack/api/handler/rpc/rpc_test.go
  74. 259
      stack/api/handler/rpc/stream.go
  75. 177
      stack/api/handler/web/web.go
  76. 28
      stack/api/internal/proto/message.pb.go
  77. 335
      stack/api/proto/api.pb.go
  78. 21
      stack/api/proto/api.pb.micro.go
  79. 43
      stack/api/proto/api.proto
  80. 38
      stack/api/resolver/grpc/grpc.go
  81. 29
      stack/api/resolver/host/host.go
  82. 33
      stack/api/resolver/options.go
  83. 37
      stack/api/resolver/path/path.go
  84. 44
      stack/api/resolver/resolver.go
  85. 69
      stack/api/resolver/vpath/vpath.go
  86. 52
      stack/api/router/options.go
  87. 498
      stack/api/router/registry/registry.go
  88. 34
      stack/api/router/registry/registry_test.go
  89. 24
      stack/api/router/router.go
  90. 356
      stack/api/router/static/static.go
  91. 27
      stack/api/router/util/LICENSE.txt
  92. 115
      stack/api/router/util/compile.go
  93. 122
      stack/api/router/util/compile_test.go
  94. 363
      stack/api/router/util/parse.go
  95. 321
      stack/api/router/util/parse_test.go
  96. 24
      stack/api/router/util/pattern.go
  97. 283
      stack/api/router/util/runtime.go
  98. 62
      stack/api/router/util/types.go
  99. 93
      stack/api/router/util/types_test.go
  100. 28
      stack/api/server/acme/acme.go
  101. Some files were not shown because too many files have changed in this diff Show More

2
.gitignore vendored

@ -0,0 +1,2 @@
.vscode/
.idea/

@ -0,0 +1,9 @@
MIT License
Copyright (c) <year> <copyright holders>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@ -0,0 +1,13 @@
GOPATH:=$(shell go env GOPATH)
LOCAL_YML=
ifeq ($(VERSION),)
VERSION := latest
endif
.PHONY: proto
proto:
protoc --proto_path=${GOPATH}/src:. --stack_out=../ core/proto/xctrl/*.proto
java:
protoc --proto_path=${GOPATH}/src:. --java_out=../ core/proto/xctrl/*.proto

@ -0,0 +1,7 @@
# xctrl - XSwitch XCC API Go 语言 SDK
小樱桃在用的Go语言SDK。
- 使用文档参见:https://git.xswitch.cn/xswitch/xctrl/src/branch/master/core
- 示例:https://git.xswitch.cn/xswitch/xcc-examples/src/branch/master/go

@ -0,0 +1,58 @@
# xctrl开放接口使用说明书
## 1、项目中下载使用xctrl库
```
go get git.xswitch.cn/xswitch/xctrl
```
### 使用方法
#### 1、订阅Nats消息并获取nats消息解析
1) 、初始化控制中心,并实现Handler接口
```
ctrl.Init(&Handler{})
ctrl.EnableEvent("topic-test")
type Handler struct {}
// ctx , topic, reply,Params
func Request(ctx context.Context, topic string, reply string, reqest *Request){}
// ctx , topic ,reply Params
func App(ctx context.Context, topic string, reply string, message *Message){}
// ctx , topic , Params
func Event(ctx context.Context, topic string, request *Request){}
// ctx , topic , Params
func Result(ctx context.Context,topic string, result *Result){}
```
2) *message.Params是订阅nats主题接收到的消息,通过Unmarshal(Go语言,其他语言自行处理)获取结构体数据
```
channel := new(ctrl.Channel)
json.Unmarshal(*message.Params, channel)
```
#### 2、调用rpc方法
```
import "git.xswitch.cn/xswitch/xctrl/core/ctrl"
response, err := ctrl.Service().Play(
ctx,
&xctrl.PlayRequest{
Uuid: uuid,
Media: &xctrl.Media{
Type: xctrl.MediaType_FILE.String(),
Data: file,
},
},
ctrl.WithAddress(channel.GetNodeUuid()))
```

4
core/.gitignore vendored

@ -0,0 +1,4 @@
.idea
.vscode
*.pb.go-e

@ -0,0 +1,363 @@
# XCC Core
Core提供XCC公共包。
目录结构
- xlog 日志
- expr 判断表达式
- ctrl 节点管理
- perm 权限控制
- proto 协议
- tboy 是一个冒牌的的FreeSWITCH,用于测试
# bus
`bus`是一个消息总线,相当于一个内部消息队列,支持Pub/Sub模式。
```go
bus.Subscribe("topic", "queue", func(ev *Event) error {
})
ev := NewEvent("Flag", "test-topic", "message", "data")
bus.Publish(ev)
```
`Publish`用于异步地往消息队列中发送一个消息。消息会发送到一个`chan`缓冲队列中,如果队列中未消费的消息达到最大值,`Publish`操作将会被阻塞。默认的最大值为:`inboundBufferSize = 10240000`。
`Subscribe`用于订阅一个主题(`toipc`),收到消息后会回调一个回调函数。如果`queue`参数为空字符串,则回调函数会在一个新的Go Routine中回调,因此可能无法保证顺序。
如果`queue`非空,则为对于每一个订阅者而言,每一个`queue`生成一个Go Routine,所有发送到该`queue`的消息将会被顺序调用,因此应该保证`queue`的粒度,在回调函数中不要过度阻塞。
`queue`的典型应用是针对在FreeSWITCH中的一路通话,每一个Channel UUID都可以作为一个独立的`queue`进行订阅,这样,即使消息回调函数发生阻塞,也只影响这一路通话。
如果Event的`Flag`参数为`DESTROY`,则Go Routine将会终止,并自动取消订阅。
## 过期
在异常情况下,可能由于收不到`DESTROY`相关的消息,导致Go Routine无法正常终止,相关的资源也无法释放。使用`SubscribeWithExpire`,可以在极端情况下保证资源释放。需要检查回调中的`Flag`是否为`TIMEOUT`,如:
```go
bus.SubscribeWithExpire("topic", "queue", time.Hour, func(ev *Event) error {
if ev.Flag == "TIMEOUT" {
bus.Unsubscribe("topic", "queue")
}
})
```
## 多次订阅相同的`topic`和相同的`queue`
在实际生产中会有很多个订阅者同时订阅相同的`topic`和相同的`queue`,多个订阅者是竞争关系,即对于一个特定的消息,有且只有一个订阅者能接收到消息。这一点跟NATS的Queue订阅类似。
## 多次订阅相同的`topic`和不同的`queue`
多个订阅者订阅相同的`topic`和不同的`queue`,对于一条特定的消息,多个订阅者都能收到。跟NATS类似。
## 多次订阅不同的`topic`和相同的`queue`
在实际生产中会有很多个订阅者订阅不同的`topic`和相同的`queue`,`queue`之间没有必然的联系,因为订阅者首先是以Topic区分的。
# ctrl
`ctrl`是FreeSWITCH控制器,用于控制FreeSWITCH。它提供了一些函数方便与FreeSWITCH交互。
## ctrl.Init
初始化
```go
func Init(h Handler, trace bool, addrs string) error
```
* h是一个`ctrl.Handler`类型的结构,必须实现它定义的几个函数,下面会有详细描述。
* `trace`:是否开启内部Trace日志。
* `addrs`是NATS地址,可能可以支持多个以逗号分隔的地址,但未测试。
初始化后,内部会生成一个全局的`globalCtrl`单例,用于存储内部状态。
Handler是一个`interface`,必须实现如下几个函数(可以是空函数)。
```go
type Handler interface {
// ctx , topic, reply,Params
Request(context.Context, string, string, *Request)
// ctx , topic ,reply Params
App(context.Context, string, string, *Message)
// ctx , topic , Params
Event(context.Context, string, *Request)
// ctx , topic , Params
Result(context.Context, string, *Result)
}
```
## ctrl.EnableRequest
```go
func EnableRequest(topic string) error
```
订阅Request请求消息。主要用于处理FreeSWITCH的请求,如`dialplan`、`directory`、`config`等。
## ctrl.EnableApp
```go
func EnableApp(topic string) error
```
订阅一个`Topic`,是一个全能的订阅方式,包括接收Node的事件、返回结果等。
对于`Event.Channel`事件,它将启用一个`bus`订阅进行处理,因而,对于同一个Channel UUID来说,回调是串行的。
对于其它事件,它将使用新的Go Routine进行回调,因而,无法保证顺序。
## EnableEvent
```go
func EnableEvent(topic string, queue string) error
```
订阅事件相应的Topic,如`cn.xswitch.ctrl.cdr`。目前,除`cn.xswitch.ctrl.cdr`是在NATS中串行回调外,其它均为在新的Go Routine中回调。
## Subscribe
```go
func Subscribe(topic string, cb nats.EventCallback, queue string) (nats.Subscriber, error)
```
调用底层的NATS发起一个订阅。所有回调在同一个NATS Go Routine中回调。需要避免阻塞。
## 对Node中Channel的处理
Node侧为FreeSWITCH侧,订阅`cn.xswitch.node`以及`cn.xswitch.node.$node-uuid`。
Ctrl侧为控制侧,订阅`cn.xswitch.ctrl`及`cn.xswitch.ctrl.$ctrl-uuid`。
对于呼入,FreeSWITCH会发送`Event.Channel`消息,第一个消息是`state = START`,最后一个是`state = DESTROY`。
对于呼出,第一个消息是`state = CALLING`、最后一个是`state = DESTROY`。
只要Channel产生,都会产生`Event.CDR`事件。
### 同步处理机制
同步处理机制简单。系统通过`client`包,直接进行NATS同步调用。
收到`state = START`后,执行
```go
result, err := ctrl.Service().Accept(...)
ctrl.Service().Answer(...)
ctrl.Service().Play(...)
ctrl.Service().Hangup(...)
```
由于这些操作都是阻塞的,因而,要保证在一个新的Go Routine中运行,以避免阻塞消息的接收。
同步调用使用简单,但有个明显的不足,比如,`Play`是阻塞的,无法在当前的Go Routine中终止。如果需要提前终止一个长的`Play`操作,可以在其它的Go Routine中执行`Stop`,这通常需要需要外部的触发机制(如API),或提前启动一个Go Routine专门用于定时发`Stop`。
如果`Play`正常结束,会返回`code = 200`,如果被中止,通常会返回`code = 410`。有时候,对端主动挂机,也会导致`Play`提前终止。
可以通过检查Play的返回码,或者根据是否接收到`state = DESTROY`消息,或者主动发`XNode.GetState`接口向Node查询Channel的生存状态。
关于`err`的处理:
上述接口返回的`err`是一个`*errors.Error`类型(在`stack/`中实现),可以按如下方式处理:
```go
if err != nil {
err1 := err.(*errors.Error)
if err1.code == 500 {
}
}
```
### 基于Context的同步处理机制
上述同步处理机制中,如果对端没有响应,则在超时前无法取消。可以使用Context进行超时设置或中途取消。
```go
ctx, cancel := context.ContextWithTimeout(Context.Background(), 1 * time.Second)
resullt, err := ctrl.AService().Play(ctx, ...)
defer cancel()
if err != nil {
err1 := err.(*errors.Error)
if err1.code == 408 {// timeout
}
}
```
```go
ctx, cancel := context.ContextWithTimeout(Context.Background(), 1 * time.Second)
resullt, err := ctrl.AService().Play(ctx, ...)
go func() {
// 100ms后cancel
time.Sleep(100 * time.MilliSecond)
cancel()
}
if err != nil {
err1 := err.(*errors.Error)
if err1.code == 499 {// canceled ...
}
}
```
### 异步处理机制
有一个`ctrl.AsyncService()`可以发送异步的命令。如:
```go
ctrl.AsyncService().Play(...)
```
异步命令调用`nats.Publish`发送消息,会立即返回。除非NATS连接失败,结果永远会返回`code = 201`。
异步命令无法获取执行结果。
虽然有一个`ctrl.EnableResult`可用,但它独占一个订阅主题,实际上用处不大。
### ACall接口
该接口是一个试验接口。
另一种处理方式是不使用上述机制,通过独立的订阅支持ACall接口。
```go
ctrl.Subscribe("cn.xswitch.ctrl."+ctrl.UUID(), EventCallback, "ctrl")
```
订阅后,以在`EventCallback`回调中再调用`ctrl.DoResultCallback`处理结果:
```go
func EventCallback(ctx context.Context, ev nats.Event) error {
xlog.Info(ev.Topic(), string(ev.Message().Body))
var msg ctrl.Message
err := json.Unmarshal(ev.Message().Body, &msg)
if err != nil {
xlog.Error("parse error", ev)
return err
}
if msg.Method == "" { // maybe a result
go ctrl.DoResultCallback(&msg)
return nil
}
xlog.Error(msg.Method)
switch msg.Method {
case "Event.Channel":
...
```
由于该`EventCallback`是调用者自己实现的,因而可以自己选择是否在Go Routine中进行回调。
在调用时,可以通过`ctrl.ACallOption().WithCallback()`传入要回调的函数。
```go
err := ctrl.ACall(node, "Dial",
&xctrl.DialRequest{
CtrlUuid: channel.CtrlUuid,
Destination: &xctrl.Destination{
GlobalParams: map[string]string{
"ignore_early_media": "true",
},
CallParams: []*xctrl.CallParam{
{
Uuid: channel.Uuid,
CidName: "TEST",
CidNumber: "test",
DestNumber: "1008",
DialString: "sofia/public/10000210@rts.xswitch.cn:20003",
Params: map[string]string{
"absolute_codec_string": "PCMA,PCMU",
},
},
},
},
},
ctrl.ACallOption().WithCallback(func(msg *ctrl.Message, data interface{}) {
xlog.Info(string(*msg.Result))
r := &xctrl.DialResponse{}
err := json.Unmarshal(*msg.Result, &r)
if err != nil {
xlog.Error(err)
}
xlog.Info(r.Cause)
}),
)
```
实际使用时,建议使用上面介绍的`bus`队列机制对同一个Channel UUID相关的消息分流到独立的Go Routine中,这样,可以更好的控制生命周期。
### Channel的生命周期
呼入
```graphviz
digraph G {
START -> DESTROY
START -> RING -> ANSWER -> DESTROY[color=green]
START -> ANSWER -> DESTROY[color=blue]
START -> ANSWER -> BRIDGE -> UNBRIDGE -> DESTROY[color=red]
}
```
呼出,其中,M代表有媒体,N代表`ignore_early_media=true`的情况。
```graphviz
digraph G {
CALLING -> RING -> DESTROY
CALLING -> RING -> ANSWER -> DESTROY[color=green]
CALLING -> ANSWER -> DESTROY[style=dashed color=grey]
CALLING -> RING -> MEDIA -> READY -> ANSWER -> DESTROY[color=blue label="M"]
CALLING -> RING -> MEDIA -> ANSWER -> READY-> DESTROY[color=red label="N"]
CALLING -> RING -> MEDIA -> BRIDGE -> ANSWER -> UNBRIDGE -> DESTROY[color=purple label="M"]
CALLING -> RING -> MEDIA -> ANSWER -> BRIDGE -> UNBRIDGE -> DESTROY[color=pink label=N]
}
```
在调用`XNode.Dial`外呼的时候,在`ignore_early_media=false`(默认)的情况下,收到`MEDIA`就会触发READY事件。如果为`true`,则需要等到`ANSWER`以后才会触发`READY`状态。不管什么情况,都需要在收到`READY`状态后才可以对Channel进行控制。
在执行`XNode.Bridge`时,没有`READY`事件,这时可以根据`ANSWER`或`BRIDGE`事件处理业务逻辑。
在XNode中,一个Channel从创建开始(`state = START`或`state = CALLING`),到销毁(`state = DESTROY`),是一个完整的生命周期。销毁前,会发送`Event.CDR`事件,通常会在单独的Topic上发出(可配置)。
由于`Event.Channel`并不包含完整的数据(通道变量等),因而建议在Ctrl侧对Channel数据进行缓存。简单的缓存办法是直接根据Channel UUID放到一个`Map`中。由于Channel更新相对频繁,因而`sync.Map`可能不大适用,直接用`Map + sync.Mutex`可能更直接一些。
一般来说,只要Channel被创建,总会有对应的`DESTROY`消息。但是,在XNode发生崩溃的情况下,需要准备超时垃圾回收机制。
这样Ctrl的总体实现就会很复杂。
另一种实现思路是将Channel相关的状态都在XNode侧用通道变量保存。每次事件都带上全量的通道变量,这样Ctrl侧的逻辑实现就会简单一些,代价是会增加NATS消息吞吐量,因为大多数情况下,绝大部分的通道变量是无用的。全量的通道变量暂时还没有实现。
一种优化方案是根据实际的业务场景选择是否启用和传递哪些通道变量。暂时还没有实现。
## Context
Ctrl中的Context使用了标准的Go Context包,目前没有太大用处,大部分可以直接传入`context.Background()`或`context.TODO()`。
## queueBufferSize
在订阅事件的时候会使用这个变量大小进行channel的初始化,1024容量足够事件使用,太小会导致程序阻塞卡顿,影响运行效率。
## proto 扩展
为了弥补protobuffer对不固定层次对象的处理不友好,在proto 包中定义了一些以ext结尾的包,用于解决不确定对象的解析。
例如xctrlext.NativeJsData Data使用interface{}用于接收任意对象
```
type NativeJsData struct {
Command string `json:"command,omitempty"`
Data interface{} `json:"data,omitempty"`
}
```
## 其它
目前,`EnableXXX`之类的都是在Queue方式订阅的,没有考虑到多Ctrl的情况。有待进一步设计。

@ -0,0 +1,278 @@
package core
import (
"context"
"fmt"
"strings"
"time"
"git.xswitch.cn/xswitch/xctrl/stack"
"git.xswitch.cn/xswitch/xctrl/stack/broker"
"git.xswitch.cn/xswitch/xctrl/stack/broker/nats"
"git.xswitch.cn/xswitch/xctrl/stack/client"
"git.xswitch.cn/xswitch/xctrl/stack/registry"
"git.xswitch.cn/xswitch/xctrl/stack/registry/etcd"
"git.xswitch.cn/xswitch/xctrl/stack/selector"
"git.xswitch.cn/xswitch/xctrl/stack/server"
)
type serviceKey struct{}
// Service is an interface that wraps the lower level libraries
// within stack-rpc. Its a convenience method for building
// and initialising services.
type Service interface {
// The service name
Name() string
// Init initialises options
Init(...stack.Option)
// Options returns the current options
Options() stack.Options
// Client is used to call services
Client() client.Client
// Server is for handling requests and events
Server() server.Server
// Run the service
Run() error
// The service implementation
String() string
}
// Option set option callback
type Option func(*stack.Options)
func Broker(b broker.Broker) stack.Option {
return func(o *stack.Options) {
o.Broker = b
// Update Client and Server
o.Client.Init(client.Broker(b))
o.Server.Init(server.Broker(b))
}
}
func Client(c client.Client) stack.Option {
return func(o *stack.Options) {
o.Client = c
}
}
// Context specifies a context for the service.
// Can be used to signal shutdown of the service.
// Can be used for extra option values.
func Context(ctx context.Context) stack.Option {
return func(o *stack.Options) {
o.Context = ctx
}
}
// HandleSignal toggles automatic installation of the signal handler that
// traps TERM, INT, and QUIT. Users of this feature to disable the signal
// handler, should control liveness of the service through the context.
func HandleSignal(b bool) stack.Option {
return func(o *stack.Options) {
o.Signal = b
}
}
func Server(s server.Server) stack.Option {
return func(o *stack.Options) {
o.Server = s
}
}
// Registry sets the registry for the service
// and the underlying components
func Registry(r registry.Registry) stack.Option {
return func(o *stack.Options) {
o.Registry = r
// Update Client and Server
o.Client.Init(client.Registry(r))
o.Server.Init(server.Registry(r))
// Update Selector
o.Client.Options().Selector.Init(selector.Registry(r))
// Update Broker
o.Broker.Init(broker.Registry(r))
}
}
// Selector sets the selector for the service client
func Selector(s selector.Selector) stack.Option {
return func(o *stack.Options) {
o.Client.Init(client.Selector(s))
}
}
// Address sets the address of the server
func Address(addr string) stack.Option {
return func(o *stack.Options) {
o.Server.Init(server.Address(addr))
}
}
// Name of the service
func Name(n string) stack.Option {
return func(o *stack.Options) {
o.Server.Init(server.Name(n))
}
}
// Version of the service
func Version(v string) stack.Option {
return func(o *stack.Options) {
o.Server.Init(server.Version(v))
}
}
// Metadata associated with the service
func Metadata(md map[string]string) stack.Option {
return func(o *stack.Options) {
o.Server.Init(server.Metadata(md))
}
}
// RegisterTTL specifies the TTL to use when registering the service
func RegisterTTL(t time.Duration) stack.Option {
return func(o *stack.Options) {
o.Server.Init(server.RegisterTTL(t))
}
}
// RegisterInterval specifies the interval on which to re-register
func RegisterInterval(t time.Duration) stack.Option {
return func(o *stack.Options) {
o.Server.Init(server.RegisterInterval(t))
}
}
// WrapClient is a convenience method for wrapping a Client with
// some middleware component. A list of wrappers can be provided.
// Wrappers are applied in reverse order so the last is executed first.
func WrapClient(w ...client.Wrapper) stack.Option {
return func(o *stack.Options) {
// apply in reverse
for i := len(w); i > 0; i-- {
o.Client = w[i-1](o.Client)
}
}
}
// WrapCall is a convenience method for wrapping a Client CallFunc
func WrapCall(w ...client.CallWrapper) stack.Option {
return func(o *stack.Options) {
o.Client.Init(client.WrapCall(w...))
}
}
// WrapHandler adds a handler Wrapper to a list of options passed into the server
func WrapHandler(w ...server.HandlerWrapper) stack.Option {
return func(o *stack.Options) {
var wrappers []server.Option
for _, wrap := range w {
wrappers = append(wrappers, server.WrapHandler(wrap))
}
// Init once
o.Server.Init(wrappers...)
}
}
// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server
func WrapSubscriber(w ...server.SubscriberWrapper) stack.Option {
return func(o *stack.Options) {
var wrappers []server.Option
for _, wrap := range w {
wrappers = append(wrappers, server.WrapSubscriber(wrap))
}
// Init once
o.Server.Init(wrappers...)
}
}
// Before and Afters
func BeforeStart(fn func() error) stack.Option {
return func(o *stack.Options) {
o.BeforeStart = append(o.BeforeStart, fn)
}
}
func BeforeStop(fn func() error) stack.Option {
return func(o *stack.Options) {
o.BeforeStop = append(o.BeforeStop, fn)
}
}
func AfterStart(fn func() error) stack.Option {
return func(o *stack.Options) {
o.AfterStart = append(o.AfterStart, fn)
}
}
func AfterStop(fn func() error) stack.Option {
return func(o *stack.Options) {
o.AfterStop = append(o.AfterStop, fn)
}
}
// FromContext retrieves a Service from the Context.
func FromContext(ctx context.Context) (Service, bool) {
s, ok := ctx.Value(serviceKey{}).(Service)
return s, ok
}
// NewContext returns a new Context with the Service embedded within it.
func NewContext(ctx context.Context, s Service) context.Context {
return context.WithValue(ctx, serviceKey{}, s)
}
// RegisterHandler is syntactic sugar for registering a handler
func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOption) error {
return s.Handle(s.NewHandler(h, opts...))
}
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}
// LogMiddlewareWrapper 请求日志中间件
func LogMiddlewareWrapper(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
start := time.Now()
err := fn(ctx, req, rsp)
duration := time.Since(start)
if req.Method() != "Acd.Getmakecallevents" && req.Method() != "Acd.Getworklist" { // this method might be called too much
fmt.Printf("| %s | %v | %s", req.Method(), duration, string(ParseRequest(req)))
}
return err
}
}
// NewService returns a new mucp service
func NewService(name string, version string, brokerAddress string, registryAddress string) Service {
if name == "mock" {
return stack.NewService(
stack.Name(name),
stack.Version(version),
)
}
b := nats.NewBroker(broker.Addrs(strings.Split(brokerAddress, ",")...))
r := etcd.NewRegistry(registry.Addrs(strings.Split(registryAddress, ",")...))
srv := stack.NewService(
stack.Name(name),
stack.Version(version),
stack.Broker(b),
stack.WrapHandler(LogMiddlewareWrapper),
stack.Registry(r),
stack.RegisterInterval(15*time.Second),
stack.RegisterTTL(30*time.Second),
)
return srv
}

@ -0,0 +1,12 @@
package config
import (
"git.xswitch.cn/xswitch/xctrl/core/ctrl/store"
)
// fromCache .
func fromCache(key string) (string, error) {
return store.NewConfig().Read(key)
}
// FromDB .

@ -0,0 +1,63 @@
package config
const (
// AuthPrivateKey Token私匙
AuthPrivateKey = `core_auth_private_key`
// AuthPublicKey Token签名公钥
AuthPublicKey = `core_auth_public_key`
// AuthExpiry Token 过期时间
AuthExpiry = `core_auth_expiry`
// SmsTemplateCaptcha 验证码短信模板
SmsTemplateCaptcha = `sys_sms_template_captcha`
// EmailTemplateCaptcha 验证码邮件模板
EmailTemplateCaptcha = `sys_email_template_captcha`
// EmailSubjectCaptcha 验证码邮件主题
EmailSubjectCaptcha = `sys_email_subject_captcha`
// SMTPHost 邮件服务器地址
SMTPHost = `sys_smtp_host`
// SMTPPort 邮件服务器端口
SMTPPort = `sys_smtp_port`
// SMTPUsername 系统邮件发送账号
SMTPUsername = `sys_smpt_username`
// SMTPPassword 系统邮件发送密码
SMTPPassword = `sys_smpt_password`
// MediaURI 媒体路径
MediaURI = `core_media_uri`
// CodecPrefs 媒体编码
CodecPrefs = `core_codec_prefs`
// MediaMixInboundOutboundCodecs 协商全部媒体编码
MediaMixInboundOutboundCodecs = `core_media_mix_inbound_outbound_codecs`
// RecordingPath 录音路径
RecordingPath = `core_recording_path`
// RecordingType 录音转换类型
RecordingType = `core_recording_type`
// CallTracking 是否开启呼叫跟踪
CallTracking = `core_call_tracking`
// AgentStateInboundCall 是否启用坐席呼入状态跟踪
AgentStateInboundCall = `agent_state_inbound_call`
// CronExpireDays 定时数据保存时间
CronExpireDays = `cron_expire_days`
// PrimaryDomainName 第三方域
PrimaryDomainName = `core_primary_domain_name`
// UniqueEmployeeNumber 工号是否全局唯一
UniqueEmployeeNumber = `core_unique_employee_number`
// ServiceNames 服务名字列表
ServiceNames = `sys_service_name`
// CacheSettings 缓存配置
CacheSettings = `core_cache_settings`
// AgentFailAcwTime 坐席桥接失败后话后处理时长
AgentFailAcwTime = `agent_fail_acw_time`
//是否开启通话计时挂断:true:开启,false:关闭
CallTimeStart = `core_call_time_start`
)

@ -0,0 +1,19 @@
package core
import (
"encoding/json"
"git.xswitch.cn/xswitch/xctrl/stack/server"
)
// Response .
type Response struct {
Code int `json:"code"`
Message string `json:"message"`
}
// ParseRequest .
func ParseRequest(req server.Request) []byte {
data, _ := json.Marshal(req.Body())
return data
}

@ -0,0 +1,5 @@
test:
go test ./... -cover
testone:
go test -v ./... -cover

@ -0,0 +1,258 @@
package bus
import (
"fmt"
"runtime"
"time"
)
const (
queueBufferSize = 10240
busBufferSize = 102400
)
// Subscriber 订阅者
type Subscriber struct {
// 订阅的主题
subject string
// 队列
queue string
// 自动过期
expire time.Duration
// 消息Handler
handler Handler
// action register|unregister|event
action string
// event, only valid if action == event
event *Event
}
func (s *Subscriber) runWithRecovery(e *Event) {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
}
}()
if s.handler != nil {
s.handler(e)
}
}
// newSubscriber 创建订阅者
func newSubscriber(subject string, queue string, h Handler) *Subscriber {
return &Subscriber{
subject: subject,
queue: queue,
handler: h,
}
}
// subject 事件订阅者组
type subscriberGroup struct {
subject string
subscribers map[string]*Subscriber
}
// add 添加订阅者
func (g *subscriberGroup) Add(sub *Subscriber) {
g.subscribers[sub.queue] = sub
}
// add 删除订阅者
func (g *subscriberGroup) Del(sub *Subscriber) {
delete(g.subscribers, sub.queue)
}
// newSubscriberGroup 创建订阅组
func newSubscriberGroup(subject string) *subscriberGroup {
return &subscriberGroup{
subject: subject,
subscribers: make(map[string]*Subscriber, 0),
}
}
type eventBus struct {
subscribers map[string]*subscriberGroup
queues map[string]*queue
ch chan *Subscriber
}
var bus *eventBus
func init() {
bus = newBus()
bus.start()
}
func newBus() *eventBus {
b := &eventBus{
subscribers: make(map[string]*subscriberGroup, 0),
queues: make(map[string]*queue, 0),
ch: make(chan *Subscriber, busBufferSize),
}
return b
}
func (h *eventBus) mainLoop() {
for {
// xlog.Error("select")
select {
case subscriber, ok := <-h.ch:
if !ok {
return
}
switch subscriber.action {
case "register":
subgroup, ok := h.subscribers[subscriber.subject]
if !ok {
subgroup = newSubscriberGroup(subscriber.subject)
}
subgroup.Add(subscriber)
h.subscribers[subscriber.subject] = subgroup
if subscriber.queue != "" {
_queue, _ok := h.queues[subscriber.queue]
if _ok {
_queue.addRef()
} else {
h.queues[subscriber.queue] = newQueue(subscriber.queue, subscriber.expire)
}
}
case "unregister":
subgroup, ok := h.subscribers[subscriber.subject]
if ok {
subgroup.Del(subscriber)
if len(subgroup.subscribers) == 0 {
delete(h.subscribers, subgroup.subject)
} else {
h.subscribers[subgroup.subject] = subgroup
}
}
if subscriber.queue != "" {
_queue, ok := h.queues[subscriber.queue]
if ok {
if _queue.release() < 1 {
delete(h.queues, subscriber.queue)
}
}
}
case "event":
ev := subscriber.event
if subgroup, ok := h.subscribers[ev.Topic]; ok {
for _, sub := range subgroup.subscribers {
if sub.queue != "" {
bus.publishToQueue(sub, ev)
} else {
go sub.runWithRecovery(ev)
}
}
}
}
}
}
}
func (h *eventBus) publishToQueue(s *Subscriber, ev *Event) {
if q, ok := h.queues[s.queue]; ok {
newEv := &Event{
Flag: ev.Flag,
Topic: s.subject,
Message: ev.Message,
Params: ev.Params,
Queue: s.queue,
handler: s.handler,
}
select {
case q.inbound <- newEv:
default:
}
}
}
func (h *eventBus) start() {
go h.mainLoop()
}
// Event is given to a subscription handler for processing
type Event struct {
Flag string `json:"flag,omitempty"`
Topic string `json:"topic,omitempty"`
Message interface{} `json:"message,omitempty"`
Params interface{} `json:"params,omitempty"`
Queue string `json:"queue,omitempty"`
handler Handler `json:"-"`
}
func NewEvent(flag string, topic string, data interface{}, params interface{}) *Event {
e := &Event{
Flag: flag,
Message: data,
Topic: topic,
Params: params,
}
return e
}
// Publish 发布事件
func Publish(ev *Event) {
s := &Subscriber{
action: "event",
event: ev,
}
select {
case bus.ch <- s:
default:
fmt.Errorf("event inbound chan block drop event")
}
}
// Handler is used to process messages via a subscription of a topic.
// The handler is passed a publication interface which contains the
// message and optional Ack method to acknowledge receipt of the message.
type Handler func(*Event) error
// Subscribe 订阅事件
func Subscribe(topic string, queue string, h Handler) error {
if h != nil {
s := &Subscriber{
subject: topic,
queue: queue,
handler: h,
action: "register",
}
bus.ch <- s
return nil
}
return fmt.Errorf("handler must not be nil")
}
func SubscribeWithExpire(topic string, queue string, expire time.Duration, h Handler) error {
if h != nil {
s := &Subscriber{
subject: topic,
queue: queue,
expire: expire,
handler: h,
action: "register",
}
bus.ch <- s
return nil
}
return fmt.Errorf("handler must not be nil")
}
// Unsubscribe 取消订阅
func Unsubscribe(topic string, queue string) {
s := &Subscriber{
subject: topic,
queue: queue,
action: "unregister",
}
bus.ch <- s
}

@ -0,0 +1,118 @@
package bus
import (
"fmt"
"runtime"
"sync"
"time"
)
// queueEvent is given to a subscription handler for processing
type queueEvent struct {
handler Handler
ev *Event
}
type queue struct {
name string
members map[string]chan *Event
refCount int
inbound chan *Event
done chan bool
lock sync.Mutex
expire time.Duration
}
func newQueue(name string, expire time.Duration) *queue {
q := &queue{
inbound: make(chan *Event, queueBufferSize),
done: make(chan bool, 1),
name: name,
refCount: 1,
expire: expire,
}
q.start()
return q
}
func (q *queue) runWithRecovery(e *Event) {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
fmt.Errorf("cron: panic running job: %v\n%s", r, string(buf))
}
}()
if e.handler != nil {
e.handler(e)
}
}
func (q *queue) addRef() {
q.lock.Lock()
q.refCount = q.refCount + 1
q.lock.Unlock()
}
func (q *queue) release() int {
q.lock.Lock()
q.refCount = q.refCount - 1
if q.refCount < 1 {
q.done <- true
}
q.lock.Unlock()
return q.refCount
}
func (q *queue) start() {
go func() {
//@TODO, this may cause cli-ecc crash, we disable this now, need todo
// defer func() {
// close(q.inbound)
// close(q.done)
// }()
running := true
fmt.Printf("Queue %s started", q.name)
if q.expire > 0 {
var handler Handler
for running {
// xlog.Infof("Queue %s running", q.name)
select {
case e, ok := <-q.inbound:
if !ok {
fmt.Errorf("error read from inbound chan")
continue
}
fmt.Printf("%s delivered to handler", e.Topic)
// cache the last Handler
handler = e.handler
q.runWithRecovery(e)
case <-q.done:
running = false
case <-time.After(q.expire):
// sigh, we don't have a handler here ?
q.runWithRecovery(&Event{Flag: "TIMEOUT", handler: handler})
running = false
fmt.Printf("Queue %s timeout %d", q.name, q.expire)
}
}
} else {
for running {
// xlog.Infof("Queue %s running", q.name)
select {
case e, ok := <-q.inbound:
//xlog.Debugf("queue inbound %v\n", e.Queue)
if ok {
q.runWithRecovery(e)
}
case <-q.done:
running = false
}
}
}
fmt.Printf("Queue %s done", q.name)
}()
}

@ -0,0 +1,933 @@
package ctrl
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"git.xswitch.cn/xswitch/xctrl/core/ctrl/nats"
"git.xswitch.cn/xswitch/xctrl/stack/client"
"git.xswitch.cn/xswitch/xctrl/stack/errors"
"github.com/google/uuid"
"git.xswitch.cn/xswitch/xctrl/core/proto/xctrl"
)
// Channel call channel
type Channel struct {
xctrl.ChannelEvent
lock sync.RWMutex
CtrlUuid string
subs []nats.Subscriber
}
// only call at the first time
func NewChannel(channel_uuid string) *Channel {
if channel_uuid == "" {
channel_uuid = uuid.New().String()
}
channel := &Channel{}
channel.CtrlUuid = fmt.Sprintf("channel.%s", channel_uuid)
channel.Uuid = channel.CtrlUuid
return channel.Save()
}
// WithAddress 创建NODE地址
func WithAddressNot() client.CallOption {
return client.WithAddress("cn.xswitch.node")
}
// WithAddress 创建NODE地址
func NodeAddress(nodeUUID string) string {
if nodeUUID == "" {
return "cn.xswitch.node"
}
return fmt.Sprintf("cn.xswitch.node.%s", nodeUUID)
}
// WithAddress 创建NODE地址
func WithAddress(nodeUUID string) client.CallOption {
if nodeUUID == "" {
return client.WithAddress("cn.xswitch.node")
}
return client.WithAddress("cn.xswitch.node." + nodeUUID)
}
func (channel *Channel) Save() *Channel {
return WriteChannel(channel.GetUuid(), channel)
}
func (channel *Channel) FullCtrlUuid() string {
return fmt.Sprintf("cn.xswitch.ctrl.%s", channel.CtrlUuid)
}
// GetChannelEvent .
func (channel *Channel) GetChannelEvent() *xctrl.ChannelEvent {
return &channel.ChannelEvent
}
// NodeAddress 生成NODE地址
func (channel *Channel) NodeAddress() client.CallOption {
return client.WithAddress("cn.xswitch.node." + channel.GetNodeUuid())
}
// Answer 应答
func (channel *Channel) Answer() *xctrl.Response {
response, err := Service().Answer(context.TODO(), &xctrl.Request{
Uuid: channel.GetUuid(),
}, channel.NodeAddress())
if err != nil {
response = new(xctrl.Response)
e := errors.Parse(err.Error())
response.Code = e.Code
response.Message = e.Detail
}
return response
}
// Ready 判断通道是否正常状态
func (channel *Channel) Ready() bool {
if channel == nil {
return false
}
c, err := ReadChannel(channel.GetUuid())
if err != nil || c == nil {
return false
}
if channel.State == `DESTROY` {
return false
}
return true
// response := c.GetStates()
// return response.GetReady()
}
// GetVariable 获取通道变量
func (channel *Channel) GetVariable(key string) string {
if channel == nil {
return ""
}
channel.lock.RLock()
if channel.Params != nil {
if v, ok := channel.Params[key]; ok {
channel.lock.RUnlock()
return v
}
}
channel.lock.RUnlock()
return ""
}
// SetVariable 保存通道变量
func (channel *Channel) SetVariable(key, value string) error {
if channel == nil {
return fmt.Errorf("Unable to locate Channel")
}
channel.lock.Lock()
if channel.Params == nil {
channel.Params = make(map[string]string)
}
channel.Params[key] = value
channel.lock.Unlock()
response := channel.SetVar(&xctrl.SetVarRequest{
Uuid: channel.GetUuid(),
Data: map[string]string{
key: value,
},
})
if response.GetCode() != 200 {
return fmt.Errorf("[%d]%s", response.GetCode(), response.GetMessage())
}
return nil
}
// SetVariables 保存多个通道变量
func (channel *Channel) SetVariables(varKv map[string]string) error {
if channel == nil {
return fmt.Errorf("Unable to locate Channel")
}
channel.lock.Lock()
data := make(map[string]string)
if channel.Params == nil {
channel.Params = make(map[string]string)
}
for k, v := range varKv {
channel.Params[k] = v
data[k] = v
}
channel.lock.Unlock()
response := channel.SetVar(&xctrl.SetVarRequest{
Uuid: channel.GetUuid(),
Data: data,
})
if response.GetCode() != 200 {
return fmt.Errorf("[%d]%s", response.GetCode(), response.GetMessage())
}
return nil
}
// Play 播放一个文件,默认超时时间1小时
func (channel *Channel) Play(req *xctrl.PlayRequest) *xctrl.Response {
return channel.PlayWithTimeout(req, 1*time.Hour)
}
// PlayWithTimeout 播放一个文件,可传入超时时间
func (channel *Channel) PlayWithTimeout(req *xctrl.PlayRequest, timeout time.Duration) *xctrl.Response {
if channel == nil {
return &xctrl.Response{
Code: http.StatusInternalServerError,
Message: "Unable to locate Channel",
}
}
if req.GetUuid() == "" {
req.Uuid = channel.GetUuid()
}
response, err := Service().Play(context.Background(), req, client.WithRequestTimeout(timeout), channel.NodeAddress())
if err != nil {
responseErr := errors.Parse(err.Error())
if response == nil {
response = &xctrl.Response{
Code: responseErr.Code,
Message: responseErr.Detail,
}
} else {
response.Code = responseErr.Code
response.Message = responseErr.Detail
}
}
if response.Code < 200 || response.Code > 300 {
if response.Code < 500 {
fmt.Printf("%s Play %s error: %d %s", channel.GetUuid(), req.Media.Data, response.Code, response.GetMessage())
} else {
fmt.Errorf("%s Play %s error: %d %s", channel.GetUuid(), req.Media.Data, response.Code, response.GetMessage())
}
}
return response
}
// Broadcast 播放多个文件
func (channel *Channel) Broadcast(req *xctrl.BroadcastRequest) *xctrl.Response {
if channel == nil {
return &xctrl.Response{