This commit is contained in:
John 2018-05-18 08:39:40 +08:00
commit 6b256fdc41
57 changed files with 2203 additions and 132 deletions

5
TODO
View File

@ -8,8 +8,9 @@ ON THE WAY:
7. 增加热编译工具,提高开发环境的开发/测试效率媲美PHP开发效率
8. 增加可选择性的orm tag特性用以数据表记录与struct对象转换的键名属性映射
9. orm增加更多数据库支持
10. ghttp.Response增加输出内容后自动退出当前请求机制不需要用户手动return参考beego如何实现
11. 当二进制参数为nil时gjson.LoadContent并将gjson.Json对象ToMap时会报错
12. 改进控制器及执行对象注册,更友好地支持动态路由注册,例如:注册规则为 /channel/:name现有的控制器及执行对象注册很难友好支持这种动态形式
DONE:

7
g/g.go
View File

@ -32,6 +32,12 @@ type Map map[string]interface{}
// 常用list数据结构
type List []Map
// 阻塞等待HTTPServer执行完成(同一进程多HTTPServer情况下)
func Wait() {
ghttp.Wait()
}
// HTTPServer单例对象
func Server(name...interface{}) *ghttp.Server {
return ghttp.GetServer(name...)
@ -58,7 +64,6 @@ func Config() *gcfg.Config {
return gins.Config()
}
// 数据库操作对象,使用了连接池
func Database(name...string) *gdb.Db {
config := gins.Config()

View File

@ -7,12 +7,15 @@
package ghttp
import (
"os"
"sync"
"errors"
"strings"
"reflect"
"runtime"
"net/http"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gcache"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/container/gmap"
@ -21,46 +24,55 @@ import (
)
const (
gHTTP_METHODS = "GET,PUT,POST,DELETE,PATCH,HEAD,CONNECT,OPTIONS,TRACE"
gDEFAULT_SERVER = "default"
gDEFAULT_DOMAIN = "default"
gDEFAULT_METHOD = "ALL"
gDEFAULT_COOKIE_PATH = "/" // 默认path
gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年)
gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒)
gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称
gHTTP_METHODS = "GET,PUT,POST,DELETE,PATCH,HEAD,CONNECT,OPTIONS,TRACE"
gDEFAULT_SERVER = "default"
gDEFAULT_DOMAIN = "default"
gDEFAULT_METHOD = "ALL"
gDEFAULT_COOKIE_PATH = "/" // 默认path
gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年)
gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒)
gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称
)
// http server结构体
// ghttp.Server结构体
type Server struct {
hmmu sync.RWMutex // handler互斥锁
hhmu sync.RWMutex // hooks互斥锁
hsmu sync.RWMutex // status handler互斥锁
hmcmu sync.RWMutex // handlerCache互斥锁
hhcmu sync.RWMutex // hooksCache互斥锁
// 基本属性变量
name string // 服务名称,方便识别
config ServerConfig // 配置对象
status int8 // 当前服务器状态(0未启动1运行中)
servers []*gracefulServer // 底层http.Server列表
methodsMap map[string]bool // 所有支持的HTTP Method(初始化时自动填充)
servedCount *gtype.Int // 已经服务的请求数(4-8字节不考虑溢出情况)同时作为请求ID
closeQueue *gqueue.Queue // 请求结束的关闭队列(存放的是需要异步关闭处理的*Request对象)
signalQueue chan os.Signal // 终端命令行监听队列
// 服务注册相关
hmmu sync.RWMutex // handler互斥锁
hmcmu sync.RWMutex // handlerCache互斥锁
handlerMap HandlerMap // 所有注册的回调函数(静态匹配)
statusHandlerMap map[string]HandlerFunc // 不同状态码下的注册处理方法(例如404状态时的处理方法)
handlerTree map[string]interface{} // 所有注册的回调函数(动态匹配,树型+链表优先级匹配)
hooksTree map[string]interface{} // 所有注册的事件回调函数(动态匹配,树型+链表优先级匹配)
handlerCache *gcache.Cache // 服务注册路由内存缓存
// 事件回调注册
hhmu sync.RWMutex // hooks互斥锁
hhcmu sync.RWMutex // hooksCache互斥锁
hooksTree map[string]interface{} // 所有注册的事件回调函数(动态匹配,树型+链表优先级匹配)
hooksCache *gcache.Cache // 回调事件注册路由内存缓存
servedCount *gtype.Int // 已经服务的请求数(4-8字节不考虑溢出情况)
// 自定义状态码回调
hsmu sync.RWMutex // status handler互斥锁
statusHandlerMap map[string]HandlerFunc // 不同状态码下的注册处理方法(例如404状态时的处理方法)
// COOKIE
cookieMaxAge *gtype.Int // Cookie有效期
cookies *gmap.IntInterfaceMap // 当前服务器正在服务(请求正在执行)的Cookie(每个请求一个Cookie对象)
// SESSION
sessionMaxAge *gtype.Int // Session有效期
sessionIdName *gtype.String // SessionId名称
cookies *gmap.IntInterfaceMap // 当前服务器正在服务(请求正在执行)的Cookie(每个请求一个Cookie对象)
sessions *gcache.Cache // Session内存缓存
closeQueue *gqueue.Queue // 请求结束的关闭队列(存放的是需要异步关闭处理的*Request对象)
// 日志相关属性
logPath *gtype.String // 存放日志的目录路径
logHandler *gtype.Interface // 自定义日志处理回调方法
errorLogEnabled *gtype.Bool // 是否开启error log
accessLogEnabled *gtype.Bool // 是否开启access log
accessLogger *glog.Logger // access log日志对象
errorLogger *glog.Logger // error log日志对象
logHandler *gtype.Interface // 自定义的日志处理回调方法
}
// 域名、URI与回调函数的绑定记录表
@ -85,9 +97,41 @@ type HandlerItem struct {
// http注册函数
type HandlerFunc func(r *Request)
// 文件描述符map
type listenerFdMap map[string]string
// Server表用以存储和检索名称与Server对象之间的关联关系
var serverMapping = gmap.NewStringInterfaceMap()
// Web Server多进程管理器
var procManager = gproc.NewManager()
// Web Server开始执行事件通道由于同一个进程支持多Server因此该通道为非阻塞
var readyChan = make(chan struct{}, 100000)
// Web Server已完成服务事件通道当有事件时表示服务完成当前进程退出
var doneChan = make(chan struct{}, 100000)
// Web Server进程初始化
func init() {
go func() {
// 等待ready消息(Run方法调用)
<- readyChan
// 主进程只负责创建子进程
if !gproc.IsChild() {
sendProcessMsg(os.Getpid(), gMSG_START, nil)
}
// 开启进程消息监听处理
handleProcessMsgAndSignal()
// 服务执行完成,需要退出
doneChan <- struct{}{}
if !gproc.IsChild() {
glog.Printfln("%d: all servers shutdown", gproc.Pid())
}
}()
}
// 获取/创建一个默认配置的HTTP Server(默认监听端口是80)
// 单例模式请保证name的唯一性
func GetServer(name...interface{}) (*Server) {
@ -100,6 +144,7 @@ func GetServer(name...interface{}) (*Server) {
}
s := &Server {
name : sname,
servers : make([]*gracefulServer, 0),
methodsMap : make(map[string]bool),
handlerMap : make(HandlerMap),
statusHandlerMap : make(map[string]HandlerFunc),
@ -114,6 +159,7 @@ func GetServer(name...interface{}) (*Server) {
sessionIdName : gtype.NewString(gDEFAULT_SESSION_ID_NAME),
servedCount : gtype.NewInt(),
closeQueue : gqueue.New(),
signalQueue : make(chan os.Signal),
logPath : gtype.NewString(),
accessLogEnabled : gtype.NewBool(),
errorLogEnabled : gtype.NewBool(true),
@ -134,74 +180,167 @@ func GetServer(name...interface{}) (*Server) {
return s
}
// 阻塞执行监听
func (s *Server) Run() error {
// 作为守护协程异步执行(当同一进程中存在多个Web Server时需要采用这种方式执行)
// 需要结合Wait方式一起使用
func (s *Server) Start() error {
// 主进程,不执行任何业务,只负责进程管理
if !gproc.IsChild() {
return nil
}
if s.status == 1 {
return errors.New("server is already running")
}
// 底层http server配置
if s.config.Handler == nil {
s.config.Handler = http.HandlerFunc(s.defaultHttpHandle)
}
// 开启异步处理队列处理循环
// 开启异步关闭队列处理循环
s.startCloseQueueLoop()
return nil
}
// 开始执行底层Web Server创建端口监听
var wg sync.WaitGroup
// 阻塞执行监听
func (s *Server) Run() error {
if err := s.Start(); err != nil {
return err
}
// Web Server准备就绪待执行
readyChan <- struct{}{}
// 阻塞等待服务执行完成
<- doneChan
return nil
}
// 阻塞等待所有Web Server停止常用于多Web Server场景以及需要将Web Server异步运行的场景
// 这是一个与进程相关的方法
func Wait() {
readyChan <- struct{}{}
<- doneChan
}
// 开启底层Web Server执行
func (s *Server) startServer(fdMap listenerFdMap) {
var httpsEnabled bool
if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 {
// ================
// HTTPS
// ================
if len(s.config.HTTPSAddr) == 0 {
if len(s.config.Addr) > 0 {
s.config.HTTPSAddr = s.config.Addr
s.config.Addr = ""
} else {
s.config.HTTPSAddr = gDEFAULT_HTTPS_ADDR
}
}
array := strings.Split(s.config.HTTPSAddr, ",")
httpsEnabled = len(s.config.HTTPSAddr) > 0
var array []string
if v, ok := fdMap["https"]; ok && len(v) > 0 {
array = strings.Split(v, ",")
} else {
array = strings.Split(s.config.HTTPSAddr, ",")
}
for _, v := range array {
wg.Add(1)
go func(addr string) {
if err := s.newServer(addr).ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil {
glog.Error(err)
wg.Done()
if len(v) == 0 {
continue
}
fd := 0
addr := v
array := strings.Split(v, "#")
if len(array) > 1 {
addr = array[0]
// windows系统不支持文件描述符传递socket通信平滑交接因此只能完整重启
if runtime.GOOS != "windows" {
fd = gconv.Int(array[1])
}
}(v)
}
if fd > 0 {
s.servers = append(s.servers, s.newGracefulServer(addr, fd))
} else {
s.servers = append(s.servers, s.newGracefulServer(addr))
}
s.servers[len(s.servers) - 1].isHttps = true
}
}
// ================
// HTTP
if s.servedCount.Val() == 0 && len(s.config.Addr) == 0 {
// ================
// 当HTTPS服务未启用时默认HTTP地址才会生效
if !httpsEnabled && len(s.config.Addr) == 0 {
s.config.Addr = gDEFAULT_HTTP_ADDR
}
array := strings.Split(s.config.Addr, ",")
var array []string
if v, ok := fdMap["http"]; ok && len(v) > 0 {
array = strings.Split(v, ",")
} else {
array = strings.Split(s.config.Addr, ",")
}
for _, v := range array {
wg.Add(1)
go func(addr string) {
if err := s.newServer(addr).ListenAndServe(); err != nil {
if len(v) == 0 {
continue
}
fd := 0
addr := v
array := strings.Split(v, "#")
if len(array) > 1 {
addr = array[0]
// windows系统不支持文件描述符传递socket通信平滑交接因此只能完整重启
if runtime.GOOS != "windows" {
fd = gconv.Int(array[1])
}
}
if fd > 0 {
s.servers = append(s.servers, s.newGracefulServer(addr, fd))
} else {
s.servers = append(s.servers, s.newGracefulServer(addr))
}
}
// 开始执行异步监听
for _, v := range s.servers {
go func(server *gracefulServer) {
var err error
if server.isHttps {
err = server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath)
} else {
err = server.ListenAndServe()
}
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
if err != nil && !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
glog.Error(err)
wg.Done()
}
}(v)
}
s.status = 1
// 阻塞执行直到所有Web Server退出
wg.Wait()
return nil
}
// 生成一个底层的Web Server对象
func (s *Server) newServer(addr string) *http.Server {
return &http.Server {
Addr : addr,
Handler : s.config.Handler,
ReadTimeout : s.config.ReadTimeout,
WriteTimeout : s.config.WriteTimeout,
IdleTimeout : s.config.IdleTimeout,
MaxHeaderBytes : s.config.MaxHeaderBytes,
// 获取当前监听的文件描述符信息构造成map返回
func (s *Server) getListenerFdMap() map[string]string {
m := map[string]string {
"https" : "",
"http" : "",
}
// s.servers是从HTTPS到HTTP优先级遍历解析的时候也应当按照这个顺序读取fd
for _, v := range s.servers {
str := v.addr + "#" + gconv.String(v.Fd()) + ","
if v.isHttps {
m["https"] += str
} else {
m["http"] += str
}
}
// 去掉末尾的","号
if len(m["https"]) > 0 {
m["https"] = m["https"][0 : len(m["https"]) - 1]
}
if len(m["http"]) > 0 {
m["http"] = m["http"][0 : len(m["http"]) - 1]
}
return m
}
// 清空当前的handlerCache

View File

@ -0,0 +1,173 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// pprof封装.
package ghttp
import (
"strings"
"gitee.com/johng/gf/g/os/gview"
"runtime"
"gitee.com/johng/gf/g/os/gproc"
"sync"
"gitee.com/johng/gf/g/os/gtime"
"errors"
"fmt"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/glog"
)
const (
gADMIN_ACTION_INTERVAL_LIMIT = 3000 // (毫秒)服务开启后允许执行管理操作的间隔限制
)
// 用于服务管理的对象
type utilAdmin struct {}
// (进程级别)用于Web Server管理操作的互斥锁保证管理操作的原子性
var serverActionLocker sync.Mutex
// (进程级别)用于记录上一次操作的时间(毫秒)
var serverActionLastTime = gtype.NewInt64(gtime.Millisecond())
// 当前服务进程所处的互斥管理操作状态
// 1 : reload
// 2 : restart
// 4 : shutdown
var serverProcessStatus = gtype.NewInt()
// 服务管理首页
func (p *utilAdmin) Index(r *Request) {
data := map[string]interface{}{
"uri" : strings.TrimRight(r.URL.Path, "/"),
}
buffer, _ := gview.ParseContent(`
<html>
<head>
<title>gf ghttp admin</title>
</head>
<body>
<p><a href="{{$.uri}}/reload">reload</a></p>
<p><a href="{{$.uri}}/restart">restart</a></p>
<p><a href="{{$.uri}}/shutdown">shutdown</a></p>
</body>
</html>
`, data)
r.Response.Write(buffer)
}
// 服务热重启
func (p *utilAdmin) Reload(r *Request) {
if runtime.GOOS == "windows" {
p.Restart(r)
} else {
if err := r.Server.Reload(); err == nil {
r.Response.Write("server reloaded")
} else {
r.Response.Write(err.Error())
}
}
}
// 服务完整重启
func (p *utilAdmin) Restart(r *Request) {
if err := r.Server.Restart(); err == nil {
r.Response.Write("server restarted")
} else {
r.Response.Write(err.Error())
}
}
// 服务关闭
func (p *utilAdmin) Shutdown(r *Request) {
r.Server.Shutdown()
if err := r.Server.Shutdown(); err == nil {
r.Response.Write("server shutdown")
} else {
r.Response.Write(err.Error())
}
}
// 开启服务管理支持
func (s *Server) EnableAdmin(pattern...string) {
p := "/debug/admin"
if len(pattern) > 0 {
p = pattern[0]
}
s.BindObject(p, &utilAdmin{})
}
// 平滑重启Web Server
func (s *Server) Reload() error {
serverActionLocker.Lock()
defer serverActionLocker.Unlock()
if err := s.checkActionStatus(); err != nil {
return err
}
if err := s.checkActionFrequence(); err != nil {
return err
}
glog.Printfln("%d: server reloading", gproc.Pid())
sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil)
return nil
}
// 完整重启Web Server
func (s *Server) Restart() error {
serverActionLocker.Lock()
defer serverActionLocker.Unlock()
if err := s.checkActionStatus(); err != nil {
return err
}
if err := s.checkActionFrequence(); err != nil {
return err
}
glog.Printfln("%d: server restarting", gproc.Pid())
sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
return nil
}
// 关闭Web Server
func (s *Server) Shutdown() error {
serverActionLocker.Lock()
defer serverActionLocker.Unlock()
if err := s.checkActionStatus(); err != nil {
return err
}
if err := s.checkActionFrequence(); err != nil {
return err
}
glog.Printfln("%d: server shutting down", gproc.Pid())
sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil)
return nil
}
// 检测当前操作的频繁度
func (s *Server) checkActionFrequence() error {
interval := gtime.Millisecond() - serverActionLastTime.Val()
if interval < gADMIN_ACTION_INTERVAL_LIMIT {
return errors.New(fmt.Sprintf("too frequent action, please retry in %d ms", gADMIN_ACTION_INTERVAL_LIMIT - interval))
}
serverActionLastTime.Set(gtime.Millisecond())
return nil
}
// 检查当前服务进程的状态
func (s *Server) checkActionStatus() error {
status := serverProcessStatus.Val()
if status > 0 {
switch status {
case 1:
return errors.New("server is reloading")
case 2:
return errors.New("server is restarting")
case 4:
return errors.New("server is shutting down")
}
}
return nil
}

View File

@ -0,0 +1,166 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信
package ghttp
import (
"os"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/encoding/gjson"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/encoding/gbinary"
"gitee.com/johng/gf/g/os/gtime"
)
const (
gMSG_START = 1
gMSG_RELOAD = 2
gMSG_RESTART = 3
gMSG_SHUTDOWN = 4
gMSG_CLOSE = 5
gMSG_NEW_FORK = 6
gMSG_HEARTBEAT = 7
gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔
gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程
)
// 进程信号量监听消息队列
var procSignalChan = make(chan os.Signal)
// 上一次进程间心跳的时间戳
var lastUpdateTime = gtype.NewInt(int(gtime.Millisecond()))
// (主子进程)在第一次创建子进程成功之后才会开始心跳检测,同理对应超时时间才会生效
var checkHeartbeat = gtype.NewBool()
// 处理进程信号量监控以及进程间消息通信
func handleProcessMsgAndSignal() {
go handleProcessSignal()
if gproc.IsChild() {
go handleChildProcessHeartbeat()
} else {
go handleMainProcessHeartbeat()
}
handleProcessMsg()
}
// 处理进程间消息
// 数据格式: 操作(8bit) | 参数(变长)
func handleProcessMsg() {
for {
if msg := gproc.Receive(); msg != nil {
// 记录消息日志,用于调试
//content := gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data)
//glog.Print(content)
//gfile.PutContentsAppend("/tmp/gproc-log", content)
act := gbinary.DecodeToUint(msg.Data[0 : 1])
data := msg.Data[1 : ]
if msg.Pid != gproc.Pid() {
updateProcessUpdateTime()
}
if gproc.IsChild() {
// ===============
// 子进程
// ===============
switch act {
case gMSG_START: onCommChildStart(msg.Pid, data)
case gMSG_RELOAD: onCommChildReload(msg.Pid, data)
case gMSG_RESTART: onCommChildRestart(msg.Pid, data)
case gMSG_CLOSE: onCommChildClose(msg.Pid, data)
case gMSG_HEARTBEAT: onCommChildHeartbeat(msg.Pid, data)
case gMSG_SHUTDOWN: onCommChildShutdown(msg.Pid, data)
}
} else {
// ===============
// 父进程
// ===============
// 任何进程消息都会自动更新最后通信时间记录
if msg.Pid != gproc.Pid() {
updateProcessCommTime(msg.Pid)
}
switch act {
case gMSG_START: onCommMainStart(msg.Pid, data)
case gMSG_RELOAD: onCommMainReload(msg.Pid, data)
case gMSG_RESTART: onCommMainRestart(msg.Pid, data)
case gMSG_NEW_FORK: onCommMainNewFork(msg.Pid, data)
case gMSG_HEARTBEAT: onCommMainHeartbeat(msg.Pid, data)
case gMSG_SHUTDOWN:
onCommMainShutdown(msg.Pid, data)
return
}
}
}
}
}
// 向进程发送操作消息
func sendProcessMsg(pid int, act int, data []byte) error {
return gproc.Send(pid, formatMsgBuffer(act, data))
}
// 生成一条满足Web Server进程通信协议的消息
func formatMsgBuffer(act int, data []byte) []byte {
return append(gbinary.EncodeUint8(uint8(act)), data...)
}
// 获取所有Web Server的文件描述符map
func getServerFdMap() map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
serverMapping.RLockFunc(func(m map[string]interface{}) {
for k, v := range m {
sfm[k] = v.(*Server).getListenerFdMap()
}
})
return sfm
}
// 二进制转换为FdMap
func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
if len(buffer) > 0 {
j, _ := gjson.LoadContent(buffer, "json")
for k, _ := range j.ToMap() {
m := make(map[string]string)
for k, v := range j.GetMap(k) {
m[k] = gconv.String(v)
}
sfm[k] = m
}
}
return sfm
}
// 关优雅闭进程所有端口的Web Server服务
// 注意只是关闭Web Server服务并不是退出进程
func shutdownWebServers() {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
for _, s := range v.(*Server).servers {
s.shutdown()
}
}
})
}
// 强制关闭进程所有端口的Web Server服务
// 注意只是关闭Web Server服务并不是退出进程
func closeWebServers() {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
for _, s := range v.(*Server).servers {
s.close()
}
}
})
}
// 更新上一次进程间通信的时间
func updateProcessUpdateTime() {
lastUpdateTime.Set(int(gtime.Millisecond()))
}

