gf/g/os/gproc/gproc_comm_send.go

92 lines
2.8 KiB
Go
Raw Normal View History

// Copyright 2018 gf Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gproc
import (
"bytes"
2019-02-20 16:07:11 +08:00
"errors"
"fmt"
"github.com/gogf/gf/g/encoding/gbinary"
"github.com/gogf/gf/g/net/gtcp"
"github.com/gogf/gf/g/os/gfcache"
"github.com/gogf/gf/g/os/glog"
2019-02-20 16:07:11 +08:00
"github.com/gogf/gf/g/util/gconv"
2018-08-24 22:33:38 +08:00
"io"
2019-02-20 16:07:11 +08:00
"time"
)
// Tuning constants for inter-process communication.
const (
gPROC_COMM_FAILURE_RETRY_COUNT = 3 // Number of attempts made before a send is given up.
gPROC_COMM_FAILURE_RETRY_TIMEOUT = 1000 // Interval between retries after a failure, in milliseconds.
gPROC_COMM_SEND_TIMEOUT = 5000 // Send timeout, in milliseconds.
gPROC_COMM_DEAFULT_GRUOP_NAME = "" // Default group name.
)
2019-06-15 16:07:36 +08:00
// gPkg is the data structure used for inter-process communication.
type gPkg struct {
SendPid int // ID of the sending process.
RecvPid int // ID of the receiving process.
Group string // Group name.
Data []byte // Raw payload.
}
2019-02-20 16:07:11 +08:00
// 向指定gproc进程发送数据.
2019-06-15 16:07:36 +08:00
// 数据格式:总长度(24bit)|发送进程PID(24bit)|接收进程PID(24bit)|分组长度(8bit)|分组名称(变长)|参数(变长)
func Send(pid int, data []byte, group...string) error {
groupName := gPROC_COMM_DEAFULT_GRUOP_NAME
if len(group) > 0 {
groupName = group[0]
}
2019-06-15 16:07:36 +08:00
// 执行发送流程
var err error
var buf []byte
var conn *gtcp.Conn
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
if conn, err = getConnByPid(pid); err == nil {
defer conn.Close()
buf, err = conn.SendRecvWithTimeout(buffer, -1, gPROC_COMM_SEND_TIMEOUT*time.Millisecond)
if len(buf) > 0 {
// 如果有返回值,如果不是"ok",那么表示是错误信息
if !bytes.EqualFold(buf, []byte("ok")) {
err = errors.New(string(buf))
break
}
}
2018-08-24 22:33:38 +08:00
// EOF不算异常错误
if err == nil || err == io.EOF {
break
} else {
glog.Error(err)
}
}
time.Sleep(gPROC_COMM_FAILURE_RETRY_TIMEOUT*time.Millisecond)
}
return err
}
// 获取指定进程的TCP通信对象
func getConnByPid(pid int) (*gtcp.Conn, error) {
port := getPortByPid(pid)
if port > 0 {
2018-07-12 20:32:56 +08:00
if conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", port)); err == nil {
return conn, nil
} else {
return nil, err
}
}
return nil, errors.New(fmt.Sprintf("could not find port for pid: %d" , pid))
}
// 获取指定进程监听的端口号
func getPortByPid(pid int) int {
path := getCommFilePath(pid)
2019-02-20 16:07:11 +08:00
content := gfcache.GetContents(path)
return gconv.Int(content)
}