package service import ( "context" "encoding/json" "fmt" "strconv" "time" "github.com/1Panel-dev/1Panel/agent/app/repo" "github.com/1Panel-dev/1Panel/agent/app/dto" "github.com/1Panel-dev/1Panel/agent/app/model" "github.com/1Panel-dev/1Panel/agent/constant" "github.com/1Panel-dev/1Panel/agent/global" "github.com/1Panel-dev/1Panel/agent/utils/common" "github.com/robfig/cron/v3" "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" "github.com/shirou/gopsutil/v3/load" "github.com/shirou/gopsutil/v3/mem" "github.com/shirou/gopsutil/v3/net" ) type MonitorService struct { DiskIO chan ([]disk.IOCountersStat) NetIO chan ([]net.IOCountersStat) } var monitorCancel context.CancelFunc type IMonitorService interface { Run() LoadMonitorData(req dto.MonitorSearch) ([]dto.MonitorData, error) LoadSetting() (*dto.MonitorSetting, error) UpdateSetting(key, value string) error CleanData() error saveIODataToDB(ctx context.Context, interval float64) saveNetDataToDB(ctx context.Context, interval float64) } func NewIMonitorService() IMonitorService { return &MonitorService{ DiskIO: make(chan []disk.IOCountersStat, 2), NetIO: make(chan []net.IOCountersStat, 2), } } func (m *MonitorService) LoadMonitorData(req dto.MonitorSearch) ([]dto.MonitorData, error) { loc, _ := time.LoadLocation(common.LoadTimeZoneByCmd()) req.StartTime = req.StartTime.In(loc) req.EndTime = req.EndTime.In(loc) var data []dto.MonitorData if req.Param == "all" || req.Param == "cpu" || req.Param == "memory" || req.Param == "load" { bases, err := monitorRepo.GetBase(repo.WithByCreatedAt(req.StartTime, req.EndTime)) if err != nil { return nil, err } var itemData dto.MonitorData itemData.Param = "base" for _, base := range bases { itemData.Date = append(itemData.Date, base.CreatedAt) itemData.Value = append(itemData.Value, base) } data = append(data, itemData) } if req.Param == "all" || req.Param == "io" { bases, err := monitorRepo.GetIO(repo.WithByCreatedAt(req.StartTime, req.EndTime)) if err != nil { return nil, err } var itemData dto.MonitorData itemData.Param = "io" for _, base := range bases { itemData.Date = append(itemData.Date, base.CreatedAt) itemData.Value = append(itemData.Value, base) } data = append(data, itemData) } if req.Param == "all" || req.Param == "network" { bases, err := monitorRepo.GetIO(repo.WithByName(req.Info), repo.WithByCreatedAt(req.StartTime, req.EndTime)) if err != nil { return nil, err } var itemData dto.MonitorData itemData.Param = "network" for _, base := range bases { itemData.Date = append(itemData.Date, base.CreatedAt) itemData.Value = append(itemData.Value, base) } data = append(data, itemData) } return data, nil } func (m *MonitorService) LoadSetting() (*dto.MonitorSetting, error) { setting, err := settingRepo.GetList() if err != nil { return nil, constant.ErrRecordNotFound } settingMap := make(map[string]string) for _, set := range setting { settingMap[set.Key] = set.Value } var info dto.MonitorSetting arr, err := json.Marshal(settingMap) if err != nil { return nil, err } if err := json.Unmarshal(arr, &info); err != nil { return nil, err } return &info, err } func (m *MonitorService) UpdateSetting(key, value string) error { return settingRepo.Update(key, value) } func (m *MonitorService) CleanData() error { if err := global.MonitorDB.Exec("DELETE FROM monitor_bases").Error; err != nil { return err } if err := global.MonitorDB.Exec("DELETE FROM monitor_ios").Error; err != nil { return err } if err := global.MonitorDB.Exec("DELETE FROM monitor_networks").Error; err != nil { return err } return nil } func (m *MonitorService) Run() { var itemModel model.MonitorBase totalPercent, _ := cpu.Percent(3*time.Second, false) if len(totalPercent) == 1 { itemModel.Cpu = totalPercent[0] } cpuCount, _ := cpu.Counts(false) loadInfo, _ := load.Avg() itemModel.CpuLoad1 = loadInfo.Load1 itemModel.CpuLoad5 = loadInfo.Load5 itemModel.CpuLoad15 = loadInfo.Load15 itemModel.LoadUsage = loadInfo.Load1 / (float64(cpuCount*2) * 0.75) * 100 memoryInfo, _ := mem.VirtualMemory() itemModel.Memory = memoryInfo.UsedPercent if err := settingRepo.CreateMonitorBase(itemModel); err != nil { global.LOG.Errorf("Insert basic monitoring data failed, err: %v", err) } m.loadDiskIO() m.loadNetIO() MonitorStoreDays, err := settingRepo.Get(settingRepo.WithByKey("MonitorStoreDays")) if err != nil { return } storeDays, _ := strconv.Atoi(MonitorStoreDays.Value) timeForDelete := time.Now().AddDate(0, 0, -storeDays) _ = settingRepo.DelMonitorBase(timeForDelete) _ = settingRepo.DelMonitorIO(timeForDelete) _ = settingRepo.DelMonitorNet(timeForDelete) } func (m *MonitorService) loadDiskIO() { ioStat, _ := disk.IOCounters() var diskIOList []disk.IOCountersStat for _, io := range ioStat { diskIOList = append(diskIOList, io) } m.DiskIO <- diskIOList } func (m *MonitorService) loadNetIO() { netStat, _ := net.IOCounters(true) netStatAll, _ := net.IOCounters(false) var netList []net.IOCountersStat netList = append(netList, netStat...) netList = append(netList, netStatAll...) m.NetIO <- netList } func (m *MonitorService) saveIODataToDB(ctx context.Context, interval float64) { defer close(m.DiskIO) for { select { case <-ctx.Done(): return case ioStat := <-m.DiskIO: select { case <-ctx.Done(): return case ioStat2 := <-m.DiskIO: var ioList []model.MonitorIO for _, io2 := range ioStat2 { for _, io1 := range ioStat { if io2.Name == io1.Name { var itemIO model.MonitorIO itemIO.Name = io1.Name if io2.ReadBytes != 0 && io1.ReadBytes != 0 && io2.ReadBytes > io1.ReadBytes { itemIO.Read = uint64(float64(io2.ReadBytes-io1.ReadBytes) / interval / 60) } if io2.WriteBytes != 0 && io1.WriteBytes != 0 && io2.WriteBytes > io1.WriteBytes { itemIO.Write = uint64(float64(io2.WriteBytes-io1.WriteBytes) / interval / 60) } if io2.ReadCount != 0 && io1.ReadCount != 0 && io2.ReadCount > io1.ReadCount { itemIO.Count = uint64(float64(io2.ReadCount-io1.ReadCount) / interval / 60) } writeCount := uint64(0) if io2.WriteCount != 0 && io1.WriteCount != 0 && io2.WriteCount > io1.WriteCount { writeCount = uint64(float64(io2.WriteCount-io1.WriteCount) / interval * 60) } if writeCount > itemIO.Count { itemIO.Count = writeCount } if io2.ReadTime != 0 && io1.ReadTime != 0 && io2.ReadTime > io1.ReadTime { itemIO.Time = uint64(float64(io2.ReadTime-io1.ReadTime) / interval / 60) } writeTime := uint64(0) if io2.WriteTime != 0 && io1.WriteTime != 0 && io2.WriteTime > io1.WriteTime { writeTime = uint64(float64(io2.WriteTime-io1.WriteTime) / interval / 60) } if writeTime > itemIO.Time { itemIO.Time = writeTime } ioList = append(ioList, itemIO) break } } } if err := settingRepo.BatchCreateMonitorIO(ioList); err != nil { global.LOG.Errorf("Insert io monitoring data failed, err: %v", err) } m.DiskIO <- ioStat2 } } } } func (m *MonitorService) saveNetDataToDB(ctx context.Context, interval float64) { defer close(m.NetIO) for { select { case <-ctx.Done(): return case netStat := <-m.NetIO: select { case <-ctx.Done(): return case netStat2 := <-m.NetIO: var netList []model.MonitorNetwork for _, net2 := range netStat2 { for _, net1 := range netStat { if net2.Name == net1.Name { var itemNet model.MonitorNetwork itemNet.Name = net1.Name if net2.BytesSent != 0 && net1.BytesSent != 0 && net2.BytesSent > net1.BytesSent { itemNet.Up = float64(net2.BytesSent-net1.BytesSent) / 1024 / interval / 60 } if net2.BytesRecv != 0 && net1.BytesRecv != 0 && net2.BytesRecv > net1.BytesRecv { itemNet.Down = float64(net2.BytesRecv-net1.BytesRecv) / 1024 / interval / 60 } netList = append(netList, itemNet) break } } } if err := settingRepo.BatchCreateMonitorNet(netList); err != nil { global.LOG.Errorf("Insert network monitoring data failed, err: %v", err) } m.NetIO <- netStat2 } } } } func StartMonitor(removeBefore bool, interval string) error { if removeBefore { monitorCancel() global.Cron.Remove(cron.EntryID(global.MonitorCronID)) } intervalItem, err := strconv.Atoi(interval) if err != nil { return err } service := NewIMonitorService() ctx, cancel := context.WithCancel(context.Background()) monitorCancel = cancel now := time.Now() nextMinute := now.Truncate(time.Minute).Add(time.Minute) time.AfterFunc(time.Until(nextMinute), func() { monitorID, err := global.Cron.AddJob(fmt.Sprintf("@every %sm", interval), service) if err != nil { return } global.MonitorCronID = monitorID }) service.Run() go service.saveIODataToDB(ctx, float64(intervalItem)) go service.saveNetDataToDB(ctx, float64(intervalItem)) return nil }