energy/ipc/ipc-channel.go

379 lines
8.4 KiB
Go
Raw Normal View History

2023-03-19 00:00:09 +08:00
//----------------------------------------
//
// Copyright © yanghy. All Rights Reserved.
//
// Licensed under Apache License Version 2.0, January 2004
//
// https://www.apache.org/licenses/LICENSE-2.0
//
//----------------------------------------
package ipc
import (
"bytes"
"encoding/binary"
"errors"
"github.com/energye/energy/common"
. "github.com/energye/energy/consts"
"github.com/energye/energy/logger"
"github.com/energye/energy/pkgs/json"
"github.com/energye/golcl/lcl/rtl/version"
"math"
"net"
"os"
"path/filepath"
"sync"
)
// Wire-format layout of an IPC frame header. Every frame starts with
// protocolHeader, followed by the message type, the channel ID and the
// payload length; the payload itself follows the header.
var (
	// protocolHeader is the fixed byte sequence that opens every frame.
	protocolHeader = []byte{0x01, 0x09, 0x08, 0x07, 0x00, 0x08, 0x02, 0x02}
	// protocolHeaderLength is the number of bytes in protocolHeader.
	protocolHeaderLength = int32(len(protocolHeader))
	// messageTypeLength is the size of the message type field (int8).
	messageTypeLength int32 = 1
	// channelIdLength is the size of the channel ID field (int64).
	channelIdLength int32 = 8
	// dataByteLength is the size of the payload length field.
	dataByteLength int32 = 4
	// headerLength is the total size of the frame header: the sum of all
	// four fields above.
	headerLength = int(protocolHeaderLength + messageTypeLength + channelIdLength + dataByteLength)
)
var (
memoryAddress = "energy.sock"
ipcSock string
2023-03-19 01:31:30 +08:00
useNetIPCChannel = false
Channel = &ipcChannel{
2023-03-19 00:00:09 +08:00
browser: &browserChannel{
channel: sync.Map{},
mutex: sync.Mutex{},
},
render: &renderChannel{
mutex: sync.Mutex{},
},
}
)
// mt is the message type carried in the one-byte type field of a frame.
type mt int8

