상황
회사 업무 중에 다른 통합 서비스와의 연동을 위해 내가 개발 중인 특정 시스템 백엔드에서 통계 기능을 API 형태로 제공해야 하는 상황이 발생했다. 이 기능은 일일, 주간, 월간 현황 등 다양한 기간별 통계를 제공해야 했고, 데이터베이스는 MongoDB를 사용하고 있었다. 이를 위해 MongoDB의 Aggregation Pipeline을 활용하여 데이터를 가공하였고, 특히 시간대(timezone) 처리에 주의를 기울였다.
1. 데이터 저장 시 시간대 처리
MongoDB는 기본적으로 날짜 데이터를 UTC 기준으로 저장한다. 따라서 한국 시간(KST)으로 입력된 날짜도 내부적으로는 UTC로 변환되어 저장된다. 예를 들어, 한국 시간으로 2025-05-21T15:47:52에 저장된 데이터는 MongoDB에서는 ISODate("2025-05-21T06:47:52Z")로 저장된다.
이를 고려하여, API에서 날짜 데이터를 입력받을 때는 한국 시간으로 입력받되, 내부적으로는 UTC로 변환하여 저장하도록 처리하였다. FastAPI를 사용하여 입력된 날짜에 타임존 정보를 추가하는 방식으로 구현하였다.
# 라우터 코드 내에서...
from pytz import timezone
...
@router.get("/analytics", summary="특정 기간 동안의 입출입 데이터 분석 조회")
async def getAnalytics(
start: datetime = Query(..., description="검색 시작일 (예: 2025-04-01 00:00:00)"),
end: datetime = Query(..., description="검색 종료일 (예: 2025-05-01 23:59:59)"),
):
KST = timezone("Asia/Seoul")
startTime = KST.localize(start)
endTime = KST.localize(end)
...
2. API 구성
MongoDB의 Aggregation Framework를 활용하여, 다음과 같은 세 가지 통계 데이터를 제공하는 API를 구성했다.
-
- 공정별 작업 현황 통계
-
- 작업 사유별 통계
-
- 시간대별 작업 현황 통계
각 통계는 MongoDB의 다양한 Aggregation 연산자를 활용하여 구현하였다.
3. Aggregation Pipeline을 활용한 통계 데이터 가공
공정별로 작업 횟수와 평균 작업 시간을 계산하기 위해 다음과 같은 파이프라인을 구성함
3.1 공정별 현황 통계
# 특정 서비스 메서드 내부 ...
process_pipeline =
[
# $match: 조건 필터링(where)
{"$match": {"startTime": {"$gte": start, "$lt": end}}},
# $group: 그룹핑(group by)
{"$group": {
"_id": "$processName",
"count": {"$sum": 1},
"totalDuration": {
"$sum": {
"$divide": [{"$subtract": ["$endTime", "$startTime"]}, 1000 * 60]
}
}
}},
# $project: 출력 필드를 선택(변형)
{"$project": {
"process": "$_id",
"count": 1,
# $cond: if -else 조건식을 뜻함(조건 연산자)
"avgDuration": {
"$cond": [{"$eq": ["$count", 0]}, 0, {"$divide": ["$totalDuration", "$count"]}]
},
"_id": 0
}}
]
여기서 $match는 지정된 기간 내의 데이터를 필터링하고, $group은 공정별로 데이터를 그룹화하여 작업 횟수와 총 작업 시간을 계산한다. 이후 $project를 통해 평균 작업 시간을 계산하고 필요한 필드만을 선택한다.
3.2 작업 사유별 통계
작업 사유별로 발생 횟수를 계산하기 위해 $unwind 연산자를 활용하여 배열을 펼친 후, $group과 $project를 사용하여 통계를 계산했다
reason_pipeline =
[
{"$match": {"startTime": {"$gte": start, "$lt": end}}},
{"$unwind": "$workReasons"},
{"$group": {
"_id": "$workReasons",
"count": {"$sum": 1}
}},
{"$project": {
"reason": "$_id",
"count": 1,
"_id": 0
}}
]
이 파이프라인은 각 작업의 사유를 개별적으로 분리하여, 사유별로 발생 횟수를 계산한다.
3.3 시간대별 작업 현황 통계
시간대별로 작업 현황을 분석하기 위해, $dateToString 연산자를 활용하여 KST(한국 표준시) 기준의 시(hour)를 추출하고, 이를 기반으로 그룹화하여 통계를 계산했다
hour_pipeline =
[
{"$match": {"startTime": {"$gte": start, "$lt": end}}},
# addFileds를 통해 특정 필드 추가
{"$addFields": {
"hour": {
"$toInt": {
"$dateToString": {
"date": "$startTime",
"format": "%H", # 00~23시 문자열
"timezone": "Asia/Seoul" # KST 기준
}
}
}
}},
{"$group": {
"_id": "$hour",
"count": {"$sum": 1},
"totalDuration": {
"$sum": {
"$divide": [{"$subtract": ["$endTime", "$startTime"]}, 1000 * 60]
}
}
}},
{"$project": {
"hour": "$_id",
"count": 1,
"avgDuration": {
"$cond": [{"$eq": ["$count", 0]}, 0, {"$divide": ["$totalDuration", "$count"]}]
},
"_id": 0
}}
]
여기서 $dateToString 연산자의 timezone 옵션을 활용하여 UTC로 저장된 시간을 KST로 변환하였다. 참고로 이러한 방식은 MongoDB 3.6 이상에서 지원된다고 하며, 시간대별 분석에 유용하다.
이렇게 정의된 파이프라인을 각각 실행하여 결과를 반환하였다.
# 서비스 코드 내부
...
# aggregate 코드(aggregateData)는 재사용을 위해 별도의 base 함수 코드에 정의함
process_stats = await aggregateData(self.dbName, self.tableName, process_pipeline)
reason_stats = await aggregateData(self.dbName, self.tableName, reason_pipeline)
hour_stats = await aggregateData(self.dbName, self.tableName, hour_pipeline)
return {
"byProcess": process_stats,
"byReason": reason_stats,
"byHour": hour_stats,
}
참고로 정의한 base.py 코드는 아래와 같다 (기본 메서드 정의)
from database.mongoDB import *
from datetime import datetime
from typing import List, Dict
...
# aggregate 메서드
async def aggregateData(database: str, collection: str, pipeline: List[dict]) -> List[dict]:
cursor = getConnection()[database][collection].aggregate(pipeline)
results = []
async for doc in cursor:
results.append(doc)
return results
4. 시간대 처리 시 주의 사항
MongoDB는 날짜 데이터를 UTC 기준으로 저장하므로, 시간대별 통계를 계산할 때는 반드시 타임존을 고려해야 한다. 특히 $dateToString 연산자를 사용할 때는 timezone 옵션을 명시하여 원하는 시간대 기준으로 데이터를 가공해야 한다.
예를 들어, 한국 시간 기준으로 시간대를 계산하려면 다음과 같이 timezone 옵션을 설정해야 한다는게 중요하다
{
"$dateToString": {
"date": "$startedAt",
"format": "%H",
"timezone": "Asia/Seoul"
}
}
이를 통해 한국 시간 기준으로 정확한 시간대별 통계를 계산할 수 있다.
'개발 일상' 카테고리의 다른 글
| 대량의 파일, 효율적으로 압축 & 전송하기 – 우분투 기반 백업 최적화 (2) | 2025.06.02 |
|---|---|
| [python] data.dict() vs jsonable_encoder(). 그리고 datetime 쿼리 삽질기 (0) | 2025.05.26 |
| 주기적인 MongoDB 서비스 Down 증상 해결 (0) | 2025.05.19 |
| [React.js] setTimeout 사용 시 최신 상태(state)가 반영되지 않는 이슈 처리 (0) | 2025.04.07 |
| Ubuntu의 crontab에서 GUI 프로그램 실행 스케쥴 등록하기(ubuntu 22.04 기준) (0) | 2022.11.04 |
