MongoDB按照天數或小時聚合
需求
最近接到需求,需要對用戶賬戶下的設備狀態,分別按照天以及小時進行聚合,以此為基礎繪制設備狀態趨勢圖.
實現思路是啟動定時任務,對各用戶的設備狀態數據分別按照小時以及天進行聚合,并存儲進數據庫中供用戶后續查詢.
涉及到的技術棧分別為:Spring Boot
,MongoDB,Morphia
.
數據模型
@Data
@Builder
@Entity(value = "rawDevStatus", noClassnameStored = true)
// 設備狀態索引
@Indexes({
// 設置數據超時時間(TTL,MongoDB根據TTL在后臺進行數據刪除操作)
@Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
@Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class RawDevStatus {
@Id
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
private ObjectId objectId;
private String userId;
private Instant time;
@Embedded("points")
ListPoint> protocolPoints;
@Data
@AllArgsConstructor
public static class Point {
/**
* 協議類型
*/
private Protocol protocol;
/**
* 設備總數
*/
private Integer total;
/**
* 設備在線數目
*/
private Integer onlineNum;
/**
* 處于啟用狀態設備數目
*/
private Integer enableNum;
}
}
上述代碼是設備狀態實體類,其中設備狀態數據是按照設備所屬協議進行區分的.
@Data
@Builder
@Entity(value = "aggregationDevStatus", noClassnameStored = true)
@Indexes({
@Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),
@Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class AggregationDevStatus {
@Id
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
private ObjectId objectId;
/**
* 用戶ID
*/
private String userId;
/**
* 設備總數
*/
private Double total;
/**
* 設備在線數目
*/
private Double onlineNum;
/**
* 處于啟用狀態設備數目
*/
private Double enableNum;
/**
* 聚合類型(按照小時還是按照天聚合)
*/
@Property("aggDuration")
private AggregationDuration aggregationDuration;
private Instant time;
/**
* 動態設置文檔過期時間
*/
private Instant expireAt;
}
上述代碼是期待的聚合結果,其中構建兩個索引:(1)超時索引;(2)復合索引,程序會根據用戶名以及時間查詢設備狀態聚合結果.
聚合操作符介紹
聚合操作類似于管道,管道中的每一步操作產生的中間結果作為下一步的輸入源,最終輸出聚合結果.
此次聚合主要涉及以下操作:
•$project:指定輸出文檔中的字段.
•$unwind:拆分數據中的數組;
•match:選擇要處理的文檔數據;
•group:根據key分組聚合結果.
原始聚合語句
db.getCollection('raw_dev_status').aggregate([
{$match:
{
time:{$gte: ISODate("2019-06-27T00:00:00Z")},
}
},
{$unwind: "$points"},
{$project:
{
userId:1,points:1,
tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
}
},
{$project:
{
userId:1,points:1,
groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
}
},
{$group:
{
_id:{user_id:'$userId', cal_time:'$groupTime'},
devTotal:{'$avg':'$points.total'},
onlineTotal:{'$avg':'$points.onlineNum'},
enableTotal:{'$avg':'$points.enableNum'}
}
},
])
上述代碼是按小時聚合數據,以下來逐步介紹處理思路:
(1) $match
根據小時聚合數據,因為只需要獲取近24小時的聚合結果,所以對數據進行初步篩選.
(2) $unwind
raw_dev_status中的設備狀態是按照協議區分的數組,因此需要對其進行展開,以便下一步進行篩選;
(3) $project
{$project:
{
userId:1,points:1,
tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
}
}
選擇需要輸出的數據,分別為:userId,points
以及tmp.
需要注意,為了按照時間聚合,對$time屬性進行操作,提取%Y:%m:%dT%H時信息至$tmp作為下一步的聚合依據.
如果需要按天聚合,則format數據可修改為:%Y:%m:%dT00:00:00Z
即可滿足要求.
(4) $project
{$project:
{
userId:1,points:1,
groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
}
}
因為上一步project操作中,tmp為字符串數據,最終的聚合結果需要時間戳(主要懶,不想在程序中進行轉換操作).
因此,此處對$tmp進行操作,轉換為時間類型數據,即groupTime.
(5) $group
對聚合結果進行分類操作,并生成最終輸出結果.
{$group:
{
# 根據_id進行分組操作,依據是`user_id`以及`$groupTime`
_id:{user_id:'$userId', cal_time:'$groupTime'},
# 求設備總數平均值
devTotal:{'$avg':'$points.total'},
# 求設備在線數平均值
onlineTotal:{'$avg':'$points.onlineNum'},
# ...
enableTotal:{'$avg':'$points.enableNum'}
}
}
代碼編寫
此處ODM選擇Morphia,亦可以使用MongoTemplate,原理類似.
/**
* 創建聚合條件
*
* @param pastTime 過去時間段
* @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)
* @return 聚合條件
*/
private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {
QueryRawDevStatus> query = datastore.createQuery(RawDevStatus.class);
return datastore.createAggregation(RawDevStatus.class)
.match(query.field("time").greaterThanOrEq(pastTime))
.unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))
.match(query.field("points.protocol").equal("ALL"))
.project(Projection.projection("userId"),
Projection.projection("points"),
Projection.projection("convertTime",
Projection.expression("$dateToString",
new BasicDBObject("format", dateToString)
.append("date", "$time"))
)
)
.project(Projection.projection("userId"),
Projection.projection("points"),
Projection.projection("convertTime",
Projection.expression("$dateFromString",
new BasicDBObject("format", stringToDate)
.append("dateString", "$convertTime"))
)
)
.group(
Group.id(Group.grouping("userId"), Group.grouping("convertTime")),
Group.grouping("total", Group.average("points.total")),
Group.grouping("onlineNum", Group.average("points.onlineNum")),
Group.grouping("enableNum", Group.average("points.enableNum"))
);
}
/**
* 獲取聚合結果
*
* @param pipeline 聚合條件
* @return 聚合結果
*/
private ListAggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {
ListAggregationMidDevStatus> statuses = new ArrayList>();
IteratorAggregationMidDevStatus> resultIterator = pipeline.aggregate(
AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());
while (resultIterator.hasNext()) {
statuses.add(resultIterator.next());
}
return statuses;
}
//......................................................................................
// 獲取聚合結果(省略若干代碼)
AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
ListAggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
if (CollectionUtils.isEmpty(midStatuses)) {
log.warn("Can not get dev status aggregation result.");
return;
}
總結
以上所述是小編給大家介紹的基于Morphia實現MongoDB按小時、按天聚合操作方法,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對腳本之家網站的支持!
如果你覺得本文對你有幫助,歡迎轉載,煩請注明出處,謝謝!
您可能感興趣的文章:- JAVA mongodb 聚合幾種查詢方式詳解
- Mongodb中MapReduce實現數據聚合方法詳解
- Mongodb聚合函數count、distinct、group如何實現數據聚合操作
- MongoDB教程之聚合(count、distinct和group)
- MongoDB聚合功能淺析