// Message type values.
const (
	mt_invalid    mt = -1 // invalid type
	mt_connection mt = 0  // connection-establishment message
	mt_common     mt = 1  // ordinary message
)
// key_channelId is the string key "key_channelId".
const key_channelId = "key_channelId"
2023-03-19 01:31:30 +08:00
// IPCCallback is the handler signature invoked with the context of a
// received IPC message.
type IPCCallback func(ctx IIPCContext)
2023-03-19 00:00:09 +08:00
// init resolves the socket file path: memoryAddress inside the OS temporary
// directory.
func init() {
	tmpDir := os.TempDir()
	ipcSock = filepath.Join(tmpDir, memoryAddress)
}
// ipcChannel groups the browser-side and render-side IPC channels together
// with the TCP port cached by Port.
type ipcChannel struct {
	port    int             // cached port number; 0 until Port assigns it
	browser *browserChannel // returned by Browser
	render  *renderChannel  // returned by Render
}
2023-03-19 01:31:30 +08:00
// channel pairs a connection with the IPC type it was opened with.
type channel struct {
	IPCType IPC_TYPE // IPC type of the connection
	Conn    net.Conn // underlying connection
}
// removeMemory deletes the socket file at ipcSock. The result of the removal
// is intentionally discarded.
func removeMemory() {
	_ = os.Remove(ipcSock)
}
// UseNetIPCChannel returns the current value of the package-level
// useNetIPCChannel flag.
func UseNetIPCChannel() bool {
	return useNetIPCChannel
}
// MemoryAddress returns the socket file name (not the full path).
func MemoryAddress() string {
	return memoryAddress
}
// isUseNetIPC reports whether the net (TCP) IPC channel must be used.
//
// It returns false on Darwin and Linux. On any other OS it returns false only
// when the OS version is newer than major version 10, or is major version 10
// with build number 17063 or later.
// NOTE(review): 17063 is presumably the first Windows 10 build with unix
// socket support — confirm.
func isUseNetIPC() bool {
	if common.IsDarwin() || common.IsLinux() {
		return false
	}
	ov := version.OSVersion
	newEnough := ov.Major > 10 || (ov.Major == 10 && ov.Build >= 17063)
	return !newEnough
}
// conn returns the channel's underlying connection.
func (m *channel) conn() net.Conn {
	return m.Conn
}
// Port 获取并返回net socket端口
2023-03-19 00:00:09 +08:00
func (m *ipcChannel) Port() int {
if m.port != 0 {
return m.port
}
//主进程获取端口号
if common.Args.IsMain() {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
panic("Failed to Get unused Port number Error: " + err.Error())
}
listen, err := net.ListenTCP("tcp", addr)
if err != nil {
panic("Failed to Get unused Port number Error: " + err.Error())
}
defer listen.Close()
m.port = listen.Addr().(*net.TCPAddr).Port
}
return m.port
}
2023-03-19 01:31:30 +08:00
// Browser 返回 browser 通道
2023-03-19 00:00:09 +08:00
func (m *ipcChannel) Browser() *browserChannel {
return m.browser
}
2023-03-19 01:31:30 +08:00
// Render 返回 render 通道
2023-03-19 00:00:09 +08:00
func (m *ipcChannel) Render() *renderChannel {
return m.render
}
2023-03-19 01:31:30 +08:00
// IIPCChannel browser
2023-03-19 00:00:09 +08:00
type IIPCChannel interface {
Close()
Channel(channelId int64) *channel //IPC 获取指定的通道
ChannelIds() (result []int64) //IPC 获取所有通道
}
2023-03-19 01:31:30 +08:00
// IIPCContext IPC通信回调上下文
2023-03-19 00:00:09 +08:00
type IIPCContext interface {
2023-03-19 01:31:30 +08:00
Connect() net.Conn // IPC 通道链接
ChannelId() int64 // 通道ID
Message() IMessage // 消息
2023-03-19 00:00:09 +08:00
Free() //
}
2023-03-19 01:31:30 +08:00
// IMessage 消息内容接口
2023-03-19 00:00:09 +08:00
type IMessage interface {
Type() mt
2023-03-19 00:27:06 +08:00
Length() uint32
2023-03-19 00:00:09 +08:00
Data() []byte
JSON() json.JSON
clear()
}
2023-03-19 01:31:30 +08:00
// ipcReadHandler: IPC message read handler (the declaration below is commented out).
2023-03-19 15:25:46 +08:00
//type ipcReadHandler struct {
// ipcType IPC_TYPE
// ct ChannelType
// connect net.Conn
// handler IPCCallback
//}
2023-03-19 00:00:09 +08:00
2023-03-19 01:31:30 +08:00
// ipcMessage 消息内容
2023-03-19 00:00:09 +08:00
type ipcMessage struct {
t mt
2023-03-19 00:27:06 +08:00
s uint32
2023-03-19 00:00:09 +08:00
v []byte
}
2023-03-19 01:31:30 +08:00
// IPCContext IPC 上下文
2023-03-19 00:00:09 +08:00
type IPCContext struct {
2023-03-19 00:10:52 +08:00
channelId int64 //render channelId
2023-03-19 00:00:09 +08:00
ipcType IPC_TYPE //
connect net.Conn //
message IMessage //
}
2023-03-19 01:31:30 +08:00
// Free 释放消息内存空间
2023-03-19 00:00:09 +08:00
func (m *IPCContext) Free() {
if m.message != nil {
m.message.clear()
m.message = nil
}
}
2023-03-19 01:31:30 +08:00
// ChannelId 返回通道ID
2023-03-19 00:00:09 +08:00
func (m *IPCContext) ChannelId() int64 {
2023-03-19 00:10:52 +08:00
return m.channelId
2023-03-19 00:00:09 +08:00
}
2023-03-19 01:31:30 +08:00
// Message 返回消息内容
2023-03-19 00:00:09 +08:00
func (m *IPCContext) Message() IMessage {
return m.message
}
2023-03-19 01:31:30 +08:00
// Connect 返回当前通道链接
2023-03-19 00:00:09 +08:00
func (m *IPCContext) Connect() net.Conn {
return m.connect
}
2023-03-19 01:31:30 +08:00
// Type 消息类型
2023-03-19 00:00:09 +08:00
func (m *ipcMessage) Type() mt {
return m.t
}
2023-03-19 01:31:30 +08:00
// Data 消息[]byte数据
2023-03-19 00:00:09 +08:00
func (m *ipcMessage) Data() []byte {
return m.v
}
2023-03-19 01:31:30 +08:00
// Length 消息[]byte长度
2023-03-19 00:27:06 +08:00
func (m *ipcMessage) Length() uint32 {
2023-03-19 00:00:09 +08:00
return m.s
}
2023-03-19 01:31:30 +08:00
// JSON 消息转为JSON对象
2023-03-19 00:00:09 +08:00
func (m *ipcMessage) JSON() json.JSON {
return json.NewJSON(m.v)
}
2023-03-19 01:31:30 +08:00
// clear 清空内容
2023-03-19 00:00:09 +08:00
func (m *ipcMessage) clear() {
2023-03-19 01:31:30 +08:00
m.t = mt_invalid
2023-03-19 00:00:09 +08:00
m.v = nil
m.s = 0
}
2023-03-19 15:25:46 +08:00
// connect is one IPC connection together with the state needed to read and
// write framed messages on it.
type connect struct {
	writeBuf  *bytes.Buffer // scratch buffer in which ipcWrite assembles a frame
	channelId int64         // channel ID
	conn      net.Conn      // channel connection; nil after Close
	ipcType   IPC_TYPE      // IPC type
	ct        ChannelType   // channel type of the connection
	handler   IPCCallback   // callback invoked by ipcRead for each message
}
// Close closes the current IPC connection and clears the reference to it.
// It does nothing when the connection is already nil. The error returned by
// the underlying Close is discarded.
func (m *connect) Close() {
	if m.conn == nil {
		return
	}
	_ = m.conn.Close()
	m.conn = nil
}
// Read reads up to len(b) bytes from the connection into b and returns the
// number of bytes read.
//
// For the net IPC type the connection's own Read is used. Otherwise the
// connection is read with ReadFromUnix when it is a *net.UnixConn; any other
// connection type falls back to plain Read instead of panicking on a failed
// type assertion.
//
// An error is returned when no connection is established (for example after
// Close), matching the check done by ipcWrite.
func (m *connect) Read(b []byte) (n int, err error) {
	if m.conn == nil {
		return 0, errors.New("通道链接未建立成功")
	}
	if m.ipcType != IPCT_NET {
		if unixConn, ok := m.conn.(*net.UnixConn); ok {
			n, _, err = unixConn.ReadFromUnix(b)
			return n, err
		}
	}
	return m.conn.Read(b)
}
2023-03-19 01:31:30 +08:00
// ipcWrite 写入消息
2023-03-19 15:25:46 +08:00
func (m *connect) ipcWrite(messageType mt, channelId int64, data []byte) (n int, err error) {
2023-03-19 00:00:09 +08:00
defer func() {
data = nil
}()
2023-03-19 15:25:46 +08:00
if m.conn == nil {
return 0, errors.New("通道链接未建立成功")
2023-03-19 00:00:09 +08:00
}
var (
dataByteLen = len(data)
)
2023-03-19 00:27:06 +08:00
if dataByteLen > math.MaxUint32 {
2023-03-19 00:00:09 +08:00
return 0, errors.New("超出最大消息长度")
}
2023-03-19 15:25:46 +08:00
binary.Write(m.writeBuf, binary.BigEndian, protocolHeader) //协议头
binary.Write(m.writeBuf, binary.BigEndian, int8(messageType)) //消息类型
binary.Write(m.writeBuf, binary.BigEndian, channelId) //通道Id
binary.Write(m.writeBuf, binary.BigEndian, uint32(dataByteLen)) //数据长度
binary.Write(m.writeBuf, binary.BigEndian, data) //数据
n, err = m.conn.Write(m.writeBuf.Bytes())
m.writeBuf.Reset()
2023-03-19 00:00:09 +08:00
return n, err
}
2023-03-19 01:31:30 +08:00
// ipcRead 读取消息
2023-03-19 15:25:46 +08:00
func (m *connect) ipcRead() {
2023-03-19 00:00:09 +08:00
var ipcType, chnType string
2023-03-19 15:25:46 +08:00
if m.ipcType == IPCT_NET {
2023-03-19 00:00:09 +08:00
ipcType = "[net]"
} else {
ipcType = "[unix]"
}
2023-03-19 15:25:46 +08:00
if m.ct == Ct_Server {
2023-03-19 00:00:09 +08:00
chnType = "[server]"
} else {
chnType = "[client]"
}
defer func() {
logger.Debug("IPC Read Disconnect type:", ipcType, "ChannelType:", chnType, "processType:", common.Args.ProcessType())
2023-03-19 15:25:46 +08:00
m.Close()
2023-03-19 00:00:09 +08:00
}()
for {
header := make([]byte, headerLength)
2023-03-19 15:25:46 +08:00
size, err := m.Read(header)
2023-03-19 00:00:09 +08:00
if err != nil {
logger.Debug("IPC Read【Error】IPCType:", ipcType, "ChannelType:", chnType, "Error:", err)
return
} else if size == 0 {
logger.Debug("IPC Read【Size == 0】IPCType:", ipcType, "ChannelType:", chnType, "header:", header, "Error:", err)
return
}
if size == headerLength {
for i, protocol := range protocolHeader {
if header[i] != protocol {
return
}
}
var (
2023-03-19 00:27:06 +08:00
t int8 //消息类型
channelId int64
dataLen uint32 //数据长度
low, high int32 //
2023-03-19 00:00:09 +08:00
)
//消息类型
low = protocolHeaderLength
2023-03-19 00:27:06 +08:00
high = protocolHeaderLength + messageTypeLength
2023-03-19 00:00:09 +08:00
err = binary.Read(bytes.NewReader(header[low:high]), binary.BigEndian, &t)
if err != nil {
logger.Debug("binary.Read.length: ", err)
return
}
2023-03-19 00:27:06 +08:00
//通道ID
low = high
high = high + channelIdLength
err = binary.Read(bytes.NewReader(header[low:high]), binary.BigEndian, &channelId)
if err != nil {
logger.Debug("binary.Read.length: ", err)
return
}
2023-03-19 00:00:09 +08:00
//数据长度
low = high
high = high + dataByteLength
err = binary.Read(bytes.NewReader(header[low:high]), binary.BigEndian, &dataLen)
if err != nil {
logger.Debug("binary.Read.length: ", err)
return
}
//数据
dataByte := make([]byte, dataLen)
if dataLen > 0 {
2023-03-19 15:25:46 +08:00
size, err = m.Read(dataByte)
2023-03-19 00:00:09 +08:00
}
if err != nil {
logger.Debug("binary.Read.data: ", err)
return
}
2023-03-19 15:25:46 +08:00
m.handler(&IPCContext{
2023-03-19 00:27:06 +08:00
channelId: channelId,
2023-03-19 15:25:46 +08:00
ipcType: m.ipcType,
connect: m.conn,
2023-03-19 00:00:09 +08:00
message: &ipcMessage{
t: mt(t),
s: dataLen,
v: dataByte,
},
})
} else {
logger.Debug("无效的 != headerLength")
break
}
}
}