perf: docker and containerd are supported to collect logs (#1338)

This commit is contained in:
yangkaa 2022-09-26 16:55:36 +08:00
parent a5d5add245
commit d695ac6ffc
9 changed files with 500 additions and 279 deletions

View File

@ -0,0 +1,115 @@
package sources
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/sirupsen/logrus"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"time"
)
const (
// ContainerRuntimeDocker docker runtime
ContainerRuntimeDocker = "docker"
// ContainerRuntimeContainerd containerd runtime
ContainerRuntimeContainerd = "containerd"
// RuntimeEndpointDocker docker runtime endpoint
RuntimeEndpointDocker = "/var/run/dockershim.sock"
// RuntimeEndpointContainerd containerd runtime endpoint
RuntimeEndpointContainerd = "/run/containerd/containerd.sock"
)
const (
// CONTAINER_ACTION_START is start container event action
CONTAINER_ACTION_START = "start"
// CONTAINER_ACTION_STOP is stop container event action
CONTAINER_ACTION_STOP = "stop"
// CONTAINER_ACTION_CREATE is create container event action
CONTAINER_ACTION_CREATE = "create"
// CONTAINER_ACTION_DESTROY is destroy container event action
CONTAINER_ACTION_DESTROY = "destroy"
// CONTAINER_ACTION_DIE is die container event action
CONTAINER_ACTION_DIE = "die"
)
type ContainerDesc struct {
ContainerRuntime string
// Info is extra information of the Container. The key could be arbitrary string, and
// value should be in json format. The information could include anything useful for
// debug, e.g. pid for linux container based container runtime.
// It should only be returned non-empty when Verbose is true.
Info map[string]string
*runtimeapi.ContainerStatus
// Docker container json
*types.ContainerJSON
}
func (c *ContainerDesc) GetLogPath() string {
if c.ContainerRuntime == ContainerRuntimeDocker {
logrus.Infof("docker container log path %s", c.ContainerJSON.LogPath)
return c.ContainerJSON.LogPath
}
logrus.Infof("containerd container log path %s", c.ContainerStatus.GetLogPath())
return c.ContainerStatus.GetLogPath()
}
func (c *ContainerDesc) GetId() string {
if c.ContainerRuntime == ContainerRuntimeDocker {
logrus.Infof("docker container id %s", c.ContainerJSON.ID)
return c.ContainerJSON.ID
}
logrus.Infof("containerd container id %s", c.ContainerStatus.GetId())
return c.ContainerStatus.GetId()
}
// ContainerImageCli container image client
type ContainerImageCli interface {
ListContainers() ([]*runtimeapi.Container, error)
InspectContainer(containerID string) (*ContainerDesc, error)
WatchContainers(ctx context.Context, cchan chan ContainerEvent) error
}
// ClientFactory client factory
type ClientFactory interface {
NewClient(endpoint string, timeout time.Duration) (ContainerImageCli, error)
}
// NewContainerImageClient new container image client
func NewContainerImageClient(containerRuntime, endpoint string, timeout time.Duration) (c ContainerImageCli, err error) {
logrus.Infof("create container client runtime %s endpoint %s", containerRuntime, endpoint)
switch containerRuntime {
case ContainerRuntimeDocker:
factory := &dockerClientFactory{}
c, err = factory.NewClient(
endpoint, timeout,
)
case ContainerRuntimeContainerd:
factory := &containerdClientFactory{}
c, err = factory.NewClient(
endpoint, timeout,
)
return
default:
err = fmt.Errorf("unknown runtime %s", containerRuntime)
return
}
return
}
//ContainerEvent container event
type ContainerEvent struct {
Action string
Container *ContainerDesc
}
func CacheContainer(cchan chan ContainerEvent, cs ...ContainerEvent) {
for _, container := range cs {
logrus.Debugf("found a container %s %s", container.Container.GetMetadata().GetName(), container.Action)
cchan <- container
}
}

View File