View File

@ -0,0 +1,109 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信 - 子进程
package ghttp
import (
"os"
"fmt"
"time"
"strings"
"runtime"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/encoding/gjson"
)
const (
gPROC_CHILD_MAX_IDLE_TIME = 10000 // 子进程闲置时间(未开启心跳机制的时间)
)
// 心跳处理(方法为空逻辑放到公共通信switch中进行处理)
func onCommChildHeartbeat(pid int, data []byte) {
}
// 平滑重启子进程收到重启消息那么将自身的ServerFdMap信息收集后发送给主进程由主进程进行统一调度
func onCommChildReload(pid int, data []byte) {
var buffer []byte = nil
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
// windows系统无法进行文件描述符操作只能重启进程
if runtime.GOOS == "windows" {
// windows下使用shutdown会造成协程阻塞这里直接使用close强制关闭
closeWebServers()
} else {
// 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口
sfm := getServerFdMap()
// 将sfm中的fd按照子进程创建时的文件描述符顺序进行整理以便子进程获取到正确的fd
for name, m := range sfm {
for fdk, fdv := range m {
if len(fdv) > 0 {
s := ""
for _, item := range strings.Split(fdv, ",") {
array := strings.Split(item, "#")
fd := uintptr(gconv.Uint(array[1]))
if fd > 0 {
s += fmt.Sprintf("%s#%d,", array[0], 3 + len(p.ExtraFiles))
p.ExtraFiles = append(p.ExtraFiles, os.NewFile(fd, ""))
} else {
s += fmt.Sprintf("%s#%d,", array[0], 0)
}
}
sfm[name][fdk] = strings.TrimRight(s, ",")
}
}
}
buffer, _ = gjson.Encode(sfm)
}
p.PPid = gproc.PPid()
if newPid, err := p.Start(); err == nil {
sendProcessMsg(newPid, gMSG_START, buffer)
} else {
glog.Errorfln("%d: fork process failed, error:%s, %s", gproc.Pid(), err.Error(), string(buffer))
}
}
// 完整重启
func onCommChildRestart(pid int, data []byte) {
sendProcessMsg(gproc.PPid(), gMSG_RESTART, nil)
}
// 优雅关闭服务链接并退出
func onCommChildShutdown(pid int, data []byte) {
if runtime.GOOS != "windows" {
shutdownWebServers()
}
os.Exit(0)
}
// 强制性关闭服务链接并退出
func onCommChildClose(pid int, data []byte) {
closeWebServers()
os.Exit(0)
}
// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态
func handleChildProcessHeartbeat() {
for {
time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond)
sendProcessMsg(gproc.PPid(), gMSG_HEARTBEAT, nil)
// 超过时间没有接收到主进程心跳,自动关闭退出
if checkHeartbeat.Val() && (int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT) {
// 子进程有时会无法退出(僵尸?)这里直接使用exit而不是return
glog.Printfln("%d: %d - %d > %d", gproc.Pid(), int(gtime.Millisecond()), lastUpdateTime.Val(), gPROC_HEARTBEAT_TIMEOUT)
glog.Printfln("%d: heartbeat timeout[%dms], exit", gproc.Pid(), gPROC_HEARTBEAT_TIMEOUT)
os.Exit(0)
}
// 未开启心跳检测的闲置超过一定时间则主动关闭
if !checkHeartbeat.Val() && gproc.Uptime() > gPROC_CHILD_MAX_IDLE_TIME {
glog.Printfln("%d: idle timeout[%dms], exit", gproc.Pid(), gPROC_CHILD_MAX_IDLE_TIME)
os.Exit(0)
}
}
}

