from flask import current_app
import sqlalchemy as sql
from celery import shared_task
from celery.utils.log import get_task_logger
from crawlerdetect import CrawlerDetect
from db import FLASK_DB
from app.model.orm import (
PageVisit,
PageVisitCounter,
)
# Celery task logger named after this module; used by the aggregation code below.
_LOGGER = get_task_logger(__name__)
# Single shared detector instance, used to flag visits whose user agent is a crawler.
_CRAWLER_DETECT = CrawlerDetect()
@shared_task
def record_page_visit(request_info):
    """Celery task: store one page visit, with GeoIP data when available.

    ``request_info`` is a mapping describing the request. This task reads
    ``request_info['remote_addr']`` and hands the whole mapping on to
    ``_record_page_visit``, which reads the remaining keys.

    The GeoIP lookup is best-effort: it only runs when the remote address is
    truthy and the current Flask app has a ``maxminddb`` attribute. Any
    failure during the lookup is logged as a warning and the visit is still
    recorded, just without IP information.
    """
    db_session = FLASK_DB.session
    ip_info = None
    remote_addr = request_info['remote_addr']
    if remote_addr and hasattr(current_app, 'maxminddb'):
        try:
            ip = remote_addr
            if ip.startswith('[') and ip.endswith(']'):
                # IPv6 addresses may be wrapped in brackets, so let's remove them
                ip = ip[1:-1]
            ip_info = current_app.maxminddb.get(ip)
        except Exception as e:
            # Deliberately broad: a failed lookup must never lose the visit.
            current_app.logger.warning(f"Maxmind Lookup failed: {e}")
    _record_page_visit(db_session, request_info, ip_info)
@shared_task
def aggregate_page_visits():
    """Celery task: aggregate the stored page visits using the Flask DB session.

    Thin wrapper that passes ``FLASK_DB.session`` to ``_aggregate_page_visits``.
    """
    db_session = FLASK_DB.session
    _aggregate_page_visits(db_session)
def _record_page_visit(db_session, request_info, ip_info=None):
    """Build a ``PageVisit`` from ``request_info``, add it to the session and commit.

    Args:
        db_session: Session the new row is added to and committed on.
        request_info: Mapping that must contain the keys ``path``,
            ``query_string``, ``referrer``, ``remote_addr``, ``user_agent``,
            ``user_uuid``, ``is_user`` and ``is_admin``.
        ip_info: Optional GeoIP lookup result. When truthy, the English
            country name is read from ``ip_info['country']['names']['en']``;
            any missing level yields ``None``.
    """
    if ip_info:
        country_names = ip_info.get('country', {}).get('names', {})
        country = country_names.get('en')
    else:
        country = None

    # Keys are looked up in the same order as the model's keyword arguments.
    visit_fields = {
        'path': request_info['path'],
        'query': request_info['query_string'],
        'referrer': request_info['referrer'],
        'ip': request_info['remote_addr'],
        'country': country,
        'userAgent': request_info['user_agent'],
        'uuid': request_info['user_uuid'],
        'isUser': request_info['is_user'],
        'isAdmin': request_info['is_admin'],
        # Bot flag is derived from the user agent string.
        'isBot': _CRAWLER_DETECT.isCrawler(request_info['user_agent']),
    }

    db_session.add(PageVisit(**visit_fields))
    db_session.commit()
def _new_visit_bucket():
    """Return a fresh accumulator for one path or one country."""
    return {
        'visitCount': 0,
        'botVisitCount': 0,
        'apiVisitCount': 0,
        'visitors': set(),
        'users': set(),
    }


def _aggregate_page_visits(db_session):
    """Fold all stored ``PageVisit`` rows into one ``PageVisitCounter`` row.

    The function:

    1. Reads the min/max ``createdAt`` and the max ``id`` of the stored page
       visits; returns early when there are none.
    2. Iterates every visit with ``id <= last_id`` and tallies it per path,
       per country (``"Unknown"`` when the country is empty) and in total.
       Each visit counts as exactly one of: API visit (path starts with
       ``/api/``), bot visit, or regular visit. Distinct visitor and user
       UUIDs are collected as sets and reduced to counts.
    3. Stores the resulting ``PageVisitCounter`` and deletes the processed
       ``PageVisit`` rows in a single commit.

    Args:
        db_session: Session used for all queries, the insert and the delete.
    """
    _LOGGER.info("Page visit aggregation start")

    start_time, end_time, last_id = db_session.execute(
        sql.select(
            sql.func.min(PageVisit.createdAt),
            sql.func.max(PageVisit.createdAt),
            sql.func.max(PageVisit.id),
        )
    ).one()

    if last_id is None:
        _LOGGER.info("There have been no new page visits since last aggregation")
        return

    _LOGGER.info(f"Recording page visits from {start_time} to {end_time}")

    total_count = 0
    total_visit_count = 0
    total_bot_visit_count = 0
    total_api_visit_count = 0
    total_visitors = set()
    total_users = set()
    paths = {}
    countries = {}

    # Bound by last_id so visits inserted while this runs are left for the
    # next aggregation instead of being counted now and deleted below.
    page_visits = db_session.scalars(
        sql.select(PageVisit).where(PageVisit.id <= last_id)
    )
    for pv in page_visits:
        total_count += 1

        pv_path = pv.path
        if pv_path not in paths:
            paths[pv_path] = _new_visit_bucket()
        path_stats = paths[pv_path]

        pv_country = pv.country or "Unknown"
        if pv_country not in countries:
            countries[pv_country] = _new_visit_bucket()
        country_stats = countries[pv_country]

        if pv_path.startswith('/api/'):
            # API hits are tallied per path as well as per country; without
            # the per-path increment every path's apiVisitCount stays at 0.
            path_stats['apiVisitCount'] += 1
            country_stats['apiVisitCount'] += 1
            total_api_visit_count += 1
        elif pv.isBot:
            path_stats['botVisitCount'] += 1
            country_stats['botVisitCount'] += 1
            total_bot_visit_count += 1
        else:
            path_stats['visitCount'] += 1
            country_stats['visitCount'] += 1
            total_visit_count += 1

        path_stats['visitors'].add(pv.uuid)
        country_stats['visitors'].add(pv.uuid)
        total_visitors.add(pv.uuid)

        if pv.isUser:
            path_stats['users'].add(pv.uuid)
            country_stats['users'].add(pv.uuid)
            total_users.add(pv.uuid)

    # Replace the UUID sets with their sizes before the buckets are stored.
    for entry in [*paths.values(), *countries.values()]:
        entry['visitorCount'] = len(entry['visitors'])
        entry['userCount'] = len(entry['users'])
        del entry['visitors']
        del entry['users']

    pvc = PageVisitCounter(
        startTimestamp=start_time,
        endTimestamp=end_time,
        paths=paths,
        countries=countries,
        totalVisitCount=total_visit_count,
        totalBotVisitCount=total_bot_visit_count,
        totalVisitorCount=len(total_visitors),
        totalUserCount=len(total_users),
        totalApiVisitCount=total_api_visit_count,
    )
    db_session.add(pvc)

    # Clean up processed page views in the same transaction as the counter
    # insert: if either step fails, neither is persisted, so the same visits
    # cannot be aggregated twice on the next run.
    db_session.execute(
        sql.delete(PageVisit)
        .where(PageVisit.id <= last_id)
    )
    db_session.commit()

    _LOGGER.info(f"Recorded {total_count} page visits")