@ -0,0 +1,145 @@
package sources
import (
"context"
"github.com/containerd/containerd"
containerdEventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events"
"github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"os"
"strings"
"github.com/goodrain/rainbond/util/criutil"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"time"
)
const (
DockerContainerdSock = "/var/run/docker/containerd/containerd.sock"
RunDockerContainerdSock = "/run/docker/containerd/containerd.sock"
ContainerdSock = "/run/containerd/containerd.sock"
)
type containerdClientFactory struct{}
func (f containerdClientFactory) NewClient(endpoint string, timeout time.Duration) (ContainerImageCli, error) {
var (
containerdCli *containerd.Client
runtimeClient runtimeapi.RuntimeServiceClient
grpcConn *grpc.ClientConn
err error
)
runtimeClient, grpcConn, err = criutil.GetRuntimeClient(context.Background(), endpoint, time.Second*3)
if err != nil {
return nil, err
}
if os.Getenv("CONTAINERD_SOCK") != "" {
endpoint = os.Getenv("CONTAINERD_SOCK")
}
containerdCli, err = containerd.New(endpoint, containerd.WithTimeout(timeout))
if err != nil {
return nil, err
}
return &containerdClientImpl{
client: containerdCli,
conn: grpcConn,
runtimeClient: runtimeClient,
}, nil
}
type containerdClientImpl struct {
client *containerd.Client
conn *grpc.ClientConn
runtimeClient runtimeapi.RuntimeServiceClient
}
func (c *containerdClientImpl) ListContainers() ([]*runtimeapi.Container, error) {
containers, err := c.runtimeClient.ListContainers(context.Background(), &runtimeapi.ListContainersRequest{})
if err != nil {
return nil, err
}
return containers.GetContainers(), nil
}
func (c *containerdClientImpl) InspectContainer(containerID string) (*ContainerDesc, error) {
containerStatus, err := c.runtimeClient.ContainerStatus(context.Background(), &runtimeapi.ContainerStatusRequest{
ContainerId: containerID,
Verbose: true,
})
if err != nil {
return nil, err
}
return &ContainerDesc{
ContainerRuntime: ContainerRuntimeContainerd,
ContainerStatus: containerStatus.GetStatus(),
Info: containerStatus.GetInfo(),
}, nil
}
func (c *containerdClientImpl) WatchContainers(ctx context.Context, cchan chan ContainerEvent) error {
eventsClient := c.client.EventService()
eventsCh, errCh := eventsClient.Subscribe(ctx)
var err error
for {
var e *events.Envelope
select {
case e = <-eventsCh:
case err = <-errCh:
return err
}
if e != nil {
if e.Event != nil {
ev, err := typeurl.UnmarshalAny(e.Event)
if err != nil {
logrus.Warn("cannot unmarshal an event from Any")
continue
}
switch ev.(type) {
case *containerdEventstypes.TaskStart:
evVal := ev.(*containerdEventstypes.TaskStart)
// PATCH: if it's start event of pause container
// we would skip it.
// QUESTION: what if someone's container ID equals the other Sandbox ID?
targetContainerID := evVal.ContainerID
resp, _ := c.runtimeClient.ListPodSandbox(context.Background(),
&runtimeapi.ListPodSandboxRequest{
Filter: &runtimeapi.PodSandboxFilter{
Id: targetContainerID,
},
})
if resp != nil && len(resp.Items) == 1 {
// it's sandbox container! skip this one!
logrus.Infof("skipped start event of container %s since it's sandbox container", targetContainerID)
continue
}
container, err := c.InspectContainer(targetContainerID)
if err != nil {
if !strings.Contains(err.Error(), "No such container") {
logrus.Errorf("get container detail info failure %s", err.Error())
}
break
}
CacheContainer(cchan, ContainerEvent{Action: CONTAINER_ACTION_START, Container: container})
case containerdEventstypes.TaskExit, containerdEventstypes.TaskDelete:
var targetContainerID string
evVal, ok := ev.(*containerdEventstypes.TaskExit)
if ok {
targetContainerID = evVal.ContainerID
} else {
targetContainerID = ev.(*containerdEventstypes.TaskDelete).ContainerID
}
container, err := c.InspectContainer(targetContainerID)
if err != nil {
if !strings.Contains(err.Error(), "No such container") {
logrus.Errorf("get container detail info failure %s", err.Error())
}
break
}
CacheContainer(cchan, ContainerEvent{Action: CONTAINER_ACTION_STOP, Container: container})
}
}
}
}
}

View File

