Source code for tracking

from flask import current_app
import sqlalchemy as sql
from celery import shared_task
from celery.utils.log import get_task_logger
from crawlerdetect import CrawlerDetect

from db import FLASK_DB
from app.model.orm import (
    PageVisit,
    PageVisitCounter,
)

# Celery task logger; used by the aggregation helper in this module.
_LOGGER         = get_task_logger(__name__)
# Shared crawler detector; its isCrawler() result is stored as PageVisit.isBot.
_CRAWLER_DETECT = CrawlerDetect()


@shared_task
def record_page_visit(request_info):
    """Celery task: enrich a request with GeoIP data and store the page visit.

    ``request_info`` is a dict describing the request. This task reads its
    ``'remote_addr'`` entry for the GeoIP lookup and passes the whole dict on
    to ``_record_page_visit``.

    The lookup is best-effort: it only runs when the current Flask app has a
    ``maxminddb`` attribute, and any failure is logged as a warning, after
    which the visit is recorded without IP information.
    """
    session = FLASK_DB.session
    geo = None

    remote_addr = request_info['remote_addr']
    if remote_addr and hasattr(current_app, 'maxminddb'):
        try:
            # IPv6 addresses may arrive wrapped in brackets; strip them for
            # the lookup only (the stored address is left untouched).
            bracketed = remote_addr.startswith('[') and remote_addr.endswith(']')
            lookup_ip = remote_addr[1:-1] if bracketed else remote_addr
            geo = current_app.maxminddb.get(lookup_ip)
        except Exception as e:
            current_app.logger.warning(f"Maxmind Lookup failed: {e}")

    _record_page_visit(session, request_info, geo)
@shared_task
def aggregate_page_visits():
    """Celery task: run the page-visit aggregation on the Flask DB session.

    All of the work is delegated to ``_aggregate_page_visits``.
    """
    _aggregate_page_visits(FLASK_DB.session)
def _record_page_visit(db_session, request_info, ip_info=None):
    """Persist a single ``PageVisit`` row built from ``request_info``.

    Args:
        db_session: Session used to add and commit the new row.
        request_info: Mapping providing the keys ``path``, ``query_string``,
            ``referrer``, ``remote_addr``, ``user_agent``, ``user_uuid``,
            ``is_user`` and ``is_admin``.
        ip_info: Optional GeoIP lookup result. When truthy, the country name
            is read from ``ip_info['country']['names']['en']``; any missing
            level yields ``None``.
    """
    country = None
    if ip_info:
        country = ip_info.get('country', {}).get('names', {}).get('en')

    page_visit = PageVisit(
        path=request_info['path'],
        query=request_info['query_string'],
        referrer=request_info['referrer'],
        ip=request_info['remote_addr'],
        country=country,
        userAgent=request_info['user_agent'],
        uuid=request_info['user_uuid'],
        isUser=request_info['is_user'],
        isAdmin=request_info['is_admin'],
        isBot=_CRAWLER_DETECT.isCrawler(request_info['user_agent']),
    )
    db_session.add(page_visit)
    db_session.commit()


def _new_visit_stats():
    """Return an empty statistics bucket for one path or one country."""
    return {
        'visitCount': 0,
        'botVisitCount': 0,
        'apiVisitCount': 0,
        'visitors': set(),
        'users': set(),
    }


def _aggregate_page_visits(db_session):
    """Fold all stored page visits into one ``PageVisitCounter`` row.

    Each visit is classified as exactly one of:

    * API visit  -- the path starts with ``/api/``,
    * bot visit  -- ``isBot`` is set,
    * regular visit -- everything else,

    and counted in the grand totals, in the bucket of its path and in the
    bucket of its country (``"Unknown"`` when no country was recorded).
    Distinct visitor UUIDs, and distinct UUIDs of visits flagged ``isUser``,
    are counted per bucket and overall.

    The counter row is inserted and the processed ``PageVisit`` rows are
    deleted in a single transaction. Nothing is written when the table is
    empty.

    Args:
        db_session: Session used for all queries and the final commit.
    """
    _LOGGER.info("Page visit aggregation start")

    # Snapshot the time range and the highest id up front; only rows up to
    # ``last_id`` are aggregated and deleted, so visits recorded while this
    # runs are left for the next aggregation.
    start_time, end_time, last_id = db_session.execute(
        sql.select(
            sql.func.min(PageVisit.createdAt),
            sql.func.max(PageVisit.createdAt),
            sql.func.max(PageVisit.id),
        )
    ).one()

    if last_id is None:
        _LOGGER.info("There have been no new page visits since last aggregation")
        return

    _LOGGER.info(f"Recording page visits from {start_time} to {end_time}")

    total_count = 0
    total_visit_count = 0
    total_bot_visit_count = 0
    total_api_visit_count = 0
    total_visitors = set()
    total_users = set()
    paths = {}
    countries = {}

    page_visits = db_session.scalars(
        sql.select(PageVisit).where(PageVisit.id <= last_id)
    )
    for pv in page_visits:
        total_count += 1

        pv_path = pv.path
        if pv_path not in paths:
            paths[pv_path] = _new_visit_stats()
        path_stats = paths[pv_path]

        pv_country = pv.country or "Unknown"
        if pv_country not in countries:
            countries[pv_country] = _new_visit_stats()
        country_stats = countries[pv_country]

        # Pick the single counter this visit belongs to, then apply it to
        # BOTH buckets. Previously the per-path 'apiVisitCount' was never
        # incremented and therefore always stayed at 0.
        if pv_path.startswith('/api/'):
            counter_key = 'apiVisitCount'
            total_api_visit_count += 1
        elif pv.isBot:
            counter_key = 'botVisitCount'
            total_bot_visit_count += 1
        else:
            counter_key = 'visitCount'
            total_visit_count += 1
        path_stats[counter_key] += 1
        country_stats[counter_key] += 1

        path_stats['visitors'].add(pv.uuid)
        country_stats['visitors'].add(pv.uuid)
        total_visitors.add(pv.uuid)

        if pv.isUser:
            path_stats['users'].add(pv.uuid)
            country_stats['users'].add(pv.uuid)
            total_users.add(pv.uuid)

    # Replace the working sets by their sizes so only plain counts are stored.
    for entry in [*paths.values(), *countries.values()]:
        entry['visitorCount'] = len(entry.pop('visitors'))
        entry['userCount'] = len(entry.pop('users'))

    pvc = PageVisitCounter(
        startTimestamp=start_time,
        endTimestamp=end_time,
        paths=paths,
        countries=countries,
        totalVisitCount=total_visit_count,
        totalBotVisitCount=total_bot_visit_count,
        totalVisitorCount=len(total_visitors),
        totalUserCount=len(total_users),
        totalApiVisitCount=total_api_visit_count,
    )
    db_session.add(pvc)

    # Clean up processed page views in the same transaction as the counter
    # insert: if either step fails, neither is persisted, so visits cannot be
    # counted twice by a later run.
    db_session.execute(
        sql.delete(PageVisit)
        .where(PageVisit.id <= last_id)
    )
    db_session.commit()

    _LOGGER.info(f"Recorded {total_count} page visits")