From e2111ceb3e1b9c6961521582289e06ab5b626de5 Mon Sep 17 00:00:00 2001 From: john Date: Mon, 5 Nov 2018 16:26:08 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90=E5=AF=B9gf?= =?UTF-8?q?snotify=E7=9A=84=E6=94=B9=E8=BF=9B=EF=BC=8C=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=AF=B9=E7=89=B9=E5=AE=9A=E5=9B=9E=E8=B0=83=E7=9A=84=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E6=B3=A8=E5=86=8C=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/os/gfcache/gfcache.go | 6 +- g/os/gfcache/gfcache_cache.go | 2 +- g/os/gfsnotify/gfsnotify.go | 66 ++++++--- g/os/gfsnotify/gfsnotify_watcher.go | 134 ++++++++++++++---- g/util/grand/grand.go | 6 +- geg/os/gfsnotify/gfsnotify.go | 2 +- geg/os/gfsnotify/gfsnotify_callback.go | 36 +++++ geg/os/gfsnotify/gfsnotify_callback_folder.go | 28 ++++ geg/other/test.go | 5 +- 9 files changed, 226 insertions(+), 59 deletions(-) create mode 100644 geg/os/gfsnotify/gfsnotify_callback.go create mode 100644 geg/os/gfsnotify/gfsnotify_callback_folder.go diff --git a/g/os/gfcache/gfcache.go b/g/os/gfcache/gfcache.go index 2e4f4eae7..26831d481 100644 --- a/g/os/gfcache/gfcache.go +++ b/g/os/gfcache/gfcache.go @@ -8,16 +8,14 @@ package gfcache import ( - "gitee.com/johng/gf/g/os/gcache" "gitee.com/johng/gf/g/container/gtype" - "gitee.com/johng/gf/g/os/gfsnotify" + "gitee.com/johng/gf/g/os/gcache" ) type Cache struct { cap *gtype.Int // 缓存容量(byte),设置为0表示不限制 size *gtype.Int // 缓存大小(Byte) cache *gcache.Cache // 缓存对象 - notify *gfsnotify.Watcher // 文件监控管理对象 } const ( @@ -35,12 +33,10 @@ func New(cap ... int) *Cache { if len(cap) > 0 { c = cap[0] } - notify, _ := gfsnotify.New() return &Cache { cap : gtype.NewInt(c), size : gtype.NewInt(), cache : gcache.New(), - notify : notify, } } diff --git a/g/os/gfcache/gfcache_cache.go b/g/os/gfcache/gfcache_cache.go index c59fcc307..91981fa01 100644 --- a/g/os/gfcache/gfcache_cache.go +++ b/g/os/gfcache/gfcache_cache.go @@ -53,7 +53,7 @@ func (c *Cache) addMonitor(path string) { if c.cache.Get(path) != nil { return } - c.notify.Add(path, func(event *gfsnotify.Event) { + gfsnotify.Add(path, func(event *gfsnotify.Event) { //glog.Debug("gfcache:", event) r := c.cache.Get(path).([]byte) // 是否删除 diff --git a/g/os/gfsnotify/gfsnotify.go b/g/os/gfsnotify/gfsnotify.go index e7f644559..654965b41 100644 --- a/g/os/gfsnotify/gfsnotify.go +++ b/g/os/gfsnotify/gfsnotify.go @@ -9,6 +9,10 @@ package gfsnotify import ( + "container/list" + "errors" + "fmt" + "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gqueue" "gitee.com/johng/gf/g/container/gtype" @@ -19,19 +23,29 @@ import ( // 监听管理对象 type Watcher struct { - watcher *fsnotify.Watcher // 底层fsnotify对象 - events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件 - closeChan chan struct{} // 关闭事件 - callbacks *gmap.StringInterfaceMap // 监听的回调函数 - cache *gcache.Cache // 缓存对象,用于事件重复过滤 + watcher *fsnotify.Watcher // 底层fsnotify对象 + events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件 + closeChan chan struct{} // 关闭事件 + callbacks *gmap.StringInterfaceMap // 监听的回调函数 + cache *gcache.Cache // 缓存对象,用于事件重复过滤 +} + +// 注册的监听回调方法 +type Callback struct { + Id int // 唯一ID + Func func(event *Event) // 回调方法 + Path string // 监听的文件/目录 + elem *list.Element // 指向监听链表中的元素项位置 + parent *Callback // 父级callback,有这个属性表示该callback为被自动管理的callback + subs *glist.List // 子级回调对象指针列表 } // 监听事件对象 type Event struct { - event fsnotify.Event // 底层事件对象 - Path string // 文件绝对路径 - Op Op // 触发监听的文件操作 - Watcher *Watcher // 事件对应的监听对象 + event fsnotify.Event // 底层事件对象 + Path string // 文件绝对路径 + Op Op // 触发监听的文件操作 + Watcher *Watcher // 事件对应的监听对象 } // 按位进行识别的操作集合 @@ -53,9 +67,11 @@ const ( var ( // 全局监听对象,方便应用端调用 - watchers = make([]*Watcher, DEFAULT_WATCHER_COUNT) + watchers = make([]*Watcher, DEFAULT_WATCHER_COUNT) // 默认的watchers是否初始化,使用时才创建 - watcherInited = gtype.NewBool() + watcherInited = gtype.NewBool() + // 回调方法ID与对象指针的映射哈希表,用于根据ID快速查找回调对象 + callbackIdMap = gmap.NewIntInterfaceMap() ) // 初始化创建8个watcher对象,用于包默认管理监听 @@ -75,11 +91,11 @@ func initWatcher() { func New() (*Watcher, error) { if watch, err := fsnotify.NewWatcher(); err == nil { w := &Watcher { - cache : gcache.New(), - watcher : watch, - events : gqueue.New(), - closeChan : make(chan struct{}), - callbacks : gmap.NewStringInterfaceMap(), + cache : gcache.New(), + watcher : watch, + events : gqueue.New(), + closeChan : make(chan struct{}), + callbacks : gmap.NewStringInterfaceMap(), } w.startWatchLoop() w.startEventLoop() @@ -90,15 +106,27 @@ func New() (*Watcher, error) { } // 添加对指定文件/目录的监听,并给定回调函数;如果给定的是一个目录,默认递归监控。 -func Add(path string, callback func(event *Event), recursive...bool) error { - return getWatcherByPath(path).Add(path, callback, recursive...) +func Add(path string, callbackFunc func(event *Event), recursive...bool) (callback *Callback, err error) { + return getWatcherByPath(path).Add(path, callbackFunc, recursive...) } -// 移除监听,默认递归删除。 +// 递归移除对指定文件/目录的所有监听回调 func Remove(path string) error { return getWatcherByPath(path).Remove(path) } +// 根据指定的回调函数ID,移出指定的inotify回调函数 +func RemoveCallback(callbackId int) error { + callback := (*Callback)(nil) + if r := callbackIdMap.Get(callbackId); r != nil { + callback = r.(*Callback) + } + if callback == nil { + return errors.New(fmt.Sprintf(`callback for id %d not found`, callbackId)) + } + return getWatcherByPath(callback.Path).RemoveCallback(callbackId) +} + // 根据path计算对应的watcher对象 func getWatcherByPath(path string) *Watcher { initWatcher() diff --git a/g/os/gfsnotify/gfsnotify_watcher.go b/g/os/gfsnotify/gfsnotify_watcher.go index 9cb15f447..5328127e5 100644 --- a/g/os/gfsnotify/gfsnotify_watcher.go +++ b/g/os/gfsnotify/gfsnotify_watcher.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "gitee.com/johng/gf/g/container/glist" + "gitee.com/johng/gf/g/os/gtime" ) // 关闭监听管理对象 @@ -20,13 +21,41 @@ func (w *Watcher) Close() { } // 添加对指定文件/目录的监听,并给定回调函数 -func (w *Watcher) addWatch(path string, callback func(event *Event)) error { +func (w *Watcher) addWatch(path string, calbackFunc func(event *Event), parentCallback *Callback) (callback *Callback, err error) { // 这里统一转换为当前系统的绝对路径,便于统一监控文件名称 t := fileRealPath(path) if t == "" { - return errors.New(fmt.Sprintf(`"%s" does not exist`, path)) + return nil, errors.New(fmt.Sprintf(`"%s" does not exist`, path)) } path = t + // 添加成功后会注册该callback id到全局的哈希表,并绑定到父级的注册回调中 + defer func() { + if err == nil { + if parentCallback == nil { + // 只有主callback才记录到id map中,因为子callback是自动管理的 + callbackIdMap.Set(callback.Id, callback) + } + if parentCallback != nil { + // 需要递归查找到顶级的callback + parent := parentCallback + for { + if p := parent.parent; p != nil { + parent = p + } else { + break + } + } + parent.subs.PushFront(callback) + } + } + }() + callback = &Callback { + Id : int(gtime.Nanosecond()), + Func : calbackFunc, + Path : path, + subs : glist.New(), + parent : parentCallback, + } // 注册回调函数 w.callbacks.LockFunc(func(m map[string]interface{}) { var result interface{} @@ -36,52 +65,97 @@ func (w *Watcher) addWatch(path string, callback func(event *Event)) error { } else { result = v } - result.(*glist.List).PushBack(callback) + callback.elem = result.(*glist.List).PushBack(callback) }) // 添加底层监听 w.watcher.Add(path) - return nil + return } -// 添加监控,path参数支持文件或者目录路径,recursive为非必需参数,默认为递归添加监控(当path为目录时) -func (w *Watcher) Add(path string, callback func(event *Event), recursive...bool) error { +// 添加监控,path参数支持文件或者目录路径,recursive为非必需参数,默认为递归添加监控(当path为目录时)。 +// 如果添加目录,这里只会返回目录的callback,按照callback删除时会递归删除。 +func (w *Watcher) addWithCallback(parentCallback *Callback, path string, callbackFunc func(event *Event), recursive...bool) (callback *Callback, err error) { + // 首先添加这个目录 + if callback, err = w.addWatch(path, callbackFunc, parentCallback); err != nil { + return nil, err + } + // 其次递归添加其下的文件/目录 if fileIsDir(path) && (len(recursive) == 0 || recursive[0]) { paths, _ := fileScanDir(path, "*", true) - list := []string{path} - list = append(list, paths...) - for _, v := range list { - if err := w.addWatch(v, callback); err != nil { + for _, v := range paths { + w.addWatch(v, callbackFunc, callback) + } + } + return +} + +// 添加监控,path参数支持文件或者目录路径,recursive为非必需参数,默认为递归添加监控(当path为目录时)。 +// 如果添加目录,这里只会返回目录的callback,按照callback删除时会递归删除。 +func (w *Watcher) Add(path string, callbackFunc func(event *Event), recursive...bool) (callback *Callback, err error) { + return w.addWithCallback(nil, path, callbackFunc, recursive...) +} + +// 递归移除对指定文件/目录的所有监听回调 +func (w *Watcher) Remove(path string) error { + if fileIsDir(path) { + paths, _ := fileScanDir(path, "*", true) + paths = append(paths, path) + for _, v := range paths { + if err := w.removeAll(v); err != nil { return err } } return nil } else { - return w.addWatch(path, callback) + return w.removeAll(path) } } - -// 移除监听 -func (w *Watcher) removeWatch(path string) error { +// 移除对指定文件/目录的所有监听 +func (w *Watcher) removeAll(path string) error { w.callbacks.Remove(path) return w.watcher.Remove(path) } -// 递归移除监听 -func (w *Watcher) Remove(path string) error { - if fileIsDir(path) { - paths, _ := fileScanDir(path, "*", true) - list := []string{path} - list = append(list, paths...) - for _, v := range list { - if err := w.removeWatch(v); err != nil { - return err +// 根据指定的回调函数ID,移出指定的inotify回调函数 +func (w *Watcher) RemoveCallback(callbackId int) error { + callback := (*Callback)(nil) + if r := callbackIdMap.Get(callbackId); r != nil { + callback = r.(*Callback) + } + if callback == nil { + return errors.New(fmt.Sprintf(`callback for id %d not found`, callbackId)) + } + // 首先删除主callback + if err := w.removeCallback(callback); err != nil { + return err + } + // 如果存在子级callback,那么也一并删除 + if callback.subs.Len() > 0 { + for { + if r := callback.subs.PopBack(); r != nil { + w.removeCallback(r.(*Callback)) + } else { + break } } return nil - } else { - return w.removeWatch(path) } + return nil +} + +// 移除对指定文件/目录的所有监听 +func (w *Watcher) removeCallback(callback *Callback) error { + if r := w.callbacks.Get(callback.Path); r != nil { + list := r.(*glist.List) + list.Remove(callback.elem) + if list.Len() == 0 { + return w.watcher.Remove(callback.Path) + } + } else { + return errors.New(fmt.Sprintf(`callbacks not found for "%s"`, callback.Path)) + } + return nil } // 监听循环 @@ -146,14 +220,16 @@ func (w *Watcher) startEventLoop() { callbacks := w.getCallbacks(event.Path) // 如果创建了新的目录,那么将这个目录递归添加到监控中 if event.IsCreate() && fileIsDir(event.Path) { - for _, callback := range callbacks.FrontAll() { - w.Add(event.Path, callback.(func(event *Event))) + for _, v := range callbacks.FrontAll() { + callback := v.(*Callback) + w.addWithCallback(callback, event.Path, callback.Func) } } + // 执行回调处理,异步处理 if callbacks != nil { go func(callbacks *glist.List) { - for _, callback := range callbacks.FrontAll() { - callback.(func(event *Event))(event) + for _, v := range callbacks.FrontAll() { + go v.(*Callback).Func(event) } }(callbacks) } diff --git a/g/util/grand/grand.go b/g/util/grand/grand.go index a9d581fdf..ff1c66972 100644 --- a/g/util/grand/grand.go +++ b/g/util/grand/grand.go @@ -7,8 +7,10 @@ // 随机数管理 package grand -var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") -var digits = []rune("0123456789") +var ( + letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") + digits = []rune("0123456789") +) // 获得一个 min, max 之间的随机数(min <= x <= max) func Rand (min, max int) int { diff --git a/geg/os/gfsnotify/gfsnotify.go b/geg/os/gfsnotify/gfsnotify.go index 4683b9d70..12cba7c25 100644 --- a/geg/os/gfsnotify/gfsnotify.go +++ b/geg/os/gfsnotify/gfsnotify.go @@ -6,7 +6,7 @@ import ( ) func main() { - err := gfsnotify.Add("/home/john/temp", func(event *gfsnotify.Event) { + _, err := gfsnotify.Add("/home/john/temp", func(event *gfsnotify.Event) { if event.IsCreate() { glog.Println("创建文件 : ", event.Path) } diff --git a/geg/os/gfsnotify/gfsnotify_callback.go b/geg/os/gfsnotify/gfsnotify_callback.go new file mode 100644 index 000000000..76c02177f --- /dev/null +++ b/geg/os/gfsnotify/gfsnotify_callback.go @@ -0,0 +1,36 @@ +package main + +import ( + "gitee.com/johng/gf/g/os/gfsnotify" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gtime" + "time" +) + +func main() { + c1, err := gfsnotify.Add("/home/john/temp/log", func(event *gfsnotify.Event) { + glog.Println("callback1") + }) + if err != nil { + panic(err) + } + c2, err := gfsnotify.Add("/home/john/temp/log", func(event *gfsnotify.Event) { + glog.Println("callback2") + }) + if err != nil { + panic(err) + } + // 3秒后移除c1的回调函数注册,仅剩c2 + gtime.SetTimeout(3*time.Second, func() { + gfsnotify.RemoveCallback(c1.Id) + glog.Println("remove callback c1") + }) + // 10秒后移除c2的回调函数注册,所有的回调都移除,不再有任何打印信息输出 + gtime.SetTimeout(10*time.Second, func() { + gfsnotify.RemoveCallback(c2.Id) + glog.Println("remove callback c2") + }) + + select {} + +} \ No newline at end of file diff --git a/geg/os/gfsnotify/gfsnotify_callback_folder.go b/geg/os/gfsnotify/gfsnotify_callback_folder.go new file mode 100644 index 000000000..3d4148c04 --- /dev/null +++ b/geg/os/gfsnotify/gfsnotify_callback_folder.go @@ -0,0 +1,28 @@ +package main + +import ( + "gitee.com/johng/gf/g/os/gfsnotify" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gtime" + "time" +) + +func main() { + callback, err := gfsnotify.Add("/home/john/temp", func(event *gfsnotify.Event) { + glog.Println("callback") + }) + if err != nil { + panic(err) + } + + // 在此期间创建文件、目录、修改文件、删除文件 + + // 20秒后移除回调函数注册,所有的回调都移除,不再有任何打印信息输出 + gtime.SetTimeout(20*time.Second, func() { + gfsnotify.RemoveCallback(callback.Id) + glog.Println("remove callback") + }) + + select {} + +} \ No newline at end of file diff --git a/geg/other/test.go b/geg/other/test.go index aa0d5b0f5..d5078187d 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -3,11 +3,12 @@ package main import ( "fmt" "gitee.com/johng/gf/g/os/gtime" - "strconv" + "math" ) func main() { - fmt.Println(strconv.FormatInt(gtime.Nanosecond(), 32)) + fmt.Println(gtime.NewFromStr("2018-10-24 00:00:00").Nanosecond()) + fmt.Println(math.MaxInt64) fmt.Println(gtime.Second()) fmt.Println(gtime.Nanosecond()) } \ No newline at end of file