"""sutools.log_handler -- granular logging configuration for sutools."""

import logging, os, datetime, warnings, atexit
from types import SimpleNamespace


class Logger:
    """Object designed for swift granular logging configuration.

    Creates one named ``logging.Logger`` per entry in *loggers*, optionally
    wires a shared file handler and/or stream handler onto each of them, and
    (optionally) prunes old log files by count (*filecap*) or by age
    (*filetimeout*).
    """

    def __init__(
        self,
        name,
        loggers,
        loglvl,
        filename,
        filepath,
        filefmt,
        fhandler,
        filecap,
        filetimeout,
        file,
        streamfmt,
        shandler,
        stream,
    ):
        """Configure the named loggers and their handlers.

        name        -- identifier for this logger group
        loggers     -- iterable of logger names to create/configure
        loglvl      -- level applied to every logger and handler
        filename    -- base log file name (stored only)
        filepath    -- full path of the current log file
        filefmt     -- logging.Formatter for the file handler
        fhandler    -- file handler shared by all named loggers
        filecap     -- int; keep at most this many .log files (see cap)
        filetimeout -- str like '2d'; delete older .log files (see timeout)
        file        -- bool; enable file logging
        streamfmt   -- logging.Formatter for the stream handler
        shandler    -- stream handler shared by all named loggers
        stream      -- bool; enable stream logging
        """
        # set properties
        self.name = name
        self.filename = filename
        self.filepath = filepath
        self.loglvl = loglvl
        self.file = file
        self.stream = stream
        self.rootlogger = logging.getLogger()
        self.filefmt = filefmt
        self.fhandler = fhandler
        self.streamfmt = streamfmt
        self.shandler = shandler

        # Create each named logger.  NOTE: the named loggers propagate to the
        # root logger automatically; the previous code also called
        # rootlogger.addHandler(logger), which passes a Logger where a
        # Handler is required and breaks logging.shutdown() at exit -- that
        # call has been removed.
        named = {}
        for log in loggers:
            logger = logging.getLogger(log)
            logger.setLevel(loglvl)  # set logger level
            named[log] = logger

        # convert logger dict to namespace for easy access dot syntax
        self.loggers = SimpleNamespace(**named)
        self.rootlogger.setLevel(loglvl)  # set default log level

        # if file property enabled, attach the file handler to every logger
        if self.file:
            self._attach(self.fhandler, self.filefmt)
            atexit.register(self.out)  # register out function at interpreter exit

        # if stream property enabled, attach the stream handler to every logger
        if self.stream:
            self._attach(self.shandler, self.streamfmt)

        # if a file cap is defined and type is int run cap function
        if filecap and isinstance(filecap, int):
            self.cap(filecap)

        # if a file timeout is defined and type is str run timeout function
        if filetimeout and isinstance(filetimeout, str):
            self.timeout(filetimeout)

    def _attach(self, handler, fmt):
        """Set level/formatter on *handler* and add it to every named logger,
        disabling propagation so records are not emitted twice via root."""
        handler.setLevel(self.loglvl)
        handler.setFormatter(fmt)
        for log in vars(self.loggers):
            logger = logging.getLogger(log)
            logger.addHandler(handler)
            logger.propagate = False  # disable propagation of log messages
[docs] def cap(self, filecap): """delete any file outside of range based on file age""" parent_folder = os.path.dirname(self.filepath) # create list of tuples with filename and its creation time for all files ending with '.log' in parent folder logs = [ ( os.path.join(parent_folder, f), os.path.getctime(os.path.join(parent_folder, f)), ) for f in os.listdir(parent_folder) if f.endswith(".log") ] logs.sort( key=lambda x: x[1], reverse=True ) # sort the logs by their creation time in descending order # if files have exceeded the cap if len(logs) > filecap: logs_to_remove = ( len(logs) - filecap ) # calculate the number of logs to remove for log in logs[filecap:]: os.remove(log[0]) # remove file if logs_to_remove > 1: print(f"filecap removed {logs_to_remove} logs") else: print("filecap reached") # print if filecap is reached
[docs] def timeout(self, filetimeout): """delete any file outside given time range""" try: # get the path of the parent folder and find all log files in it parent_folder = os.path.dirname(self.filepath) logs = [ os.path.join(parent_folder, f) for f in os.listdir(parent_folder) if f.endswith(".log") ] # define time units and extract the amount and unit of the file timeout. time_units = { "m": "minutes", "h": "hours", "d": "days", "o": "months", "y": "years", } time_unit = time_units[filetimeout[-1]] time_amount = int(filetimeout[:-1]) # get the current time and calculate the time threshold based on the file timeout now = datetime.datetime.now() if time_unit == "years": time_threshold = now - datetime.timedelta(days=time_amount * 365) elif time_unit == "minutes": time_threshold = now - datetime.timedelta(minutes=time_amount) elif time_unit == "months": time_threshold = now - datetime.timedelta(days=time_amount * 30) else: time_threshold = now - datetime.timedelta(**{time_unit: time_amount}) # remove all logs that are older than the time threshold and count the number of logs removed logs_removed = 0 for log in logs: if os.path.getctime(log) < time_threshold.timestamp(): os.remove(log) logs_removed += 1 # print the number of logs that were removed if any if logs_removed > 0: print(f"timeout removed {logs_removed} logs") except KeyError: # warn the user if an invalid time unit is provided warnings.warn(f"Invalid time unit: {filetimeout[-1]}", Warning)
[docs] def out(self): """ Check all loggers in the loggers namespace object for existing logs. If none exist, close the file fhandlers and remove the empty file """ # check each logger for existing handlers and set to null if they exist for log in vars(self.loggers).values(): if log.hasHandlers(): for fhandler in log.handlers: fhandler.close() log.handlers = [] # check if the log file is empty and remove it try: file_size = os.path.getsize(self.filepath) if file_size == 0: os.remove(self.filepath) except FileNotFoundError as e: # print an error message if the file cannot be removed print(f"Failed to remove file: {e}") # check if the module folder is empty and remove it try: m_folder = os.path.dirname(self.filepath) if not os.listdir(m_folder): # check if folder is empty os.rmdir(m_folder) # remove empty folder except FileNotFoundError as e: # print an error message if the folder cannot be removed print(f"Failed to remove module folder: {e}") # check if the log folder is empty and remove it try: l_folder = os.path.dirname(os.path.dirname(self.filepath)) if not os.listdir(l_folder): # check if folder is empty os.rmdir(l_folder) # remove empty folder except FileNotFoundError as e: # print an error message if the folder cannot be removed print(f"Failed to remove log folder: {e}")