From 3f163309add7d1939a50762e80e7ee4cb53cf81a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Z=C3=ADpek?= Date: Sun, 4 Sep 2022 09:22:38 +0200 Subject: [PATCH] Experiment with value batching --- server/services/sensor_values_service.go | 78 ++++++++++++++++++------ 1 file changed, 59 insertions(+), 19 deletions(-) diff --git a/server/services/sensor_values_service.go b/server/services/sensor_values_service.go index 4165270..8a79b07 100644 --- a/server/services/sensor_values_service.go +++ b/server/services/sensor_values_service.go @@ -14,6 +14,13 @@ type sensorValue struct { Value float64 `json:"value"` } +func min(a, b int64) int64 { + if a < b { + return a + } + return b +} + func (s *SensorValuesService) Push(sensorId int64, value float64) (int64, error) { res, err := s.ctx.DB.Exec("INSERT INTO sensor_values (timestamp, sensor_id, value) VALUES (?, ?, ?)", time.Now().Unix(), sensorId, value) @@ -25,34 +32,33 @@ func (s *SensorValuesService) Push(sensorId int64, value float64) (int64, error) } func (s *SensorValuesService) GetList(sensorId int64, from int64, to int64) ([]sensorValue, error) { - var value float64 - var timestamp int64 - values := make([]sensorValue, 0) + intervalSize := to - from + batchSize := intervalSize - rows, err := s.ctx.DB.Query("SELECT timestamp, value FROM sensor_values WHERE sensor_id = ? AND timestamp > ? AND timestamp < ? ORDER BY timestamp ASC", sensorId, from, to) - - if err != nil { - if err == sql.ErrNoRows { - return values, nil - } - - return nil, err + if intervalSize > 100*24*60 { + batchSize = 24 * 60 } - defer rows.Close() + values := make([]sensorValue, 0) + current := from + + for current < to { + nextValue := min(to, current+batchSize) + + batchValues, err := s.getValueItems(sensorId, current, nextValue) - for rows.Next() { - err := rows.Scan(×tamp, &value) if err != nil { return nil, err } - values = append(values, sensorValue{Timestamp: timestamp, Value: value}) - } + sum := 0.0 - err = rows.Err() - if err != nil { - return nil, err + for _, item := range batchValues { + sum += item.Value + } + + values = append(values, sensorValue{Timestamp: int64((nextValue + current) / 2.0), Value: sum / float64(len(batchValues))}) + current += batchSize } return values, nil @@ -70,3 +76,37 @@ func (s *SensorValuesService) GetLatest(sensorId int64, to int64) (*sensorValue, return &value, nil } + +func (s *SensorValuesService) getValueItems(sensorId int64, from int64, to int64) ([]sensorValue, error) { + values := make([]sensorValue, 0) + + rows, err := s.ctx.DB.Query("SELECT timestamp, value FROM sensor_values WHERE sensor_id = ? AND timestamp > ? AND timestamp < ? ORDER BY timestamp ASC", sensorId, from, to) + + if err != nil { + if err == sql.ErrNoRows { + return values, nil + } + + return nil, err + } + + defer rows.Close() + + for rows.Next() { + item := sensorValue{} + + err := rows.Scan(&item.Timestamp, &item.Value) + if err != nil { + return nil, err + } + + values = append(values, item) + } + + err = rows.Err() + if err != nil { + return nil, err + } + + return values, nil +}