@ -0,0 +1,118 @@
package sources
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
dockercli "github.com/docker/docker/client"
"github.com/sirupsen/logrus"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"os"
"strings"
"time"
)
var handleAction = []string{CONTAINER_ACTION_CREATE, CONTAINER_ACTION_START, CONTAINER_ACTION_STOP, CONTAINER_ACTION_DIE, CONTAINER_ACTION_DESTROY}
func checkEventAction(action string) bool {
for _, enable := range handleAction {
if enable == action {
return true
}
}
return false
}
type dockerClientFactory struct{}
var _ ClientFactory = &dockerClientFactory{}
func (f dockerClientFactory) NewClient(endpoint string, timeout time.Duration) (ContainerImageCli, error) {
if os.Getenv("DOCKER_API_VERSION") == "" {
os.Setenv("DOCKER_API_VERSION", "1.40")
}
cli, err := dockercli.NewClientWithOpts(dockercli.FromEnv)
if err != nil {
return nil, err
}
return &dockerClientImpl{
client: cli,
}, nil
}
var _ ContainerImageCli = &dockerClientImpl{}
type dockerClientImpl struct {
client *dockercli.Client
}
func (d *dockerClientImpl) ListContainers() ([]*runtimeapi.Container, error) {
lictctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
containers, err := d.client.ContainerList(lictctx, types.ContainerListOptions{})
if err != nil {
return nil, err
}
// convert to runtimeapi.Container
var runtimeContainers []*runtimeapi.Container
for _, container := range containers {
runtimeContainers = append(runtimeContainers, &runtimeapi.Container{
Id: container.ID,
Metadata: &runtimeapi.ContainerMetadata{
Name: container.Names[0],
},
Image: &runtimeapi.ImageSpec{
Image: container.Image,
},
ImageRef: container.ImageID,
Labels: container.Labels,
CreatedAt: container.Created,
})
}
return runtimeContainers, nil
}
func (d *dockerClientImpl) InspectContainer(containerID string) (*ContainerDesc, error) {
inspectctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
container, err := d.client.ContainerInspect(inspectctx, containerID)
if err != nil {
return nil, err
}
return &ContainerDesc{
ContainerRuntime: ContainerRuntimeDocker,
ContainerJSON: &container,
}, nil
}
func (d *dockerClientImpl) WatchContainers(ctx context.Context, cchan chan ContainerEvent) error {
containerFileter := filters.NewArgs()
containerFileter.Add("type", "container")
eventchan, eventerrchan := d.client.Events(ctx, types.EventsOptions{
Filters: containerFileter,
})
for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-eventerrchan:
return err
case event, ok := <-eventchan:
if !ok {
return fmt.Errorf("event chan is closed")
}
if event.Type == events.ContainerEventType && checkEventAction(event.Action) {
container, err := d.InspectContainer(event.ID)
if err != nil {
if !strings.Contains(err.Error(), "No such container") {
logrus.Errorf("get container detail info failure %s", err.Error())
}
break
}
CacheContainer(cchan, ContainerEvent{Action: event.Action, Container: container})
}
}
}
}

View File

