Python: Event Monitoring with watchdog

Python file watcher

Pravash
8 min read · Jun 7, 2023

In this article I will discuss one of Python's underdog libraries, watchdog, and how it is useful and competitive with other monitoring and scheduling tools like cron. Unlike cron, which runs jobs on a fixed schedule, watchdog reacts to file system events as they happen. We will take a closer look at what the watchdog library is and how you can use it in your Python scripts.

Since it is an open-source library for automating file handling, it reduces manual work: the file watcher triggers a follow-up process whenever a file arrives or is modified.

Here, I will explain one of this module's file event handlers, with examples that will help you get started. So, let's dive in.

What is watchdog?

According to the official documentation, watchdog is a Python API library and set of shell utilities for monitoring file system events.

It provides a simple and intuitive interface for monitoring files and directories, and it can be used to automate various tasks that require you to keep an eye on changes to files and directories.

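Run the following command to install it (make sure you are using Python 3.6 or above; newer watchdog releases need a more recent Python 3, so check the requirements of the version you install):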

pip install watchdog

Event Handler and Observer

The main building blocks of watchdog are the following classes:

  • Observer
  • Event handler

These are the imports we need:

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

Event handler

watchdog provides the following four event handler classes:

  • FileSystemEventHandler: Base file system event handler that you can override methods from.
  • PatternMatchingEventHandler: Matches given patterns with file paths associated with occurring events.
  • RegexMatchingEventHandler: Matches given regexes with file paths associated with occurring events.
  • LoggingEventHandler: Logs all the events captured.

You can refer to this link for more details on handlers.

By extending an event handler class provided by watchdog, we gain the ability to handle created, deleted, modified and moved events by implementing the respective methods. Because the other handler classes inherit from FileSystemEventHandler, they all expose the following methods, which we can override:

on_any_event(event)
on_created(event)
on_deleted(event)
on_modified(event)
on_moved(event)

Each of these methods receives a single parameter, event, which has the following attributes:
  • event_type: the type of the event as a string (for example, "created" or "moved"). Defaults to None.
  • is_directory: True if the event was emitted for a directory; False otherwise.
  • src_path: the source path of the file system object that triggered the event.
Move events additionally carry dest_path, the path the object was moved to.
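To see these attributes without setting up an observer, the sketch below feeds hand-made events to a handler through dispatch(), the method an observer calls for each event. It assumes a recent watchdog release in which FileCreatedEvent and DirCreatedEvent take the source path as their first argument; RecordingHandler is a hypothetical name used only for illustration.

```python
from watchdog.events import (
    DirCreatedEvent,
    FileCreatedEvent,
    FileSystemEventHandler,
)


class RecordingHandler(FileSystemEventHandler):
    """Records the attributes of every event it receives (illustrative only)."""

    def __init__(self):
        super().__init__()
        self.seen = []

    def _record(self, method, event):
        self.seen.append((method, event.event_type, event.is_directory, event.src_path))

    def on_any_event(self, event):
        # Called for every event, before the type-specific method
        self._record("on_any_event", event)

    def on_created(self, event):
        self._record("on_created", event)


handler = RecordingHandler()
# dispatch() is what an Observer calls for each event; here we pass hand-made events
handler.dispatch(FileCreatedEvent("/tmp/input_files/data.csv"))
handler.dispatch(DirCreatedEvent("/tmp/input_files/archive"))

for entry in handler.seen:
    print(entry)
```

The printed tuples show on_any_event running first for each event, followed by the type-specific method, and is_directory distinguishing the folder from the file.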

Observer

watchdog follows the observer design pattern.

An observer watches the files or directories it has been given and reports an event whenever something in them changes.
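As a reminder of how the pattern works, here is a minimal plain-Python sketch (not watchdog code): a subject keeps a list of observers and calls each of them when something changes. Subject, attach and notify are illustrative names; in watchdog, roughly speaking, the event handlers you register are the parties being notified.

```python
class Subject:
    """Minimal observer pattern: keeps a list of observers and notifies them."""

    def __init__(self):
        self._observers = []

    def attach(self, observer):
        # Register a callable that wants to hear about changes
        self._observers.append(observer)

    def notify(self, event):
        # Call every registered observer with the change that happened
        for observer in self._observers:
            observer(event)


received = []
subject = Subject()
subject.attach(received.append)
subject.attach(lambda event: print(f"observer saw: {event}"))
subject.notify("created: report.csv")
print(received)  # ['created: report.csv']
```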

The Observer runs in its own thread: it schedules watches on directories and dispatches calls to the event handlers.

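You can also import platform-specific observer classes directly and use them instead of Observer, for example InotifyObserver (Linux), FSEventsObserver (macOS), WindowsApiObserver (Windows), or the portable PollingObserver.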

Understanding with implementation

For this example I will use the FileSystemEventHandler class.
I will set a watch on a folder and trigger another function whenever a file arrives. After the processing is complete, I will move that file to another folder.

  • First, create an instance of the observer and an instance of your event handler class, which inherits from FileSystemEventHandler.
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyHandler(FileSystemEventHandler):
    pass

observer = Observer()
event_handler = MyHandler()
  • Next, schedule a watch on the observer. observer.schedule() takes the following three parameters:
    event_handler: the handler object you have created
    path: the folder you want to watch
    recursive: whether subdirectories of path are watched too (True) or only path itself (False)
observer.schedule(event_handler, path='./input_files', recursive=True)
  • observer.start() starts the observer thread, which waits for file system events and triggers the code inside the event handler.
    The main thread has to stay alive while the observer works: either loop with time.sleep() as below, or sleep for a fixed period such as time.sleep(100) to monitor only for that long.
  • observer.stop() signals the observer thread to stop and cleans up its resources.
  • Finally, call observer.join(). Because the observer runs in a separate thread, join() blocks the calling thread until the observer thread has terminated.
observer.start()
try:
    while True:
        # Keep the main thread alive; the observer works in the background
        time.sleep(300)
except KeyboardInterrupt:
    # Ctrl+C stops the observer
    observer.stop()
observer.join()
  • Now, on to the event handler class. In this example, I will check whether any file is uploaded into the watched folder; for that I can override on_created(event).

Note: with recursive=True, watchdog also watches the subdirectories of the path you provide.
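Putting the start(), stop() and join() steps together, here is a sketch that runs an observer on a temporary folder, simulates a file arriving, and waits a bounded time for the event instead of looping forever. FirstFileHandler and watch_until_first_file are hypothetical names; threading.Event is used only so the main thread can tell when the handler has fired.

```python
import os
import tempfile
import threading

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class FirstFileHandler(FileSystemEventHandler):
    """Remembers created file paths and signals when the first one arrives."""

    def __init__(self):
        super().__init__()
        self.created = []
        self.got_event = threading.Event()

    def on_created(self, event):
        if not event.is_directory:
            self.created.append(event.src_path)
            self.got_event.set()


def watch_until_first_file(path, timeout=5.0):
    handler = FirstFileHandler()
    observer = Observer()
    observer.schedule(handler, path=path, recursive=True)
    observer.start()  # the observer thread starts emitting events
    try:
        # Simulate a file arriving in the watched folder
        with open(os.path.join(path, "data.csv"), "w") as f:
            f.write("id,name\n1,a\n")
        handler.got_event.wait(timeout)  # block for at most `timeout` seconds
    finally:
        observer.stop()  # ask the observer thread to stop
        observer.join()  # wait until it has actually finished
    return handler.created


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp_dir:
        print(watch_until_first_file(tmp_dir))
```

The try/finally guarantees that stop() and join() run even if the handler or the simulated upload raises, so the observer thread is never left running.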

def create_directory(file_path=None):
    # Get the current date in the format of 'year-month-day'
    current_date = datetime.now().strftime('%Y-%m-%d')

    # Create a folder with the current date (if it doesn't exist yet)
    folder_path = f'{file_path}/{current_date}'
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    return folder_path

class MyHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Ignore newly created directories; only files are processed
        if event.is_directory:
            return

        dir_path = event.src_path.split('/input_files')
        processed_files = f'{dir_path[0]}/processed_files'

        child_processed_dir = create_directory(file_path=processed_files)

        print("file created:{}".format(event.src_path))
        # call function here
        main(file_name=event.src_path)

        file_name = event.src_path.split('/')[-1]
        destination_path = f'{child_processed_dir}/{file_name}'

        # Move file to other directory
        shutil.move(event.src_path, destination_path)
        print("file moved:{} to {}".format(event.src_path, destination_path))

In the example above, the create_directory() function checks whether a folder named after the current date exists in the destination path, and creates it if it doesn't.
That folder is then used as the destination when moving the file with shutil.move(), after the file has been processed by main(), a function from another Python script.
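create_directory() and the '/'-splitting in on_created can also be expressed with pathlib. The sketch below is an alternative, not the code used in this article: dated_destination is a hypothetical helper that builds the dated destination path and creates the folder in one call. str() of its result can be passed to shutil.move().

```python
import tempfile
from datetime import datetime
from pathlib import Path


def dated_destination(src_path, processed_root):
    """Return processed_root/<YYYY-MM-DD>/<file name>, creating the date folder if needed."""
    folder = Path(processed_root) / datetime.now().strftime("%Y-%m-%d")
    # parents=True creates missing parent folders; exist_ok=True avoids an error
    # when the folder already exists, so no separate existence check is needed
    folder.mkdir(parents=True, exist_ok=True)
    # Path(...).name extracts the file name using the platform's path rules
    return folder / Path(src_path).name


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as processed_root:
        print(dated_destination("input_files/sales.csv", processed_root))
```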

Here is the final code:

event_handler_ex.py -

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import shutil
import time
import os
from datetime import datetime
from watchdog_fileobserver_ex.main import main


def create_directory(file_path=None):
    # Get the current date in the format of 'year-month-day'
    current_date = datetime.now().strftime('%Y-%m-%d')

    # Create a folder with the current date (if it doesn't exist yet)
    folder_path = f'{file_path}/{current_date}'
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    return folder_path


class MyHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Ignore newly created directories; only files are processed
        if event.is_directory:
            return

        dir_path = event.src_path.split('/input_files')
        processed_files = f'{dir_path[0]}/processed_files'

        child_processed_dir = create_directory(file_path=processed_files)

        print("file created:{}".format(event.src_path))
        # call function here
        main(file_name=event.src_path)

        file_name = event.src_path.split('/')[-1]
        destination_path = f'{child_processed_dir}/{file_name}'

        # Move the processed file to the dated folder
        shutil.move(event.src_path, destination_path)
        print("file moved:{} to {}".format(event.src_path, destination_path))


if __name__ == "__main__":
    observer = Observer()
    event_handler = MyHandler()
    observer.schedule(event_handler, path='./input_files', recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(300)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

watchdog_fileobserver_ex/main.py -

import csv

def read_csv_file(file_name):
    # Read all rows while the file is still open; a DictReader cannot be
    # iterated after its file has been closed
    try:
        with open(file_name, 'r', newline='') as file:
            rows = list(csv.DictReader(file))
    except Exception as e:
        print(f"Could not read {file_name}: {e}")
        return []
    for row in rows:
        print(row)
    return rows

def main(file_name=None):
    if file_name:
        dict_data = read_csv_file(file_name)
        print("Process completed")
    else:
        print("Invalid file path")

Wait for the File Transfer to Complete !!

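Sometimes a file is still being uploaded or copied when on_created fires, and you need to wait until the transfer has finished before processing it.
For that, you can add either of the snippets below inside the event methods. Note that on Linux and macOS, opening a file for reading usually succeeds even while another process is still writing it, so the second approach is mostly useful on Windows, where the writer often holds the file locked.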

def on_created(self, event):
    # Poll the file size every second until it stops changing
    file_size = -1
    while file_size != os.path.getsize(event.src_path):
        file_size = os.path.getsize(event.src_path)
        print(file_size)
        time.sleep(1)

### OR ###

def on_created(self, event):
    # Retry opening the file until it succeeds
    # (assumes logger = logging.getLogger(__name__) at module level)
    while True:
        try:
            with open(event.src_path):
                break
        except OSError:
            logger.info('Waiting for file transfer....')
            time.sleep(1)
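The size-polling idea can be wrapped in a small helper with a timeout, so a stalled transfer cannot block the handler forever. It treats the file as complete once its size has stayed the same for a few consecutive checks, which is a heuristic rather than a guarantee; wait_until_stable and its parameters are hypothetical. Keep in mind that watchdog calls handler methods from the observer thread, so any waiting done inside on_created also delays the handling of subsequent events.

```python
import os
import tempfile
import time


def wait_until_stable(path, interval=1.0, stable_checks=2, timeout=60.0):
    """Wait until the size of `path` stops changing and return that size.

    The file counts as complete once its size is unchanged for `stable_checks`
    consecutive checks; a TimeoutError is raised after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    last_size = -1
    unchanged = 0
    while time.monotonic() < deadline:
        size = os.path.getsize(path)
        if size == last_size:
            unchanged += 1
            if unchanged >= stable_checks:
                return size
        else:
            # Size changed (or first check): remember it and restart the count
            last_size = size
            unchanged = 0
        time.sleep(interval)
    raise TimeoutError(f"{path} was still changing after {timeout} seconds")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "upload.csv")
        with open(path, "wb") as f:
            f.write(b"id,name\n")
        print(wait_until_stable(path, interval=0.1))  # 8
```

Inside a handler it would be called as wait_until_stable(event.src_path) before the file is processed.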

Use cases of watchdog

A scenario where you want to ignore subdirectories or include only files matching a pattern:

If you want to react only to certain files in a directory, one of the simplest ways is to use PatternMatchingEventHandler. Note that ignore_directories=True only suppresses events whose target is itself a directory; to stop watching the contents of subdirectories, schedule the watch with recursive=False.

In event_handler_ex.py, change the base class of MyHandler to PatternMatchingEventHandler and create the event_handler instance as shown below:


from watchdog.events import PatternMatchingEventHandler

class MyHandler(PatternMatchingEventHandler):
    ....
    ....

if __name__ == "__main__":
    event_handler = MyHandler(patterns=["*.csv", "*.pdf"],
                              ignore_patterns=[],
                              ignore_directories=True)
    ....
    ....
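To check which paths get through the filters, the sketch below dispatches hand-made events to a handler configured like the one above. It assumes a recent watchdog release; CsvPdfHandler is a hypothetical name. Only the .csv and .pdf paths should reach on_created, because PatternMatchingEventHandler checks the patterns in dispatch() before calling the handler method.

```python
from watchdog.events import FileCreatedEvent, PatternMatchingEventHandler


class CsvPdfHandler(PatternMatchingEventHandler):
    def __init__(self):
        super().__init__(patterns=["*.csv", "*.pdf"],
                         ignore_patterns=[],
                         ignore_directories=True)
        self.matched = []

    def on_created(self, event):
        # Only reached for paths that passed the pattern filters
        self.matched.append(event.src_path)


handler = CsvPdfHandler()
for path in ["/data/input_files/report.csv",
             "/data/input_files/notes.txt",
             "/data/input_files/scan.pdf"]:
    handler.dispatch(FileCreatedEvent(path))

print(handler.matched)
```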

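You can use Celery to process the files that watchdog detects:

The example below shows one way to do this. It is just an idea of how you can integrate Celery with watchdog: the handler waits for each file to finish arriving and then hands it to a Celery task, which a Celery worker executes outside the watchdog process.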

from celery import Celery
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import os
import time

app = Celery('celery_ex.celery_apptask_ex', broker='redis://localhost:6379/0')


@app.task
def process_file(file_path):
    # do something with the file
    with open(file_path, 'r') as f:
        print(f.read())


class MyHandler(PatternMatchingEventHandler):
    def on_created(self, event):
        # Wait until the file size stops changing (transfer finished)
        file_size = -1
        while file_size != os.path.getsize(event.src_path):
            file_size = os.path.getsize(event.src_path)
            print(file_size)
            time.sleep(1)

        print("file created:{}".format(event.src_path))
        # Queue the file for processing by a Celery worker
        process_file.apply_async(args=(event.src_path,))


if __name__ == "__main__":
    observer = Observer()
    event_handler = MyHandler(patterns=["*.csv", "*.pdf"],
                              ignore_patterns=[],
                              ignore_directories=True)
    observer.schedule(event_handler, path='./input_files', recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

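Prerequisites:

Install Redis: brew install redis (on macOS)
Then run: redis-server (this runs Redis on localhost:6379)

Install Celery with Redis support in your Python environment: pip install "celery[redis]"
Start a Celery worker for the module that defines app, so the queued tasks actually run: celery -A <module> worker --loglevel=INFO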

Here is an example that might interest you:
https://gist.github.com/chenjianjx/53d8c2317f6023dc2fa0

There might be other ways of integrating watchdog with Celery, but I believe this is the simplest.

Monitoring a directory for changes:

The observer can be set up to watch the specified directory and all its subdirectories, and to call the appropriate method (on_created, on_deleted, or on_modified) whenever a file or directory is created, deleted, or modified.
The main thread runs an infinite loop, which can be interrupted with Ctrl+C (KeyboardInterrupt).

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class EventHandler(FileSystemEventHandler):
    def on_created(self, event):
        if event.is_directory:
            print("Directory created:", event.src_path)
        else:
            print("File created:", event.src_path)

    def on_deleted(self, event):
        if event.is_directory:
            print("Directory deleted:", event.src_path)
        else:
            print("File deleted:", event.src_path)

    def on_modified(self, event):
        if event.is_directory:
            print("Directory modified:", event.src_path)
        else:
            print("File modified:", event.src_path)

event_handler = EventHandler()
observer = Observer()
observer.schedule(event_handler, "/path/to/dir", recursive=True)
observer.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()
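The EventHandler above does not override on_moved, so renames and moves inside the watched directory are not reported. A minimal sketch of handling them, assuming a recent watchdog release where FileMovedEvent takes the source and destination paths; MoveAwareHandler is a hypothetical name, and a hand-made event stands in for a real rename.

```python
from watchdog.events import FileMovedEvent, FileSystemEventHandler


class MoveAwareHandler(FileSystemEventHandler):
    def __init__(self):
        super().__init__()
        self.messages = []

    def on_moved(self, event):
        # Move events carry both the old path (src_path) and the new one (dest_path)
        kind = "Directory" if event.is_directory else "File"
        self.messages.append(f"{kind} moved: {event.src_path} -> {event.dest_path}")


handler = MoveAwareHandler()
# Feed a hand-made event; an Observer would call dispatch() the same way
handler.dispatch(FileMovedEvent("/path/to/dir/old.csv", "/path/to/dir/new.csv"))
print(handler.messages[0])  # File moved: /path/to/dir/old.csv -> /path/to/dir/new.csv
```

The same on_moved method can simply be added to EventHandler alongside the other three.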

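A scenario where you start separate processes by combining watchdog with threading and multiprocessing:

You can also use watchdog to process multiple files in parallel with threading and multiprocessing.
In the example below, the observer runs in a background thread and puts the path of each new file on a queue, and the main thread hands the queued paths to a pool of worker processes: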

import logging
import os
import threading
import time
from multiprocessing import Manager, Pool
from queue import Queue

from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

logger = logging.getLogger(__name__)


def process_function(get_event, event_dict):
    # Runs in a worker process for each new file
    print(f"Process started for event: {get_event}")
    dir_path = os.path.dirname(os.path.abspath(get_event))
    file_name = os.path.basename(get_event)

    if len(get_event) > 0:
        # Call your own processing here, e.g. your_own_function(get_event),
        # and record the result in the shared dict
        event_dict[file_name] = dir_path


class Handler(PatternMatchingEventHandler):
    def __init__(self, queue):
        super().__init__(patterns=['*.csv'],
                         ignore_patterns=[],
                         ignore_directories=True)
        self.queue = queue

    def on_created(self, event):
        # Alternative: wait until the file size stops changing
        # file_size = -1
        # while file_size != os.path.getsize(event.src_path):
        #     file_size = os.path.getsize(event.src_path)
        #     time.sleep(1)

        # Wait until the file can be opened before queuing it
        while True:
            try:
                with open(event.src_path):
                    break
            except OSError:
                logger.info('Waiting for file transfer')
                time.sleep(5)

        self.queue.put(event.src_path)

    def on_modified(self, event):
        pass


def start_watchdog(watchdog_queue, dir_path):
    logger.info("Starting Watchdog Observer")
    event_handler = Handler(watchdog_queue)
    observer = Observer()
    observer.schedule(event_handler, dir_path, recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except Exception as error:
        logger.error(f"Error: {error}")
    finally:
        observer.stop()
        observer.join()


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    dir_path = r'//file_path/'

    # Thread-safe queue shared by the watchdog thread and the main thread
    watchdog_queue = Queue()

    logger.info("Starting Worker Thread")
    worker = threading.Thread(target=start_watchdog, name="Watchdog",
                              args=(watchdog_queue, dir_path), daemon=True)
    worker.start()

    mp = Manager()
    event_dict = mp.dict()

    # Create the process pool once and reuse it for every file
    with Pool() as pool:
        while True:
            # Blocks until the watchdog thread queues a new file path
            file_path = watchdog_queue.get()
            pool.apply_async(process_function, (file_path, event_dict))
Implement Logging in watchdog:

To log events, you can create a custom event handler class that inherits from the FileSystemEventHandler class, and override the methods that correspond to the events you're interested in logging.

Here’s an example of how you can log file creation and modification events using the watchdog library:

import logging
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class LogEventHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:
            logging.info(f"File created: {event.src_path}")

    def on_modified(self, event):
        if not event.is_directory:
            logging.info(f"File modified: {event.src_path}")

logging.basicConfig(filename='watchdog.log', level=logging.INFO, format='%(asctime)s - %(message)s')
event_handler = LogEventHandler()
observer = Observer()
observer.schedule(event_handler, "/path/to/", recursive=True)
observer.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()
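watchdog also ships the LoggingEventHandler listed earlier, which logs every event without a custom subclass. A minimal sketch, assuming its default behaviour of logging through the root logger at INFO level; the exact message wording comes from watchdog and can differ between versions, and a hand-made event stands in for a real one.

```python
import logging

from watchdog.events import FileCreatedEvent, LoggingEventHandler

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")

# LoggingEventHandler logs each event it receives at INFO level
handler = LoggingEventHandler()

# Feed one hand-made event; with an Observer this happens automatically
handler.dispatch(FileCreatedEvent("/path/to/report.csv"))
```

In a real script you would pass handler to observer.schedule() just like LogEventHandler; the trade-off is less control over which events are logged and how they are worded.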

This library can also handle more complex scenarios, such as notifying a user or process when new files arrive or existing files are updated, logging changes, etc.

watchdog follows the observer design pattern: an object, called the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods.

Whether you’re working on a large project that requires you to keep track of multiple files, or you just want to automate a task that requires you to keep an eye on a single file, the watchdog library has got you covered.

I hope this article helps anyone who is new to this library gain a basic understanding of it and get started.


Pravash

I am a passionate Data Engineer and Technology Enthusiast. Here I am using this platform to share my knowledge and experience on tech stacks.