View File

@ -0,0 +1,43 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信 - 子进程
// +build !windows
package ghttp
import (
"os"
"gitee.com/johng/gf/g/os/gproc"
)
// 开启所有Web Server(根据消息启动)
func onCommChildStart(pid int, data []byte) {
if len(data) > 0 {
sfm := bufferToServerFdMap(data)
for k, v := range sfm {
GetServer(k).startServer(v)
}
} else {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
}
// 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制
sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil)
// 如果创建自己的父进程非gproc父进程那么表示该进程为重启创建的进程创建成功之后需要通知父进程自行销毁
if gproc.PPidOS() != gproc.PPid() {
//如果子进程已经继承了父进程的socket文件描述符那么父进程没有存在的必要直接kill掉
if p, err := os.FindProcess(gproc.PPidOS()); err == nil {
p.Kill()
}
}
// 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间
updateProcessUpdateTime()
checkHeartbeat.Set(true)
}

View File

@ -0,0 +1,26 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package ghttp
import (
"gitee.com/johng/gf/g/os/gproc"
)
// 开启所有Web Server(根据消息启动)
func onCommChildStart(pid int, data []byte) {
// 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制
sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil)
// 开启Web Server服务
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
// 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间
updateProcessUpdateTime()
checkHeartbeat.Set(true)
}

View File