@ -24,8 +24,7 @@ import (
"fmt"
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
"github.com/goodrain/rainbond/util/criutil"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"github.com/goodrain/rainbond/builder/sources"
"os"
"path"
"time"
@ -36,6 +35,8 @@ import (
etcdutil "github.com/goodrain/rainbond/util/etcd"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
// Register grpc event types
_ "github.com/containerd/containerd/api/events"
)
var (
@ -49,12 +50,6 @@ var (
exitChan = make(chan struct{})
)
const (
DockerContainerdSock = "/var/run/docker/containerd/containerd.sock"
RunDockerContainerdSock = "/run/docker/containerd/containerd.sock"
ContainerdSock = "/run/containerd/containerd.sock"
)
//Init init config
func Init() error {
if initialized {
@ -110,8 +105,10 @@ type Conf struct {
//enable collect docker container log
EnableCollectLog bool
//DockerCli *dockercli.Client
RuntimeServiceCli runtimeapi.RuntimeServiceClient
ContainerdCli *containerd.Client
//ContainerdCli *containerd.Client
ContainerRuntime string
RuntimeEndpoint string
ContainerImageCli sources.ContainerImageCli
EtcdCli *client.Client
LicPath string
@ -202,6 +199,8 @@ func (a *Conf) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.ImageRepositoryHost, "image-repo-host", "goodrain.me", "The host of image repository")
fs.StringVar(&a.GatewayVIP, "gateway-vip", "", "The vip of gateway")
fs.StringVar(&a.HostsFile, "hostsfile", "/newetc/hosts", "/etc/hosts mapped path in the container. eg. /etc/hosts:/tmp/hosts. Do not set hostsfile to /etc/hosts")
fs.StringVar(&a.ContainerRuntime, "container-runtime", sources.ContainerRuntimeContainerd, "container runtime, support docker and containerd")
fs.StringVar(&a.RuntimeEndpoint, "runtime-endpoint", sources.RuntimeEndpointContainerd, "container runtime endpoint")
}
//SetLog 设置log
@ -239,26 +238,13 @@ func newClient(namespace, address string, opts ...containerd.ClientOpt) (*contai
//ParseClient handle config and create some api
func (a *Conf) ParseClient(ctx context.Context, etcdClientArgs *etcdutil.ClientArgs) (err error) {
//a.DockerCli, err = dockercli.NewEnvClient()
//if err != nil {
// return err
//}
address := "unix:///var/run/dockershim.sock"
if os.Getenv("RUNTIME_ENDPOINT") != "" {
address = os.Getenv("RUNTIME_ENDPOINT")
}
runtimeClient, _, err := criutil.GetRuntimeClient(context.Background(), address, time.Second*3)
logrus.Infof("begin create container image client, runtime [%s] runtime endpoint [%s]", a.ContainerRuntime, a.RuntimeEndpoint, a.EtcdEndpoints)
containerImageCli, err := sources.NewContainerImageClient(a.ContainerRuntime, a.RuntimeEndpoint, time.Second*3)
if err != nil {
return
}
a.RuntimeServiceCli = runtimeClient
client, ctx, _, err := newClient("", ContainerdSock)
if err != nil {
logrus.Errorf("new client failed %v", err)
return err
}
a.ContainerdCli = client
logrus.Infof("begin create etcd client: %s", a.EtcdEndpoints)
a.ContainerImageCli = containerImageCli
logrus.Infof("create container image client success\n begin create etcd client: %s", a.EtcdEndpoints)
for {
a.EtcdCli, err = etcdutil.NewClient(ctx, etcdClientArgs)
if err != nil {

10
go.mod
View File

@ -14,6 +14,7 @@ require (
github.com/bluebreezecf/opentsdb-goclient v0.0.0-20190921120552-796138372df3
github.com/cockroachdb/cmux v0.0.0-20170110192607-30d10be49292 // indirect
github.com/containerd/containerd v1.5.7
github.com/containerd/typeurl v1.0.2
github.com/coreos/etcd v3.3.17+incompatible
github.com/creack/pty v1.1.11 // indirect
github.com/crossplane/crossplane-runtime v0.10.0
@ -88,7 +89,6 @@ require (
github.com/twinj/uuid v1.0.0
github.com/urfave/cli v1.22.2
github.com/yudai/umutex v0.0.0-20150817080136-18216d265c6b
github.com/containerd/typeurl v1.0.2
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/net v0.0.0-20211209124913-491a49abca63
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 // indirect
@ -114,7 +114,11 @@ require (
sigs.k8s.io/yaml v1.2.0
)
require golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
k8s.io/klog/v2 v2.5.0
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
)
require (
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
@ -269,10 +273,8 @@ require (
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/gengo v0.0.0-20201113003025-83324d819ded // indirect
k8s.io/klog/v2 v2.5.0 // indirect
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd // indirect
k8s.io/kubectl v0.20.4 // indirect
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
sigs.k8s.io/kustomize v2.0.3+incompatible // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
)

View File

@ -21,6 +21,7 @@ package cluster
import (
"context"
"fmt"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"strings"
"github.com/docker/distribution/reference"
@ -29,7 +30,6 @@ import (
"github.com/oam-dev/kubevela/pkg/utils/apply"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/yaml"
)

View File

@ -154,6 +154,7 @@ func decodeFunc(rdr io.Reader) func() (*Message, error) {
continue
}
logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
break
}
return msg, err
}

View File

@ -22,10 +22,7 @@ import (
"context"
"encoding/json"
"fmt"
containerdEventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events"
"github.com/containerd/typeurl"
"github.com/docker/cli/templates"
"github.com/goodrain/rainbond/builder/sources"
"io"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"strings"
@ -36,8 +33,6 @@ import (
"github.com/docker/docker/api/types"
"github.com/goodrain/rainbond/cmd/node/option"
"text/template"
// Register grpc event types
_ "github.com/containerd/containerd/api/events"
)
@ -50,44 +45,10 @@ type ContainerLogManage struct {
ctx context.Context
cancel context.CancelFunc
conf *option.Conf
cchan chan ContainerEvent
cchan chan sources.ContainerEvent
containerLogs sync.Map
}
const (
// CONTAINER_ACTION_START is start container event action
CONTAINER_ACTION_START = "start"
// CONTAINER_ACTION_STOP is stop container event action
CONTAINER_ACTION_STOP = "stop"
)
const (
// CONTAINER_STATE_CREATED is created state
CONTAINER_STATE_CREATED = "created"
// CONTAINER_STATE_RUNNING is running state
CONTAINER_STATE_RUNNING = "running"
// CONTAINER_STATE_PAUSED is paused state
CONTAINER_STATE_PAUSED = "paused"
// CONTAINER_STATE_RESTARTING is restarting state
CONTAINER_STATE_RESTARTING = "restarting"
// CONTAINER_STATE_REMOVING is removing state
CONTAINER_STATE_REMOVING = "removing"
// CONTAINER_STATE_EXITED is exited state
CONTAINER_STATE_EXITED = "exited"
// CONTAINER_STATE_DEAD is dead state
CONTAINER_STATE_DEAD = "dead"
// CONTAINER_STATE_UNKNOWN is unknown state
CONTAINER_STATE_UNKNOWN = "unknown"
)
//CreatContainerLogManage create a container log manage
func CreatContainerLogManage(conf *option.Conf) *ContainerLogManage {
ctx, cancel := context.WithCancel(context.Background())
@ -95,7 +56,7 @@ func CreatContainerLogManage(conf *option.Conf) *ContainerLogManage {
ctx: ctx,
cancel: cancel,
conf: conf,
cchan: make(chan ContainerEvent, 100),
cchan: make(chan sources.ContainerEvent, 100),
}
}
@ -155,11 +116,13 @@ func (c *ContainerLogManage) handleLogger() {
return
case cevent := <-c.cchan:
switch cevent.Action {
case CONTAINER_ACTION_START:
//loggerType := cevent.Container.HostConfig.LogConfig.Type
//if loggerType != "json-file" && loggerType != "syslog" {
// continue
//}
case sources.CONTAINER_ACTION_START, sources.CONTAINER_ACTION_CREATE:
if cevent.Container.ContainerRuntime == sources.ContainerRuntimeDocker {
loggerType := cevent.Container.HostConfig.LogConfig.Type
if loggerType != "json-file" && loggerType != "syslog" {
continue
}
}
if logger, ok := c.containerLogs.Load(cevent.Container.GetId()); ok {
clog, okf := logger.(*ContainerLog)
if okf {
@ -204,7 +167,7 @@ func (c *ContainerLogManage) handleLogger() {
}
}()
}
case CONTAINER_ACTION_STOP:
case sources.CONTAINER_ACTION_STOP, sources.CONTAINER_ACTION_DESTROY, sources.CONTAINER_ACTION_DIE:
if logger, ok := c.containerLogs.Load(cevent.Container.GetId()); ok {
clog, okf := logger.(*ContainerLog)
if okf {
@ -218,14 +181,7 @@ func (c *ContainerLogManage) handleLogger() {
}
}
//ContainerEvent container event
type ContainerEvent struct {
Action string
//Container types.ContainerJSON
Container *runtimeapi.ContainerStatus
}
func (c *ContainerLogManage) cacheContainer(cs ...ContainerEvent) {
func (c *ContainerLogManage) cacheContainer(cs ...sources.ContainerEvent) {
for _, container := range cs {
logrus.Debugf("found a container %s %s", container.Container.GetMetadata().GetName(), container.Action)
c.cchan <- container
@ -245,12 +201,12 @@ func (c *ContainerLogManage) cacheContainer(cs ...ContainerEvent) {
//}
func (c *ContainerLogManage) listContainer() []*runtimeapi.Container {
containers, err := c.conf.RuntimeServiceCli.ListContainers(context.Background(), &runtimeapi.ListContainersRequest{})
containers, err := c.conf.ContainerImageCli.ListContainers()
if err != nil {
logrus.Errorf("list containers failure.%s", err.Error())
containers, _ = c.conf.RuntimeServiceCli.ListContainers(context.Background(), &runtimeapi.ListContainersRequest{})
containers, _ = c.conf.ContainerImageCli.ListContainers()
}
return containers.GetContainers()
return containers
}
func (c *ContainerLogManage) loollist() {
@ -266,12 +222,14 @@ func (c *ContainerLogManage) loollist() {
if cj.GetLogPath() == "" {
continue
}
//loggerType := cj.HostConfig.LogConfig.Type
//if loggerType != "json-file" && loggerType != "syslog" {
// continue
//}
if cj.ContainerRuntime == sources.ContainerRuntimeDocker {
loggerType := cj.ContainerJSON.HostConfig.LogConfig.Type
if loggerType != "json-file" && loggerType != "syslog" {
continue
}
}
if _, exist := c.containerLogs.Load(container.GetId()); !exist {
c.cacheContainer(ContainerEvent{Action: "start", Container: cj})
c.cacheContainer(sources.ContainerEvent{Action: sources.CONTAINER_ACTION_START, Container: cj})
}
}
}
@ -291,7 +249,7 @@ func (c *ContainerLogManage) listAndWatchContainer(errchan chan error) {
continue
}
logrus.Debugf("found a container %s ", container.GetMetadata().GetName())
c.cacheContainer(ContainerEvent{Action: "start", Container: container})
c.cacheContainer(sources.ContainerEvent{Action: sources.CONTAINER_ACTION_START, Container: container})
}
logrus.Info("list containers complete, start watch container")
for {
@ -309,139 +267,18 @@ type Out struct {
Event string
}
func parseTemplate(format string) (*template.Template, error) {
aliases := map[string]string{
"json": "{{json .}}",
}
if alias, ok := aliases[format]; ok {
format = alias
}
return templates.Parse(format)
}
func (c *ContainerLogManage) watchContainer() error {
eventsClient := c.conf.ContainerdCli.EventService()
eventsCh, errCh := eventsClient.Subscribe(context.Background())
var err error
for {
var e *events.Envelope
select {
case e = <-eventsCh:
case err = <-errCh:
return err
}
if e != nil {
if e.Event != nil {
ev, err := typeurl.UnmarshalAny(e.Event)
if err != nil {
logrus.Warn("cannot unmarshal an event from Any")
continue
}
switch ev.(type) {
case *containerdEventstypes.TaskStart:
evVal := ev.(*containerdEventstypes.TaskStart)
// PATCH: if it's start event of pause container
// we would skip it.
// QUESTION: what if someone's container ID equals the other Sandbox ID?
targetContainerID := evVal.ContainerID
resp, _ := c.conf.RuntimeServiceCli.ListPodSandbox(context.Background(),
&runtimeapi.ListPodSandboxRequest{
Filter: &runtimeapi.PodSandboxFilter{
Id: targetContainerID,
},
})
if resp != nil && len(resp.Items) == 1 {
// it's sandbox container! skip this one!
logrus.Infof("skipped start event of container %s since it's sandbox container", targetContainerID)
return nil
}
container, err := c.getContainer(targetContainerID)
if err != nil {
if !strings.Contains(err.Error(), "No such container") {
logrus.Errorf("get container detail info failure %s", err.Error())
}
break
}
c.cacheContainer(ContainerEvent{Action: CONTAINER_ACTION_START, Container: container})
case containerdEventstypes.TaskExit, containerdEventstypes.TaskDelete:
var targetContainerID string
evVal, ok := ev.(*containerdEventstypes.TaskExit)
if ok {
targetContainerID = evVal.ContainerID
} else {
targetContainerID = ev.(*containerdEventstypes.TaskDelete).ContainerID
}
container, err := c.getContainer(targetContainerID)
if err != nil {
if !strings.Contains(err.Error(), "No such container") {
logrus.Errorf("get container detail info failure %s", err.Error())
}
break
}
c.cacheContainer(ContainerEvent{Action: CONTAINER_ACTION_STOP, Container: container})
}
}
}
}
return c.conf.ContainerImageCli.WatchContainers(c.ctx, c.cchan)
}
//func (c *ContainerLogManage) watchContainer() error {
//
// return nil
//containerFileter := filters.NewArgs()
//containerFileter.Add("type", "container")
//eventchan, eventerrchan := c.conf.DockerCli.Events(c.ctx, types.EventsOptions{
// Filters: containerFileter,
//})
//for {
// select {
// case <-c.ctx.Done():
// return nil
// case err := <-eventerrchan:
// return err
// case event, ok := <-eventchan:
// if !ok {
// return fmt.Errorf("event chan is closed")
// }
// if event.Type == events.ContainerEventType && checkEventAction(event.Action) {
// container, err := c.getContainer(event.ID)
// if err != nil {
// if !strings.Contains(err.Error(), "No such container") {
// logrus.Errorf("get container detail info failure %s", err.Error())
// }
// break
// }
// c.cacheContainer(ContainerEvent{Action: event.Action, Container: container})
// }
// }
//}
//}
//func (c *ContainerLogManage) getContainer(containerID string) (types.ContainerJSON, error) {
// ctx, cancel := context.WithTimeout(c.ctx, time.Second*5)
// defer cancel()
// return c.conf.DockerCli.ContainerInspect(ctx, containerID)
//}
func (c *ContainerLogManage) getContainer(containerID string) (*runtimeapi.ContainerStatus, error) {
status, err := c.conf.RuntimeServiceCli.ContainerStatus(context.Background(), &runtimeapi.ContainerStatusRequest{
ContainerId: containerID,
})
if err != nil {
return nil, err
}
return status.GetStatus(), nil
}
var handleAction = []string{"create", "start", "stop", "die", "destroy"}
func checkEventAction(action string) bool {
for _, enable := range handleAction {
if enable == action {
return true
}
}
return false
func (c *ContainerLogManage) getContainer(containerID string) (*sources.ContainerDesc, error) {
return c.conf.ContainerImageCli.InspectContainer(containerID)
}
//func createContainerLog(ctx context.Context, container types.ContainerJSON, reader *LogFile) *ContainerLog {
@ -454,14 +291,14 @@ func checkEventAction(action string) bool {
// }
//}
func createContainerLog(ctx context.Context, container *runtimeapi.ContainerStatus, reader *LogFile, conf *option.Conf) *ContainerLog {
func createContainerLog(ctx context.Context, container *sources.ContainerDesc, reader *LogFile, conf *option.Conf) *ContainerLog {
cctx, cancel := context.WithCancel(ctx)
return &ContainerLog{
ctx: cctx,
cancel: cancel,
ContainerStatus: container,
reader: reader,
conf: conf,
ctx: cctx,
cancel: cancel,
ContainerDesc: container,
reader: reader,
conf: conf,
}
}
@ -471,7 +308,8 @@ type ContainerLog struct {
cancel context.CancelFunc
conf *option.Conf
//types.ContainerJSON
*runtimeapi.ContainerStatus
//*runtimeapi.ContainerStatus
*sources.ContainerDesc
LogCopier *Copier
LogDriver []Logger
reader *LogFile
@ -505,35 +343,36 @@ type ContainerLoggerConfig struct {
// CRI Interface does not currently support obtaining container environment variables
// Therefore, obtaining log-driven configuration from environment variables is not supported for the time being.
func getLoggerConfig(envs []string) []*ContainerLoggerConfig {
//var configs = make(map[string]*ContainerLoggerConfig)
//var envMap = make(map[string]string, len(envs))
//for _, v := range envs {
// info := strings.SplitN(v, "=", 2)
// if len(info) > 1 {
// envMap[strings.ToLower(info[0])] = info[1]
// if strings.HasPrefix(info[0], "LOGGER_DRIVER_NAME") {
// if _, exist := configs[info[1]]; !exist {
// configs[info[1]] = &ContainerLoggerConfig{
// Name: info[1],
// }
// }
// }
// }
//}
//var re []*ContainerLoggerConfig
//for i, c := range configs {
// if config, ok := envMap[strings.ToLower("LOGGER_DRIVER_OPT_"+c.Name)]; ok {
// var options = make(map[string]string)
// json.Unmarshal([]byte(config), &options)
// configs[i].Options = options
// }
// re = append(re, configs[i])
//}
//return re
// TODO: get logger config from container status
return []*ContainerLoggerConfig{{
Name: "streamlog",
}}
var configs = make(map[string]*ContainerLoggerConfig)
var envMap = make(map[string]string, len(envs))
for _, v := range envs {
info := strings.SplitN(v, "=", 2)
if len(info) > 1 {
envMap[strings.ToLower(info[0])] = info[1]
if strings.HasPrefix(info[0], "LOGGER_DRIVER_NAME") {
if _, exist := configs[info[1]]; !exist {
configs[info[1]] = &ContainerLoggerConfig{
Name: info[1],
}
}
}
}
}
var re []*ContainerLoggerConfig
for i, c := range configs {
if config, ok := envMap[strings.ToLower("LOGGER_DRIVER_OPT_"+c.Name)]; ok {
var options = make(map[string]string)
_ = json.Unmarshal([]byte(config), &options)
configs[i].Options = options
}
re = append(re, configs[i])
}
if len(re) == 0 {
return []*ContainerLoggerConfig{{
Name: "streamlog",
}}
}
return re
}
//ErrNeglectedContainer not define logger name
@ -609,20 +448,35 @@ type ContainerEnv struct {
Value string
}
func (container *ContainerLog) InspectContainer() (*Info, error) {
r, err := container.conf.RuntimeServiceCli.ContainerStatus(context.Background(), &runtimeapi.ContainerStatusRequest{
ContainerId: container.ContainerStatus.GetId(),
Verbose: true,
})
if err != nil {
logrus.Errorf("failed to get container %s status: %v", container.ContainerStatus.GetMetadata().GetName(), err)
return nil, err
func (container *ContainerLog) provideLoggerInfo() (*Info, error) {
if container.ContainerRuntime == sources.ContainerRuntimeDocker {
return container.provideDockerdLoggerInfo()
}
logrus.Debugf("container %s status: %v [%v]", container.ContainerStatus.GetMetadata().GetName(), *r.Status, r.Info)
return container.provideContainerdLoggerInfo()
}
func (container *ContainerLog) provideDockerdLoggerInfo() (*Info, error) {
createTime, _ := time.Parse(RFC3339NanoFixed, container.Created)
containerJson := container.ContainerJSON
return &Info{
ContainerID: containerJson.ID,
ContainerName: containerJson.Name,
ContainerEntrypoint: containerJson.Path,
ContainerArgs: containerJson.Args,
ContainerImageName: containerJson.Config.Image,
ContainerCreated: createTime,
ContainerEnv: containerJson.Config.Env,
ContainerLabels: containerJson.Config.Labels,
DaemonName: "docker",
}, nil
}
func (container *ContainerLog) provideContainerdLoggerInfo() (*Info, error) {
logrus.Debugf("container %s status: %v [%v]", container.ContainerStatus.GetMetadata().GetName(), *container.ContainerStatus, container.Info)
// NOTE: unmarshal the extra info to get the container envs and mounts data.
// Mounts should include both image volume and container mount.
extraContainerInfo := new(containerInfo)
err = json.Unmarshal([]byte(r.Info["info"]), extraContainerInfo)
err := json.Unmarshal([]byte(container.Info["info"]), extraContainerInfo)
if err != nil {
logrus.Warnf("failed to unmarshal container info: %v", err)
}
@ -633,7 +487,7 @@ func (container *ContainerLog) InspectContainer() (*Info, error) {
containerEnvs = append(containerEnvs, fmt.Sprintf("%s=%s", ce.Key, ce.Value))
}
}
createTime, _ := time.Parse(RFC3339NanoFixed, string(container.ContainerStatus.GetCreatedAt()))
createTime := time.Unix(container.ContainerStatus.GetCreatedAt(), 0)
return &Info{
ContainerID: container.ContainerStatus.GetId(),
ContainerName: container.ContainerStatus.GetMetadata().GetName(),
@ -643,17 +497,17 @@ func (container *ContainerLog) InspectContainer() (*Info, error) {
ContainerCreated: createTime,
ContainerEnv: containerEnvs,
ContainerLabels: container.ContainerStatus.GetLabels(),
DaemonName: "cri",
DaemonName: "containerd",
}, nil
}
// startLogger starts a new logger driver for the container.
func (container *ContainerLog) startLogger() ([]Logger, error) {
info, err := container.InspectContainer()
info, err := container.provideLoggerInfo()
if err != nil {
return nil, err
}
configs := getLoggerConfig([]string{})
configs := getLoggerConfig(info.ContainerEnv)
var loggers []Logger
for _, config := range configs {
initDriver, err :=
@ -664,7 +518,6 @@ func (container *ContainerLog) startLogger() ([]Logger, error) {
continue
}
info.Config = config.Options
info.DaemonName = "cri"
l, err := initDriver(*info)
if err != nil {
logrus.Warnf("init container log driver failure %s", err.Error())

View File

@ -142,13 +142,14 @@ func (n *NodeManager) Start(errchan chan error) error {
} else {
logrus.Infof("this node(%s) is not compute node or disable collect container log ,do not start container log manage", n.currentNode.Role)
}
//if n.cfg.EnableImageGC {
// if n.currentNode.Role.HasRule(client.ManageNode) && !n.currentNode.Role.HasRule(client.ComputeNode) {
// n.imageGCManager.SetServiceImages(n.controller.ListServiceImages())
// go n.imageGCManager.Start()
// }
//}
//TODO: imageGCManager with containerd
if n.cfg.EnableImageGC && n.cfg.ContainerRuntime == "docker" {
logrus.Info("Start the image garbage collection mechanism")
if n.currentNode.Role.HasRule(client.ManageNode) && !n.currentNode.Role.HasRule(client.ComputeNode) {
n.imageGCManager.SetServiceImages(n.controller.ListServiceImages())
go n.imageGCManager.Start()
}
}
go n.monitor.Start(errchan)
go n.heartbeat()