![]() System : Linux absol.cf 5.4.0-198-generic #218-Ubuntu SMP Fri Sep 27 20:18:53 UTC 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.33 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, Directory : /usr/local/lib/python3.8/dist-packages/watchdog/observers/ |
Upload File : |
# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright 2012 Google, Inc & contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""":module: watchdog.observers.polling
:synopsis: Polling emitter implementation.
:author: yesudeep@google.com (Yesudeep Mangalapilly)
:author: contact@tiger-222.fr (Mickaël Schoentgen)

Classes
-------
.. autoclass:: PollingObserver
   :members:
   :show-inheritance:

.. autoclass:: PollingObserverVFS
   :members:
   :show-inheritance:
   :special-members:
"""

from __future__ import annotations

import os
import threading
from functools import partial

from watchdog.events import (
    DirCreatedEvent,
    DirDeletedEvent,
    DirModifiedEvent,
    DirMovedEvent,
    FileCreatedEvent,
    FileDeletedEvent,
    FileModifiedEvent,
    FileMovedEvent,
)
from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter
from watchdog.utils.dirsnapshot import DirectorySnapshot, DirectorySnapshotDiff, EmptyDirectorySnapshot


class PollingEmitter(EventEmitter):
    """Emitter that finds changes by comparing successive directory snapshots.

    Each poll builds a new ``DirectorySnapshot`` of the watched path, diffs
    it against the one kept from the previous poll, and queues one event per
    entry reported by the diff.
    """

    def __init__(
        self,
        event_queue,
        watch,
        timeout=DEFAULT_EMITTER_TIMEOUT,
        event_filter=None,
        stat=os.stat,
        listdir=os.scandir,
    ):
        """Set up the emitter with an empty baseline snapshot.

        :param event_queue: forwarded unchanged to ``EventEmitter``.
        :param watch: forwarded to ``EventEmitter``; its ``path`` and
            ``is_recursive`` attributes drive every snapshot.
        :param timeout: forwarded to ``EventEmitter``.
        :param event_filter: forwarded to ``EventEmitter``.
        :param stat: stat callable handed to ``DirectorySnapshot``.
        :param listdir: directory-listing callable handed to
            ``DirectorySnapshot``.
        """
        super().__init__(event_queue, watch, timeout, event_filter)
        self._snapshot: DirectorySnapshot = EmptyDirectorySnapshot()
        self._lock = threading.Lock()

        def take_snapshot():
            # The watch attributes are read at call time, not captured here.
            return DirectorySnapshot(
                self.watch.path,
                self.watch.is_recursive,
                stat=stat,
                listdir=listdir,
            )

        self._take_snapshot = take_snapshot

    def on_thread_start(self):
        """Store the snapshot that the first poll will be diffed against."""
        self._snapshot = self._take_snapshot()

    def queue_events(self, timeout):
        """Wait up to ``timeout``, then queue events for whatever changed.

        :param timeout: seconds to wait on the stop event before polling, so
            it effectively acts as the polling interval.
        """
        # Sleeping on the stop event throttles disk access and lets a stop
        # request cut the wait short.
        if self.stopped_event.wait(timeout):
            return

        with self._lock:
            if not self.should_keep_running():
                return

            # A failure to snapshot is treated as the watched root having
            # been deleted: report that and shut this emitter down.
            try:
                current = self._take_snapshot()
            except OSError:
                self.queue_event(DirDeletedEvent(self.watch.path))
                self.stop()
                return

            # Compare against the previous poll, then make the new snapshot
            # the reference for the next one.
            diff = DirectorySnapshotDiff(self._snapshot, current)
            self._snapshot = current

            emit = self.queue_event

            # File events: deleted, modified, created, moved.
            for path in diff.files_deleted:
                emit(FileDeletedEvent(path))
            for path in diff.files_modified:
                emit(FileModifiedEvent(path))
            for path in diff.files_created:
                emit(FileCreatedEvent(path))
            for old_path, new_path in diff.files_moved:
                emit(FileMovedEvent(old_path, new_path))

            # Directory events, in the same order as the file events.
            for path in diff.dirs_deleted:
                emit(DirDeletedEvent(path))
            for path in diff.dirs_modified:
                emit(DirModifiedEvent(path))
            for path in diff.dirs_created:
                emit(DirCreatedEvent(path))
            for old_path, new_path in diff.dirs_moved:
                emit(DirMovedEvent(old_path, new_path))


class PollingObserver(BaseObserver):
    """Observer whose emitters are :class:`PollingEmitter` instances."""

    def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
        """:param timeout: forwarded to ``BaseObserver`` as ``timeout``."""
        super().__init__(PollingEmitter, timeout=timeout)


class PollingObserverVFS(BaseObserver):
    """Polling observer that uses caller-supplied stat and listdir functions."""

    def __init__(self, stat, listdir, polling_interval=1):
        """:param stat: stat function. See ``os.stat`` for details.
        :param listdir: listdir function. See ``os.scandir`` for details.
        :type polling_interval: float
        :param polling_interval: interval in seconds between polling the
            file system.
        """
        # Pre-bind the custom callables so the base observer can construct
        # emitters the same way it would construct a plain PollingEmitter.
        super().__init__(
            partial(PollingEmitter, stat=stat, listdir=listdir),
            timeout=polling_interval,
        )