初步完成对gfsnotify的改进,增加对特定回调的取消注册功能

This commit is contained in:
john 2018-11-05 16:26:08 +08:00
parent d257e554dd
commit e2111ceb3e
9 changed files with 226 additions and 59 deletions

View File

@ -8,16 +8,14 @@
package gfcache
import (
"gitee.com/johng/gf/g/os/gcache"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gfsnotify"
"gitee.com/johng/gf/g/os/gcache"
)
type Cache struct {
cap *gtype.Int // 缓存容量(byte)设置为0表示不限制
size *gtype.Int // 缓存大小(Byte)
cache *gcache.Cache // 缓存对象
notify *gfsnotify.Watcher // 文件监控管理对象
}
const (
@ -35,12 +33,10 @@ func New(cap ... int) *Cache {
if len(cap) > 0 {
c = cap[0]
}
notify, _ := gfsnotify.New()
return &Cache {
cap : gtype.NewInt(c),
size : gtype.NewInt(),
cache : gcache.New(),
notify : notify,
}
}

View File

@ -53,7 +53,7 @@ func (c *Cache) addMonitor(path string) {
if c.cache.Get(path) != nil {
return
}
c.notify.Add(path, func(event *gfsnotify.Event) {
gfsnotify.Add(path, func(event *gfsnotify.Event) {
//glog.Debug("gfcache:", event)
r := c.cache.Get(path).([]byte)
// 是否删除

View File

@ -9,6 +9,10 @@
package gfsnotify
import (
"container/list"
"errors"
"fmt"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/gqueue"
"gitee.com/johng/gf/g/container/gtype"
@ -19,19 +23,29 @@ import (
// 监听管理对象
type Watcher struct {
watcher *fsnotify.Watcher // 底层fsnotify对象
events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件
closeChan chan struct{} // 关闭事件
callbacks *gmap.StringInterfaceMap // 监听的回调函数
cache *gcache.Cache // 缓存对象,用于事件重复过滤
watcher *fsnotify.Watcher // 底层fsnotify对象
events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件
closeChan chan struct{} // 关闭事件
callbacks *gmap.StringInterfaceMap // 监听的回调函数
cache *gcache.Cache // 缓存对象,用于事件重复过滤
}
// 注册的监听回调方法
type Callback struct {
Id int // 唯一ID
Func func(event *Event) // 回调方法
Path string // 监听的文件/目录
elem *list.Element // 指向监听链表中的元素项位置
parent *Callback // 父级callback有这个属性表示该callback为被自动管理的callback
subs *glist.List // 子级回调对象指针列表
}
// 监听事件对象
type Event struct {
event fsnotify.Event // 底层事件对象
Path string // 文件绝对路径
Op Op // 触发监听的文件操作
Watcher *Watcher // 事件对应的监听对象
event fsnotify.Event // 底层事件对象
Path string // 文件绝对路径
Op Op // 触发监听的文件操作
Watcher *Watcher // 事件对应的监听对象
}
// 按位进行识别的操作集合
@ -53,9 +67,11 @@ const (
var (
// 全局监听对象,方便应用端调用
watchers = make([]*Watcher, DEFAULT_WATCHER_COUNT)
watchers = make([]*Watcher, DEFAULT_WATCHER_COUNT)
// 默认的watchers是否初始化使用时才创建
watcherInited = gtype.NewBool()
watcherInited = gtype.NewBool()
// 回调方法ID与对象指针的映射哈希表用于根据ID快速查找回调对象
callbackIdMap = gmap.NewIntInterfaceMap()
)
// 初始化创建8个watcher对象用于包默认管理监听
@ -75,11 +91,11 @@ func initWatcher() {
func New() (*Watcher, error) {
if watch, err := fsnotify.NewWatcher(); err == nil {
w := &Watcher {
cache : gcache.New(),
watcher : watch,
events : gqueue.New(),
closeChan : make(chan struct{}),
callbacks : gmap.NewStringInterfaceMap(),
cache : gcache.New(),
watcher : watch,
events : gqueue.New(),
closeChan : make(chan struct{}),
callbacks : gmap.NewStringInterfaceMap(),
}
w.startWatchLoop()
w.startEventLoop()
@ -90,15 +106,27 @@ func New() (*Watcher, error) {
}
// 添加对指定文件/目录的监听,并给定回调函数;如果给定的是一个目录,默认递归监控。
func Add(path string, callback func(event *Event), recursive...bool) error {
return getWatcherByPath(path).Add(path, callback, recursive...)
func Add(path string, callbackFunc func(event *Event), recursive...bool) (callback *Callback, err error) {
return getWatcherByPath(path).Add(path, callbackFunc, recursive...)
}
// 移除监听,默认递归删除。
// 递归移除对指定文件/目录的所有监听回调
func Remove(path string) error {
return getWatcherByPath(path).Remove(path)
}
// 根据指定的回调函数ID移出指定的inotify回调函数
func RemoveCallback(callbackId int) error {
callback := (*Callback)(nil)
if r := callbackIdMap.Get(callbackId); r != nil {
callback = r.(*Callback)
}
if callback == nil {
return errors.New(fmt.Sprintf(`callback for id %d not found`, callbackId))
}
return getWatcherByPath(callback.Path).RemoveCallback(callbackId)
}
// 根据path计算对应的watcher对象
func getWatcherByPath(path string) *Watcher {
initWatcher()

View File

@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/os/gtime"
)
// 关闭监听管理对象
@ -20,13 +21,41 @@ func (w *Watcher) Close() {
}
// 添加对指定文件/目录的监听,并给定回调函数
func (w *Watcher) addWatch(path string, callback func(event *Event)) error {
func (w *Watcher) addWatch(path string, calbackFunc func(event *Event), parentCallback *Callback) (callback *Callback, err error) {
// 这里统一转换为当前系统的绝对路径,便于统一监控文件名称
t := fileRealPath(path)
if t == "" {
return errors.New(fmt.Sprintf(`"%s" does not exist`, path))
return nil, errors.New(fmt.Sprintf(`"%s" does not exist`, path))
}
path = t
// 添加成功后会注册该callback id到全局的哈希表并绑定到父级的注册回调中
defer func() {
if err == nil {
if parentCallback == nil {
// 只有主callback才记录到id map中因为子callback是自动管理的
callbackIdMap.Set(callback.Id, callback)
}
if parentCallback != nil {
// 需要递归查找到顶级的callback
parent := parentCallback
for {
if p := parent.parent; p != nil {
parent = p
} else {
break
}
}
parent.subs.PushFront(callback)
}
}
}()
callback = &Callback {
Id : int(gtime.Nanosecond()),
Func : calbackFunc,
Path : path,
subs : glist.New(),
parent : parentCallback,
}
// 注册回调函数
w.callbacks.LockFunc(func(m map[string]interface{}) {
var result interface{}
@ -36,52 +65,97 @@ func (w *Watcher) addWatch(path string, callback func(event *Event)) error {
} else {
result = v
}
result.(*glist.List).PushBack(callback)
callback.elem = result.(*glist.List).PushBack(callback)
})
// 添加底层监听
w.watcher.Add(path)
return nil
return
}
// 添加监控path参数支持文件或者目录路径recursive为非必需参数默认为递归添加监控(当path为目录时)
func (w *Watcher) Add(path string, callback func(event *Event), recursive...bool) error {
// 添加监控path参数支持文件或者目录路径recursive为非必需参数默认为递归添加监控(当path为目录时)。
// 如果添加目录这里只会返回目录的callback按照callback删除时会递归删除。
func (w *Watcher) addWithCallback(parentCallback *Callback, path string, callbackFunc func(event *Event), recursive...bool) (callback *Callback, err error) {
// 首先添加这个目录
if callback, err = w.addWatch(path, callbackFunc, parentCallback); err != nil {
return nil, err
}
// 其次递归添加其下的文件/目录
if fileIsDir(path) && (len(recursive) == 0 || recursive[0]) {
paths, _ := fileScanDir(path, "*", true)
list := []string{path}
list = append(list, paths...)
for _, v := range list {
if err := w.addWatch(v, callback); err != nil {
for _, v := range paths {
w.addWatch(v, callbackFunc, callback)
}
}
return
}
// 添加监控path参数支持文件或者目录路径recursive为非必需参数默认为递归添加监控(当path为目录时)。
// 如果添加目录这里只会返回目录的callback按照callback删除时会递归删除。
func (w *Watcher) Add(path string, callbackFunc func(event *Event), recursive...bool) (callback *Callback, err error) {
return w.addWithCallback(nil, path, callbackFunc, recursive...)
}
// 递归移除对指定文件/目录的所有监听回调
func (w *Watcher) Remove(path string) error {
if fileIsDir(path) {
paths, _ := fileScanDir(path, "*", true)
paths = append(paths, path)
for _, v := range paths {
if err := w.removeAll(v); err != nil {
return err
}
}
return nil
} else {
return w.addWatch(path, callback)
return w.removeAll(path)
}
}
// 移除监听
func (w *Watcher) removeWatch(path string) error {
// 移除对指定文件/目录的所有监听
func (w *Watcher) removeAll(path string) error {
w.callbacks.Remove(path)
return w.watcher.Remove(path)
}
// 递归移除监听
func (w *Watcher) Remove(path string) error {
if fileIsDir(path) {
paths, _ := fileScanDir(path, "*", true)
list := []string{path}
list = append(list, paths...)
for _, v := range list {
if err := w.removeWatch(v); err != nil {
return err
// 根据指定的回调函数ID移出指定的inotify回调函数
func (w *Watcher) RemoveCallback(callbackId int) error {
callback := (*Callback)(nil)
if r := callbackIdMap.Get(callbackId); r != nil {
callback = r.(*Callback)
}
if callback == nil {
return errors.New(fmt.Sprintf(`callback for id %d not found`, callbackId))
}
// 首先删除主callback
if err := w.removeCallback(callback); err != nil {
return err
}
// 如果存在子级callback那么也一并删除
if callback.subs.Len() > 0 {
for {
if r := callback.subs.PopBack(); r != nil {
w.removeCallback(r.(*Callback))
} else {
break
}
}
return nil
} else {
return w.removeWatch(path)
}
return nil
}
// 移除对指定文件/目录的所有监听
func (w *Watcher) removeCallback(callback *Callback) error {
if r := w.callbacks.Get(callback.Path); r != nil {
list := r.(*glist.List)
list.Remove(callback.elem)
if list.Len() == 0 {
return w.watcher.Remove(callback.Path)
}
} else {
return errors.New(fmt.Sprintf(`callbacks not found for "%s"`, callback.Path))
}
return nil
}
// 监听循环
@ -146,14 +220,16 @@ func (w *Watcher) startEventLoop() {
callbacks := w.getCallbacks(event.Path)
// 如果创建了新的目录,那么将这个目录递归添加到监控中
if event.IsCreate() && fileIsDir(event.Path) {
for _, callback := range callbacks.FrontAll() {
w.Add(event.Path, callback.(func(event *Event)))
for _, v := range callbacks.FrontAll() {
callback := v.(*Callback)
w.addWithCallback(callback, event.Path, callback.Func)
}
}
// 执行回调处理,异步处理
if callbacks != nil {
go func(callbacks *glist.List) {
for _, callback := range callbacks.FrontAll() {
callback.(func(event *Event))(event)
for _, v := range callbacks.FrontAll() {
go v.(*Callback).Func(event)
}
}(callbacks)
}

View File

@ -7,8 +7,10 @@
// 随机数管理
package grand
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
var digits = []rune("0123456789")
var (
letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
digits = []rune("0123456789")
)
// 获得一个 min, max 之间的随机数(min <= x <= max)
func Rand (min, max int) int {

View File

@ -6,7 +6,7 @@ import (
)
func main() {
err := gfsnotify.Add("/home/john/temp", func(event *gfsnotify.Event) {
_, err := gfsnotify.Add("/home/john/temp", func(event *gfsnotify.Event) {
if event.IsCreate() {
glog.Println("创建文件 : ", event.Path)
}

View File

@ -0,0 +1,36 @@
package main
import (
"gitee.com/johng/gf/g/os/gfsnotify"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gtime"
"time"
)
func main() {
c1, err := gfsnotify.Add("/home/john/temp/log", func(event *gfsnotify.Event) {
glog.Println("callback1")
})
if err != nil {
panic(err)
}
c2, err := gfsnotify.Add("/home/john/temp/log", func(event *gfsnotify.Event) {
glog.Println("callback2")
})
if err != nil {
panic(err)
}
// 3秒后移除c1的回调函数注册仅剩c2
gtime.SetTimeout(3*time.Second, func() {
gfsnotify.RemoveCallback(c1.Id)
glog.Println("remove callback c1")
})
// 10秒后移除c2的回调函数注册所有的回调都移除不再有任何打印信息输出
gtime.SetTimeout(10*time.Second, func() {
gfsnotify.RemoveCallback(c2.Id)
glog.Println("remove callback c2")
})
select {}
}

View File

@ -0,0 +1,28 @@
package main
import (
"gitee.com/johng/gf/g/os/gfsnotify"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gtime"
"time"
)
func main() {
callback, err := gfsnotify.Add("/home/john/temp", func(event *gfsnotify.Event) {
glog.Println("callback")
})
if err != nil {
panic(err)
}
// 在此期间创建文件、目录、修改文件、删除文件
// 20秒后移除回调函数注册所有的回调都移除不再有任何打印信息输出
gtime.SetTimeout(20*time.Second, func() {
gfsnotify.RemoveCallback(callback.Id)
glog.Println("remove callback")
})
select {}
}

View File

@ -3,11 +3,12 @@ package main
import (
"fmt"
"gitee.com/johng/gf/g/os/gtime"
"strconv"
"math"
)
func main() {
fmt.Println(strconv.FormatInt(gtime.Nanosecond(), 32))
fmt.Println(gtime.NewFromStr("2018-10-24 00:00:00").Nanosecond())
fmt.Println(math.MaxInt64)
fmt.Println(gtime.Second())
fmt.Println(gtime.Nanosecond())
}