@ -0,0 +1,119 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信 - 主进程.
// 管理子进程按照规则听话玩,不然有一百种方法让子进程在本地混不下去.
package ghttp
import (
"os"
"time"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/container/gmap"
)
// (主进程)主进程与子进程上一次活跃时间映射map
var procUpdateTimeMap = gmap.NewIntIntMap()
// 开启服务
func onCommMainStart(pid int, data []byte) {
fork := forkNewProcess()
if fork == nil {
os.Exit(1)
}
updateProcessCommTime(fork.Pid())
// 子进程创建成功之后再发送执行命令
sendProcessMsg(fork.Pid(), gMSG_START, nil)
}
// 心跳处理(方法为空逻辑放到公共通信switch中进行处理)
func onCommMainHeartbeat(pid int, data []byte) {
}
// 平滑重启服务
func onCommMainReload(pid int, data []byte) {
procManager.Send(formatMsgBuffer(gMSG_RELOAD, nil))
}
// 完整重启服务
func onCommMainRestart(pid int, data []byte) {
// 如果是父进程自身发送的重启指令,那么通知所有子进程重启
if pid == gproc.Pid() {
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
return
}
// 首先创建子进程,暂时不开始服务,否则会有端口冲突
fork := forkNewProcess()
if fork == nil {
os.Exit(1)
}
// 然后通知旧的子进程自动关闭并退出(不需要新建的子进程来处理)
sendProcessMsg(pid, gMSG_CLOSE, nil)
if p, err := os.FindProcess(pid); err == nil && p != nil {
p.Kill()
p.Wait()
}
// 通知新的子进程执行服务监听
sendProcessMsg(fork.Pid(), gMSG_START, nil)
}
// 新建子进程通知
func onCommMainNewFork(pid int, data []byte) {
procManager.AddProcess(pid)
checkHeartbeat.Set(true)
}
// 关闭服务,通知所有子进程退出(Kill强制性退出)
func onCommMainShutdown(pid int, data []byte) {
procManager.Send(formatMsgBuffer(gMSG_CLOSE, nil))
procManager.KillAll()
procManager.WaitAll()
}
// 更新指定进程的通信时间记录
func updateProcessCommTime(pid int) {
procUpdateTimeMap.Set(pid, int(gtime.Millisecond()))
}
// 创建一个子进程,但是暂时不执行服务监听
func forkNewProcess() *gproc.Process {
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
if _, err := p.Start(); err != nil {
glog.Errorfln("%d: fork new process error:%s", gproc.Pid(), err.Error())
return nil
}
return p
}
// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态
func handleMainProcessHeartbeat() {
for {
time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond)
procManager.Send(formatMsgBuffer(gMSG_HEARTBEAT, nil))
// 清理过期进程
if checkHeartbeat.Val() {
for _, pid := range procManager.Pids() {
updatetime := procUpdateTimeMap.Get(pid)
if updatetime > 0 && int(gtime.Millisecond()) - updatetime > gPROC_HEARTBEAT_TIMEOUT {
//fmt.Println("remove pid", pid, int(gtime.Millisecond()), updatetime)
// 这里需要手动从进程管理器中去掉该进程
procManager.RemoveProcess(pid)
sendProcessMsg(pid, gMSG_CLOSE, nil)
}
}
// (双保险)如果所有子进程都退出,并且主进程未活动达到超时时间,那么主进程也没存在的必要
if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{
glog.Printfln("%d: all children died, exit", gproc.Pid())
os.Exit(0)
}
}
}
}

View File

@ -0,0 +1,53 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// +build !windows
package ghttp
import (
"os"
"syscall"
"os/signal"
"gitee.com/johng/gf/g/os/gproc"
)
// 信号量处理
func handleProcessSignal() {
var sig os.Signal
signal.Notify(
procSignalChan,
syscall.SIGINT,
syscall.SIGQUIT,
syscall.SIGKILL,
syscall.SIGHUP,
syscall.SIGTERM,
syscall.SIGUSR1,
syscall.SIGUSR2,
)
for {
sig = <- procSignalChan
switch sig {
// 进程终止,停止所有子进程运行
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM:
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
if gproc.IsChild() {
sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil)
}
return
// 用户信号,热重启服务
case syscall.SIGUSR1:
sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil)
// 用户信号,完整重启服务
case syscall.SIGUSR2:
sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
default:
}
}
}

View File

@ -0,0 +1,12 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package ghttp
// windows不处理信号量
func handleProcessSignal() {
}

View File

@ -0,0 +1,172 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package ghttp
import (
"os"
"fmt"
"net"
"context"
"net/http"
"crypto/tls"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
)
// 优雅的Web Server对象封装
type gracefulServer struct {
fd uintptr
addr string
httpServer *http.Server
rawListener net.Listener // 原始listener
listener net.Listener // 接口化封装的listener
isHttps bool
shutdownChan chan bool
}
// 创建一个优雅的Http Server
func (s *Server) newGracefulServer(addr string, fd...int) *gracefulServer {
gs := &gracefulServer {
addr : addr,
httpServer : s.newHttpServer(addr),
shutdownChan : make(chan bool),
}
// 是否有继承的文件描述符
if len(fd) > 0 && fd[0] > 0 {
gs.fd = uintptr(fd[0])
}
return gs
}
// 生成一个底层的Web Server对象
func (s *Server) newHttpServer(addr string) *http.Server {
return &http.Server {
Addr : addr,
Handler : s.config.Handler,
ReadTimeout : s.config.ReadTimeout,
WriteTimeout : s.config.WriteTimeout,
IdleTimeout : s.config.IdleTimeout,
MaxHeaderBytes : s.config.MaxHeaderBytes,
}
}
// 执行HTTP监听
func (s *gracefulServer) ListenAndServe() error {
addr := s.httpServer.Addr
ln, err := s.getNetListener(addr)
if err != nil {
return err
}
s.listener = ln
s.rawListener = ln
return s.doServe()
}
// 获得文件描述符
func (s *gracefulServer) Fd() uintptr {
if s.rawListener != nil {
file, err := s.rawListener.(*net.TCPListener).File()
if err == nil {
return file.Fd()
}
}
return 0
}
// 设置自定义fd
func (s *gracefulServer) setFd(fd int) {
s.fd = uintptr(fd)
}
// 执行HTTPS监听
func (s *gracefulServer) ListenAndServeTLS(certFile, keyFile string) error {
addr := s.httpServer.Addr
config := &tls.Config{}
if s.httpServer.TLSConfig != nil {
*config = *s.httpServer.TLSConfig
}
if config.NextProtos == nil {
config.NextProtos = []string{"http/1.1"}
}
var err error
config.Certificates = make([]tls.Certificate, 1)
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return err
}
ln, err := s.getNetListener(addr)
if err != nil {
return err
}
s.listener = tls.NewListener(ln, config)
s.rawListener = ln
return s.doServe()
}
// 获取服务协议字符串
func (s *gracefulServer) getProto() string {
proto := "http"
if s.isHttps {
proto = "https"
}
return proto
}
// 开始执行Web Server服务处理
func (s *gracefulServer) doServe() error {
action := "started"
if s.fd != 0 {
action = "reloaded"
}
glog.Printfln("%d: %s server %s listening on [%s]", gproc.Pid(), s.getProto(), action, s.addr)
err := s.httpServer.Serve(s.listener)
<-s.shutdownChan
return err
}
// 自定义的net.Listener
func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) {
var ln net.Listener
var err error
if s.fd > 0 {
f := os.NewFile(s.fd, "")
ln, err = net.FileListener(f)
if err != nil {
err = fmt.Errorf("%d: net.FileListener error: %v", gproc.Pid(), err)
return nil, err
}
} else {
ln, err = net.Listen("tcp", addr)
if err != nil {
err = fmt.Errorf("%d: net.Listen error: %v", gproc.Pid(), err)
return nil, err
}
}
return ln, nil
}
// 执行请求优雅关闭
func (s *gracefulServer) shutdown() {
if err := s.httpServer.Shutdown(context.Background()); err != nil {
glog.Errorfln("%d: %s server [%s] shutdown error: %v", gproc.Pid(), s.getProto(), s.addr, err)
} else {
//glog.Printfln("%d: %s server [%s] shutdown smoothly", gproc.Pid(), s.getProto(), s.addr)
s.shutdownChan <- true
}
}
// 执行请求强制关闭
func (s *gracefulServer) close() {
if err := s.httpServer.Close(); err != nil {
glog.Errorfln("%d: %s server [%s] closed error: %v", gproc.Pid(), s.getProto(), s.addr, err)
} else {
//glog.Printfln("%d: %s server [%s] closed smoothly", gproc.Pid(), s.getProto(), s.addr)
s.shutdownChan <- true
}
}

View File

@ -16,8 +16,8 @@ import (
"net/url"
"net/http"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/encoding/ghtml"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/encoding/ghtml"
)
// 默认HTTP Server处理入口http包底层默认使用了gorutine异步处理请求所以这里不再异步执行
@ -169,3 +169,20 @@ func (s *Server)listDir(r *Request, f http.File) {
}
r.Response.Write("</pre>\n")
}
// 开启异步队列处理循环该异步线程与Server同生命周期
func (s *Server) startCloseQueueLoop() {
go func() {
for {
if v := s.closeQueue.PopFront(); v != nil {
r := v.(*Request)
s.callHookHandler(r, "BeforeClose")
// 关闭当前会话的Cookie
r.Cookie.Close()
// 更新Session会话超时时间
r.Session.UpdateExpire()
s.callHookHandler(r, "AfterClose")
}
}
}()
}

View File

@ -11,13 +11,13 @@ import (
"strings"
runpprof "runtime/pprof"
netpprof "net/http/pprof"
"gitee.com/johng/gf/g/frame/gins"
"gitee.com/johng/gf/g/os/gview"
)
// 用于pprof的对象
type utilpprof struct {}
type utilPprof struct {}
func (p *utilpprof) Index(r *Request) {
func (p *utilPprof) Index(r *Request) {
profiles := runpprof.Profiles()
action := r.Get("action")
data := map[string]interface{}{
@ -25,8 +25,7 @@ func (p *utilpprof) Index(r *Request) {
"profiles" : profiles,
}
if len(action) == 0 {
view := gins.View()
buffer, _ := view.ParseContent(`
buffer, _ := gview.ParseContent(`
<html>
<head>
<title>gf ghttp pprof</title>
@ -52,19 +51,19 @@ func (p *utilpprof) Index(r *Request) {
}
}
func (p *utilpprof) Cmdline(r *Request) {
func (p *utilPprof) Cmdline(r *Request) {
netpprof.Cmdline(r.Response.Writer, &r.Request)
}
func (p *utilpprof) Profile(r *Request) {
func (p *utilPprof) Profile(r *Request) {
netpprof.Profile(r.Response.Writer, &r.Request)
}
func (p *utilpprof) Symbol(r *Request) {
func (p *utilPprof) Symbol(r *Request) {
netpprof.Symbol(r.Response.Writer, &r.Request)
}
func (p *utilpprof) Trace(r *Request) {
func (p *utilPprof) Trace(r *Request) {
netpprof.Trace(r.Response.Writer, &r.Request)
}
@ -74,7 +73,7 @@ func (s *Server) EnablePprof(pattern...string) {
if len(pattern) > 0 {
p = pattern[0]
}
up := &utilpprof{}
up := &utilPprof{}
_, _, uri, _ := s.parsePattern(p)
uri = strings.TrimRight(uri, "/")
s.BindHandler(uri + "/*action", up.Index)

View File

@ -34,23 +34,29 @@ func (s *Server)bindHandlerByMap(m HandlerMap) error {
return nil
}
// 将方法名称按照设定的规则转换为URI并附加到指定的URI后面
func (s *Server)appendMethodNameToUriWithPattern(pattern string, name string) string {
// 将方法名称按照设定的规则合并到pattern中.
// 规则1pattern中的URI包含{method}关键字,则替换该关键字为方法名称
// 规则2如果不满足规则1那么直接将防发明附加到pattern中的URI后面
func (s *Server)mergeMethodNameToPattern(pattern string, name string) string {
// 方法名中间存在大写字母转换为小写URI地址以“-”号链接每个单词
method := ""
for i := 0; i < len(name); i++ {
if i > 0 && gutil.IsLetterUpper(name[i]) {
method += "-"
}
method += strings.ToLower(string(name[i]))
}
if strings.Index(pattern, "{method}") != -1 {
return strings.Replace(pattern, "{method}", method, -1)
}
// 检测域名后缀
array := strings.Split(pattern, "@")
// 分离URI(其实可能包含HTTP Method)
uri := array[0]
uri = strings.TrimRight(uri, "/") + "/"
// 方法名中间存在大写字母转换为小写URI地址以“-”号链接每个单词
for i := 0; i < len(name); i++ {
if i > 0 && gutil.IsLetterUpper(name[i]) {
uri += "-"
}
uri += strings.ToLower(string(name[i]))
}
// 加上指定域名后缀
if len(array) > 1 {
uri += "@" + array[1]
return uri + "@" + array[1]
}
return uri
}
@ -72,7 +78,7 @@ func (s *Server)BindObject(pattern string, obj interface{}) error {
t := v.Type()
for i := 0; i < v.NumMethod(); i++ {
name := t.Method(i).Name
key := s.appendMethodNameToUriWithPattern(pattern, name)
key := s.mergeMethodNameToPattern(pattern, name)
m[key] = &HandlerItem {
ctype : nil,
fname : "",
@ -100,7 +106,7 @@ func (s *Server)BindObjectMethod(pattern string, obj interface{}, methods string
if !fval.IsValid() {
return errors.New("invalid method name:" + name)
}
key := s.appendMethodNameToUriWithPattern(pattern, name)
key := s.mergeMethodNameToPattern(pattern, name)
m[key] = &HandlerItem{
ctype : nil,
fname : "",
@ -152,7 +158,7 @@ func (s *Server)BindController(pattern string, c Controller) error {
if name == "Init" || name == "Shut" || name == "Exit" {
continue
}
key := s.appendMethodNameToUriWithPattern(pattern, name)
key := s.mergeMethodNameToPattern(pattern, name)
m[key] = &HandlerItem {
ctype : v.Elem().Type(),
fname : name,
@ -181,7 +187,7 @@ func (s *Server)BindControllerMethod(pattern string, c Controller, methods strin
if !cval.MethodByName(name).IsValid() {
return errors.New("invalid method name:" + name)
}
key := s.appendMethodNameToUriWithPattern(pattern, name)
key := s.mergeMethodNameToPattern(pattern, name)
m[key] = &HandlerItem {
ctype : ctype,
fname : name,

View File

@ -1,25 +0,0 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 异步工作协程.
package ghttp
// 开启异步队列处理循环该异步线程与Server同生命周期
func (s *Server) startCloseQueueLoop() {
go func() {
for {
if v := s.closeQueue.PopFront(); v != nil {
r := v.(*Request)
s.callHookHandler(r, "BeforeClose")
// 关闭当前会话的Cookie
r.Cookie.Close()
// 更新Session会话超时时间
r.Session.UpdateExpire()
s.callHookHandler(r, "AfterClose")
}
}
}()
}

92
g/net/gtcp/tcp_func.go Normal file
View File

@ -0,0 +1,92 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gtcp
import (
"io"
"net"
"time"
)
const (
gDEFAULT_RETRY_INTERVAL = 100 // 默认重试时间间隔
)
type Retry struct {
Count int // 重试次数
Interval int // 重试间隔(毫秒)
}
// 常见的二进制数据校验方式,生成校验结果
func Checksum(buffer []byte) uint32 {
var checksum uint32
for _, b := range buffer {
checksum += uint32(b)
}
return checksum
}
// 获取数据
func Receive(conn net.Conn, retry...Retry) []byte {
size := 1024
data := make([]byte, 0)
for {
buffer := make([]byte, size)
length, err := conn.Read(buffer)
if length < 1 && err != nil {
if err == io.EOF || len(retry) == 0 || retry[0].Count == 0 {
break
}
if len(retry) > 0 {
retry[0].Count--
if retry[0].Interval == 0 {
retry[0].Interval = gDEFAULT_RETRY_INTERVAL
}
time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond)
}
} else {
data = append(data, buffer[0:length]...)
if err == io.EOF {
break
}
}
}
return data
}
// 带超时时间的数据获取
func ReceiveWithTimeout(conn net.Conn, timeout time.Duration, retry...Retry) []byte {
conn.SetReadDeadline(time.Now().Add(timeout))
return Receive(conn, retry...)
}
// 发送数据
func Send(conn net.Conn, data []byte, retry...Retry) error {
for {
_, err := conn.Write(data)
if err != nil {
if len(retry) == 0 || retry[0].Count == 0 {
return err
}
if len(retry) > 0 {
retry[0].Count--
if retry[0].Interval == 0 {
retry[0].Interval = gDEFAULT_RETRY_INTERVAL
}
time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond)
}
} else {
return nil
}
}
}
// 带超时时间的数据发送
func SendWithTimeout(conn net.Conn, data []byte, timeout time.Duration, retry...Retry) error {
conn.SetWriteDeadline(time.Now().Add(timeout))
return Send(conn, data, retry...)
}

View File

@ -46,6 +46,10 @@ func Mkdir(path string) error {
// 给定文件的绝对路径创建文件
func Create(path string) error {
dir := Dir(path)
if !Exists(dir) {
Mkdir(dir)
}
f, err := os.Create(path)
if err != nil {
return err
@ -107,7 +111,7 @@ func Info(path string) *os.FileInfo {
return &info
}
// 修改时间
// 修改时间(秒)
func MTime(path string) int64 {
f, e := os.Stat(path)
if e != nil {
@ -116,6 +120,17 @@ func MTime(path string) int64 {
return f.ModTime().Unix()
}
// 修改时间(毫秒)
func MTimeMillisecond(path string) int64 {
f, e := os.Stat(path)
if e != nil {
return 0
}
seconds := f.ModTime().Unix()
nanoSeconds := f.ModTime().Nanosecond()
return seconds*1000 + int64(nanoSeconds/1000000)
}
// 文件大小(bytes)
func Size(path string) int64 {
f, e := os.Stat(path)
@ -320,6 +335,11 @@ func putContents(path string, data []byte, flag int, perm os.FileMode) error {
return nil
}
// Truncate
func Truncate(path string, size int) error {
return os.Truncate(path, int64(size))
}
// (文本)写入文件内容
func PutContents(path string, content string) error {
return putContents(path, []byte(content), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
@ -462,4 +482,9 @@ func MainPkgPath() string {
return p
}
return ""
}
// 系统临时目录
func TempDir() string {
return os.TempDir()
}

80
g/os/gflock/gflock.go Normal file
View File

@ -0,0 +1,80 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 文件锁.
package gflock
import (
"sync"
"github.com/theckman/go-flock"
"gitee.com/johng/gf/g/os/gfile"
)
// 文件锁
type Locker struct {
mu sync.RWMutex // 用于外部接口调用的互斥锁(阻塞机制)
flock *flock.Flock // 底层文件锁对象
}
// 创建文件锁
func New(file string) *Locker {
dir := gfile.TempDir() + gfile.Separator + "gflock"
if !gfile.Exists(dir) {
gfile.Mkdir(dir)
}
path := dir + gfile.Separator + file
lock := flock.NewFlock(path)
return &Locker{
flock : lock,
}
}
func (l *Locker) Path() string {
return l.flock.Path()
}
// 当前文件锁是否处于锁定状态(Lock)
func (l *Locker) IsLocked() bool {
return l.flock.Locked()
}
// 尝试Lock文件如果失败立即返回
func (l *Locker) TryLock() bool {
ok, _ := l.flock.TryLock()
if ok {
l.mu.Lock()
}
return ok
}
// 尝试RLock文件如果失败立即返回
func (l *Locker) TryRLock() bool {
ok, _ := l.flock.TryRLock()
if ok {
l.mu.RLock()
}
return ok
}
func (l *Locker) Lock() {
l.mu.Lock()
l.flock.Lock()
}
func (l *Locker) UnLock() {
l.flock.Unlock()
l.mu.Unlock()
}
func (l *Locker) RLock() {
l.mu.RLock()
l.flock.RLock()
}
func (l *Locker) RUnlock() {
l.flock.Unlock()
l.mu.RUnlock()
}

View File

@ -13,9 +13,7 @@ import (
"gitee.com/johng/gf/g/os/glog"
"github.com/fsnotify/fsnotify"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/os/gcache"
"gitee.com/johng/gf/g/os/grpool"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gqueue"
@ -25,7 +23,6 @@ import (
type Watcher struct {
watcher *fsnotify.Watcher // 底层fsnotify对象
events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件
eventCache *gcache.Cache // 用于进行事件过滤当同一监听文件在100ms内出现相同事件则过滤
closeChan chan struct{} // 关闭事件
callbacks *gmap.StringInterfaceMap // 监听的回调函数
}
@ -66,15 +63,12 @@ func Remove(path string) error {
return watcher.Remove(path)
}
// 创建监听管理对象
func New() (*Watcher, error) {
if watch, err := fsnotify.NewWatcher(); err == nil {
w := &Watcher {
watcher : watch,
events : gqueue.New(),
eventCache : gcache.New(),
closeChan : make(chan struct{}, 1),
callbacks : gmap.NewStringInterfaceMap(),
}
@ -134,9 +128,6 @@ func (w *Watcher) startWatchLoop() {
// 监听事件
case ev := <- w.watcher.Events:
if !w.eventCache.Lock(ev.Name + ":" + gconv.String(ev.Op), 100) {
continue
}
w.events.PushBack(&Event{
Path : ev.Name,
Op : Op(ev.Op),
@ -155,7 +146,7 @@ func (w *Watcher) startEventLoop() {
for {
if v := w.events.PopFront(); v != nil {
event := v.(*Event)
// 如果是文件删除时间,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,并重新添加监控
// 如果是文件删除事件,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,并重新添加监控
if event.IsRemove() && gfile.Exists(event.Path){
w.watcher.Add(event.Path)
continue

76
g/os/gproc/gproc.go Normal file
View File

@ -0,0 +1,76 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 进程管理/通信.
// 本进程管理从syscall, os.StartProcess, exec.Cmd都使用过
// 最后采用了exec.Cmd来实现多进程管理这是一个顶层的跨平台封装兼容性更好另外两个是偏底层的接口。
package gproc
import (
"os"
"time"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/container/gtype"
)
const (
gPROC_ENV_KEY_PPID_KEY = "gproc.ppid"
)
// 进程开始执行时间
var processStartTime = time.Now()
// 优雅退出标识符号
var isExited = gtype.NewBool()
// 获取当前进程ID
func Pid() int {
return os.Getpid()
}
// 获取父进程ID(gproc父进程如果当前进程本身就是父进程那么返回自身的pid不存在时则使用系统父进程)
func PPid() int {
if !IsChild() {
return Pid()
}
// gPROC_ENV_KEY_PPID_KEY为gproc包自定义的父进程
ppidValue := os.Getenv(gPROC_ENV_KEY_PPID_KEY)
if ppidValue != "" {
return gconv.Int(ppidValue)
}
return PPidOS()
}
// 获取父进程ID(系统父进程)
func PPidOS() int {
return os.Getppid()
}
// 判断当前进程是否为gproc创建的子进程
func IsChild() bool {
return os.Getenv(gPROC_ENV_KEY_PPID_KEY) != ""
}
// 进程开始执行时间
func StartTime() time.Time {
return processStartTime
}
// 进程已经运行的时间(毫秒)
func Uptime() int {
return int(time.Now().UnixNano()/1e6 - processStartTime.UnixNano()/1e6)
}
// 标识当前进程为退出状态,其他业务可以根据此标识来执行优雅退出
func SetExited() {
isExited.Set(true)
}
// 当前进程是否被标识为退出状态
func Exited() bool {
return isExited.Val()
}

252
g/os/gproc/gproc_comm.go Normal file
View File

@ -0,0 +1,252 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gproc
import (
"io"
"os"
"fmt"
"time"
"errors"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gflock"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/os/gfsnotify"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/container/gqueue"
"gitee.com/johng/gf/g/encoding/gbinary"
)
const (
// 由于子进程的temp dir有可能会和父进程不一致(特别是windows下),影响进程间通信,这里统一使用环境变量设置
gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir"
// 自动通信文件清理时间间隔
gPROC_COMM_AUTO_CLEAR_INTERVAL = time.Second
// 写入通信数据失败时候的重试次数
gPROC_COMM_FAILURE_RETRY_COUNT = 3
// (毫秒)主动通信内容检查时间间隔
gPROC_COMM_ACTIVE_CHECK_INTERVAL = 500
)
// 全局通信文件清理文件锁(同一时刻只能存在一个进程进行通信文件清理)
var commClearLocker = gflock.New("comm.clear.lock")
// 当前进程的文件锁
var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid()))
// 进程通信消息队列
var commQueue = gqueue.New()
// 上一次进程通信内容检查的时间
var commLastCheckTime = gtype.NewInt64()
// TCP通信数据结构定义
type Msg struct {
Pid int // PID哪个进程发送的消息
Data []byte // 参数,消息附带的参数
}
// 进程管理/通信初始化操作
func init() {
path := getCommFilePath(os.Getpid())
if !gfile.Exists(path) {
// 判断是否需要创建通信文件
commLocker.Lock()
err := gfile.Create(path)
commLocker.UnLock()
if err != nil {
glog.Error(err)
os.Exit(1)
}
}
// 检测写入权限
if !gfile.IsWritable(path) {
glog.Errorfln("%s is not writable for gproc", path)
os.Exit(1)
}
updateLastCheckTime()
if gtime.Second() - gfile.MTime(path) < 10 {
// 初始化时读取已有数据(文件修改时间在10秒以内)
checkCommBuffer(path)
} else {
// 否则清空旧的数据内容
commLocker.Lock()
os.Truncate(path, 0)
commLocker.UnLock()
}
// 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列
err := gfsnotify.Add(path, func(event *gfsnotify.Event) {
updateLastCheckTime()
checkCommBuffer(path)
})
if err != nil {
glog.Error(err)
}
go autoClearCommDir()
go autoActiveCheckComm()
}
// 更新最后通信检查时间
func updateLastCheckTime() {
commLastCheckTime.Set(gtime.Millisecond())
}
// 自动清理通信目录文件
// @todo 目前是以时间过期规则进行清理,后期可以考虑加入进程存在性判断
func autoClearCommDir() {
dirPath := getCommDirPath()
for {
time.Sleep(gPROC_COMM_AUTO_CLEAR_INTERVAL)
if commClearLocker.TryLock() {
for _, name := range gfile.ScanDir(dirPath) {
path := dirPath + gfile.Separator + name
if gtime.Second() - gfile.MTime(path) >= 10 {
gfile.Remove(path)
}
}
commClearLocker.UnLock()
}
}
}
// 主动通信内容检测
func autoActiveCheckComm() {
for {
time.Sleep(gPROC_COMM_ACTIVE_CHECK_INTERVAL*time.Millisecond)
if gtime.Millisecond() - commLastCheckTime.Val() > gPROC_COMM_ACTIVE_CHECK_INTERVAL {
updateLastCheckTime()
checkCommBuffer(getCommFilePath(Pid()))
}
}
}
// 手动检查进程通信消息,如果存在消息曾推送到进程消息队列
func checkCommBuffer(path string) {
commLocker.Lock()
buffer := gfile.GetBinContents(path)
if len(buffer) > 0 {
os.Truncate(path, 0)
}
commLocker.UnLock()
if len(buffer) > 0 {
for _, v := range bufferToMsgs(buffer) {
commQueue.PushBack(v)
}
}
}
// 获取其他进程传递到当前进程的消息包,阻塞执行
func Receive() *Msg {
if v := commQueue.PopFront(); v != nil {
return v.(*Msg)
}
return nil
}
// 向指定gproc进程发送数据
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
func Send(pid int, data interface{}) error {
var err error = nil
buffer := gconv.Bytes(data)
b := make([]byte, 0)
b = append(b, gbinary.EncodeInt32(int32(len(buffer) + 12))...)
b = append(b, gbinary.EncodeInt32(int32(os.Getpid()))...)
b = append(b, gbinary.EncodeUint32(checksum(buffer))...)
b = append(b, buffer...)
l := gflock.New(fmt.Sprintf("%d.lock", pid))
l.Lock()
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
err = doSend(pid, b)
if err == nil {
break
}
}
l.UnLock()
//glog.Printfln("%d to %d, %v, %d, %v", Pid(), pid, data, gfile.Size(getCommFilePath(pid)), err)
return err
}
// 执行进程间通信数据写入
func doSend(pid int, buffer []byte) error {
file, err := gfile.OpenWithFlag(getCommFilePath(pid), os.O_RDWR|os.O_CREATE|os.O_APPEND)
if err != nil{
return err
}
// 获取原有文件内容大小
stat, err := file.Stat()
if err != nil {
return err
}
oldSize := stat.Size()
// 执行数据写入
writeSize, err := file.Write(buffer)
if err != nil {
return err
}
if writeSize < len(buffer) {
return io.ErrShortWrite
}
// 写入成功之后获取最新文件内容大小,执行对比
if stat, err := file.Stat(); err != nil {
return err
} else {
// 由于文件锁机制的保证,同一时刻只会有一个进程(&协程)在执行写入,不会出现数据粘包情况
// 这里从严谨性考虑增加大小判断,更进一步避免粘包,或者丢包情况
if stat.Size() - int64(writeSize) != oldSize {
return errors.New("error writing data")
}
}
return nil
}
// 获取指定进程的通信文件地址
func getCommFilePath(pid int) string {
return getCommDirPath() + gfile.Separator + gconv.String(pid)
}
// 获取进程间通信目录地址
func getCommDirPath() string {
tempDir := os.Getenv("gproc.tempdir")
if tempDir == "" {
tempDir = gfile.TempDir()
}
return tempDir + gfile.Separator + "gproc"
}
// 数据解包,防止黏包
func bufferToMsgs(buffer []byte) []*Msg {
s := 0
msgs := make([]*Msg, 0)
for s < len(buffer) {
length := gbinary.DecodeToInt(buffer[s : s + 4])
if length < 0 || length > len(buffer) {
s++
continue
}
checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12])
checksum2 := checksum(buffer[s + 12 : s + length])
if checksum1 != checksum2 {
s++
continue
}
msgs = append(msgs, &Msg {
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
Data : buffer[s + 12 : s + length],
})
s += length
}
return msgs
}
// 常见的二进制数据校验方式,生成校验结果
func checksum(buffer []byte) uint32 {
var checksum uint32
for _, b := range buffer {
checksum += uint32(b)
}
return checksum
}

177
g/os/gproc/gproc_manager.go Normal file
View File

@ -0,0 +1,177 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 进程管理.
package gproc
import (
"os"
"fmt"
"strings"
"os/exec"
"gitee.com/johng/gf/g/container/gmap"
)
const (
gCHILD_ARGS_MARK_NAME = "--gproc-child"
)
// 进程管理器
type Manager struct {
processes *gmap.IntInterfaceMap // 所管理的子进程map
}
// 创建一个进程管理器
func NewManager() *Manager {
return &Manager{
processes : gmap.NewIntInterfaceMap(),
}
}
// 创建一个进程(不执行)
func NewProcess(path string, args []string, environment []string) *Process {
env := make([]string, len(environment) + 1)
for k, v := range environment {
env[k] = v
}
env[len(env) - 1] = fmt.Sprintf("%s=%s", gPROC_TEMP_DIR_ENV_KEY, os.TempDir())
p := &Process {
Manager : nil,
PPid : os.Getpid(),
Cmd : exec.Cmd {
Args : []string{path},
Path : path,
Stdin : os.Stdin,
Stdout : os.Stdout,
Stderr : os.Stderr,
Env : env,
ExtraFiles : make([]*os.File, 0),
},
}
// 当前工作目录
if d, err := os.Getwd(); err == nil {
p.Dir = d
}
// 判断是否加上子进程标识
hasChildMark := false
childMarkLen := len(gCHILD_ARGS_MARK_NAME)
for _, v := range args {
if len(v) >= childMarkLen && strings.EqualFold(v[0 : childMarkLen], gCHILD_ARGS_MARK_NAME) {
hasChildMark = true
}
}
if !hasChildMark {
p.Args = append(p.Args, gCHILD_ARGS_MARK_NAME)
}
if len(args) > 0 {
start := 0
if strings.EqualFold(path, args[0]) {
start = 1
}
p.Args = append(p.Args, args[start : ]...)
}
return p
}
// 创建一个进程(不执行)
func (m *Manager) NewProcess(path string, args []string, environment []string) *Process {
p := NewProcess(path, args, environment)
p.Manager = m
return p
}
// 获取当前进程管理器中的一个进程
func (m *Manager) GetProcess(pid int) *Process {
if v := m.processes.Get(pid); v != nil {
return v.(*Process)
}
return nil
}
// 添加一个已存在进程到进程管理器中
func (m *Manager) AddProcess(pid int) {
if process, err := os.FindProcess(pid); err == nil {
p := m.NewProcess("", nil, nil)
p.Process = process
m.processes.Set(pid, p)
}
}
// 移除进程管理器中的指定进程
func (m *Manager) RemoveProcess(pid int) {
m.processes.Remove(pid)
}
// 获取所有的进程对象,构成列表返回
func (m *Manager) Processes() []*Process {
processes := make([]*Process, 0)
m.processes.RLockFunc(func(m map[int]interface{}) {
for _, v := range m {
processes = append(processes, v.(*Process))
}
})
return processes
}
// 获取所有的进程pid构成列表返回
func (m *Manager) Pids() []int {
return m.processes.Keys()
}
// 等待所有子进程结束
func (m *Manager) WaitAll() {
processes := m.Processes()
if len(processes) > 0 {
for _, p := range processes {
p.Wait()
}
}
}
// 关闭所有的进程
func (m *Manager) KillAll() error {
for _, p := range m.Processes() {
if err := p.Kill(); err != nil {
return err
}
}
return nil
}
// 向所有进程发送信号量
func (m *Manager) SignalAll(sig os.Signal) error {
for _, p := range m.Processes() {
if err := p.Signal(sig); err != nil {
return err
}
}
return nil
}
// 向所有进程发送消息
func (m *Manager) Send(data interface{}) error {
for _, p := range m.Processes() {
if err := p.Send(data); err != nil {
return err
}
}
return nil
}
// 向指定进程发送消息
func (m *Manager) SendTo(pid int, data interface{}) error {
return Send(pid, data)
}
// 清空管理器
func (m *Manager) Clear() {
m.processes.Clear()
}
// 当前进程总数
func (m *Manager) Size() int {
return m.processes.Size()
}

View File

@ -0,0 +1,90 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gproc
import (
"os"
"fmt"
"errors"
"os/exec"
)
// 子进程
type Process struct {
exec.Cmd
Manager *Manager // 所属进程管理器
PPid int // 自定义关联的父进程ID
}
// 开始执行(非阻塞)
func (p *Process) Start() (int, error) {
if p.Process != nil {
return p.Pid(), nil
}
if p.PPid > 0 {
p.Env = append(p.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.PPid))
}
if err := p.Cmd.Start(); err == nil {
if p.Manager != nil {
p.Manager.processes.Set(p.Process.Pid, p)
}
return p.Process.Pid, nil
} else {
return 0, err
}
}
// 运行进程(阻塞等待执行完毕)
func (p *Process) Run() error {
if _, err := p.Start(); err == nil {
return p.Wait()
} else {
return err
}
}
// PID
func (p *Process) Pid() int {
if p.Process != nil {
return p.Process.Pid
}
return 0
}
// 向进程发送消息
func (p *Process) Send(data interface{}) error {
if p.Process != nil {
return Send(p.Process.Pid, data)
}
return errors.New("process not running")
}
// Release releases any resources associated with the Process p,
// rendering it unusable in the future.
// Release only needs to be called if Wait is not.
func (p *Process) Release() error {
return p.Process.Release()
}
// Kill causes the Process to exit immediately.
func (p *Process) Kill() error {
if err := p.Process.Kill(); err == nil {
if p.Manager != nil {
p.Manager.processes.Remove(p.Pid())
}
return nil
} else {
return err
}
}
// Signal sends a signal to the Process.
// Sending Interrupt on Windows is not implemented.
func (p *Process) Signal(sig os.Signal) error {
return p.Process.Signal(sig)
}

18
geg/encoding/gbase64.go Normal file
View File

@ -0,0 +1,18 @@
package main
import (
"fmt"
"gitee.com/johng/gf/g/encoding/gbase64"
)
func main() {
s := "john"
b := gbase64.Encode(s)
c, e := gbase64.Decode(b)
fmt.Println(b)
fmt.Println(c)
fmt.Println(e)
}

View File

@ -0,0 +1,19 @@
package demo
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/frame/gmvc"
)
type ControllerRule struct {
gmvc.Controller
}
func init() {
g.Server().BindController("/rule/{method}/:name", &ControllerRule{})
}
func (c *ControllerRule) Show() {
c.Response.Write(c.Request.Get("name"))
}

View File

@ -1,11 +1,11 @@
package main
import (
"gitee.com/johng/gf/g/net/ghttp"
"gitee.com/johng/gf/g"
_ "gitee.com/johng/gf/geg/frame/mvc/controller/demo"
)
func main() {
ghttp.GetServer().SetPort(8199)
ghttp.GetServer().Run()
g.Server().SetPort(8199)
g.Server().Run()
}

View File

@ -6,11 +6,13 @@ import (
func main() {
s := ghttp.GetServer()
s.EnableAdmin()
s.BindHandler("/", func(r *ghttp.Request){
r.Response.Writeln("您可以同时通过HTTP和HTTPS方式看到该内容")
})
s.EnableHTTPS("/home/john/temp/server.crt", "/home/john/temp/server.key")
s.SetHTTPSPort(443)
s.SetPort(80)
s.SetHTTPSPort(8198, 8199)
s.SetPort(8200, 8300)
s.EnableAdmin()
s.Run()
}

View File

@ -0,0 +1,12 @@
package main
import (
"gitee.com/johng/gf/g"
)
func main() {
s := g.Server()
s.EnableAdmin()
s.SetPort(8199)
s.Run()
}

View File

@ -0,0 +1,18 @@
package main
import (
"gitee.com/johng/gf/g/net/ghttp"
)
func main() {
s := ghttp.GetServer()
s.EnableAdmin()
s.BindHandler("/", func(r *ghttp.Request){
r.Response.Writeln("您可以同时通过HTTP和HTTPS方式看到该内容")
})
s.EnableHTTPS("/home/john/temp/server.crt", "/home/john/temp/server.key")
s.SetHTTPSPort(8198, 8199)
s.SetPort(8200, 8300)
s.EnableAdmin()
s.Run()
}

View File

@ -0,0 +1,23 @@
package main
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/net/ghttp"
)
func main() {
s := g.Server()
s.BindHandler("/", func(r *ghttp.Request){
r.Response.Writeln("hello")
})
s.BindHandler("/restart", func(r *ghttp.Request){
r.Response.Writeln("restart server")
r.Server.Restart()
})
s.BindHandler("/shutdown", func(r *ghttp.Request){
r.Response.Writeln("shutdown server")
r.Server.Shutdown()
})
s.SetPort(8199, 8200)
s.Run()
}

View File

@ -0,0 +1,31 @@
package main
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/net/ghttp"
"time"
"gitee.com/johng/gf/g/os/gproc"
)
func main() {
s1 := g.Server("s1")
s1.EnableAdmin()
s1.BindHandler("/", func(r *ghttp.Request) {
pid := gproc.Pid()
r.Response.Writeln("before restart, pid:", pid)
time.Sleep(10*time.Second)
r.Response.Writeln("after restart, pid:", gproc.Pid())
})
s1.BindHandler("/pid", func(r *ghttp.Request) {
r.Response.Write(gproc.Pid())
})
s1.SetPort(8199, 8200)
s1.Start()
s2 := g.Server("s2")
s2.EnableAdmin()
s2.SetPort(8300, 8080)
s2.Start()
g.Wait()
}

View File

@ -0,0 +1,23 @@
package main
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/net/ghttp"
"gitee.com/johng/gf/g/os/gproc"
"time"
)
func main() {
s := g.Server()
s.BindHandler("/sleep", func(r *ghttp.Request){
r.Response.Writeln(gproc.Pid())
time.Sleep(10*time.Second)
r.Response.Writeln(gproc.Pid())
})
s.BindHandler("/pid", func(r *ghttp.Request){
r.Response.Writeln(gproc.Pid())
})
s.EnableAdmin()
s.SetPort(8199)
s.Run()
}

17
geg/os/gflock/flock.go Normal file
View File

@ -0,0 +1,17 @@
package main
import (
"fmt"
"github.com/theckman/go-flock"
"time"
)
func main() {
l := flock.NewFlock("/tmp/go-lock.lock")
l.Lock()
fmt.Printf("lock 1")
l.Lock()
fmt.Printf("lock 1")
time.Sleep(time.Hour)
}

29
geg/os/gflock/gflock.go Normal file
View File

@ -0,0 +1,29 @@
package main
import (
"gitee.com/johng/gf/g/os/gflock"
"fmt"
"time"
)
func test() {
l := gflock.New("1.lock")
fmt.Println(l.Path())
l.Lock()
fmt.Println("lock 1")
l.Lock()
fmt.Println("lock 2")
}
func active() {
i := 0
for {
time.Sleep(time.Second)
i++
}
}
func main() {
go active()
test()
}

View File

@ -6,7 +6,7 @@ import (
)
func main() {
err := gfsnotify.Add("/home/john/Documents/temp.txt", func(event *gfsnotify.Event) {
err := gfsnotify.Add("./temp.txt", func(event *gfsnotify.Event) {
if event.IsCreate() {
log.Println("创建文件 : ", event.Path)
}

15
geg/os/gproc/cmd.go Normal file
View File

@ -0,0 +1,15 @@
package main
import (
"os"
"fmt"
"time"
"os/exec"
)
func main () {
cmd := exec.Command(os.Args[0], "1")
time.Sleep(3*time.Second)
fmt.Println(cmd.Start())
time.Sleep(time.Hour)
}

28
geg/os/gproc/gproc.go Normal file
View File

@ -0,0 +1,28 @@
package main
import (
"os"
"fmt"
"time"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gtime"
)
func main () {
fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild())
if gproc.IsChild() {
gtime.SetInterval(time.Second, func() bool {
gproc.Send(gproc.PPid(), gtime.Datetime())
return true
})
select { }
} else {
m := gproc.NewManager()
p := m.NewProcess(os.Args[0], os.Args, os.Environ())
p.Start()
for {
msg := gproc.Receive()
fmt.Printf("receive from %d, data: %s\n", msg.Pid, string(msg.Data))
}
}
}

11
geg/os/gproc/gproc2.go Normal file
View File

@ -0,0 +1,11 @@
package main
import (
"fmt"
"gitee.com/johng/gf/g/os/gproc"
)
func main () {
err := gproc.Send(23504, []byte{30})
fmt.Println(err)
}

19
geg/other/graceful.go Normal file
View File

@ -0,0 +1,19 @@
package main
import (
"fmt"
"net/http"
"github.com/tabalt/gracehttp"
)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "hello world")
})
err := gracehttp.ListenAndServe(":8888", nil)
if err != nil {
fmt.Println(err)
}
}

View File

@ -2,16 +2,24 @@ package main
import (
"fmt"
"gitee.com/johng/gf/g/os/gpm"
"os"
"time"
"gitee.com/johng/gf/g/os/glog"
)
func main() {
//var v interface{}
m := map[string]int {
"age" : 18,
m := gproc.New()
env := os.Environ()
env = append(env, "child=1")
p := m.NewProcess(os.Args[0], os.Args, env)
if os.Getenv("child") != "" {
time.Sleep(3*time.Second)
glog.Error("error")
} else {
pid, err := p.Run()
fmt.Println(pid)
fmt.Println(err)
fmt.Println(p.Wait())
}
//v = m
p := &m
(*p)["age"] = 16
//fmt.Println(v)
fmt.Println(m)
}

View File

@ -1,5 +1,5 @@
package gf
const VERSION = "0.97 beta"
const VERSION = "0.98 beta"
const AUTHORS = "john<john@johng.cn>"