Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/io/backends/virtual.py: 84%

45 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-12 12:19 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel IO adaptor for virtual files. 

8''' 

9 

10import time 

11from collections import defaultdict 

12from pyrocko.io.io_common import FileLoadError 

13 

14 

def provided_formats():
    '''
    List the format names handled by this backend.

    :returns: a newly created list holding the single name ``'virtual'``.
    '''
    formats = ['virtual']
    return formats

17 

18 

def detect(first512):
    '''
    Format detection hook of this backend; it never reports a match.

    :param first512: not inspected by this implementation.
    :returns: ``None`` for every input.
    '''
    # The argument is deliberately left unused: the result is always None.
    return None

21 

22 

class UniqueKeyRequired(Exception):
    '''
    Raised by :py:func:`add_nuts` when two nuts stored under the same file
    path share the same ``(file_segment, file_element)`` pair.
    '''

25 

26 

def get_stats(file_path):
    '''
    Look up the recorded modification time of a virtual file.

    :param file_path: key into the module-level ``data_mtimes`` mapping.
    :returns: tuple ``(mtime, 0)`` where ``mtime`` is the stored value
        converted to ``float``. The second element is always ``0``
        (presumably a size placeholder -- confirm against the caller).
    :raises FileLoadError: if no modification time is stored for
        ``file_path``.
    '''
    try:
        stored_mtime = data_mtimes[file_path]
    except KeyError:
        raise FileLoadError(file_path)

    return float(stored_mtime), 0

32 

33 

def touch(file_path):
    '''
    Set the recorded modification time of a virtual file to the current time.

    :param file_path: key into the module-level ``data_mtimes`` mapping.
    :raises FileLoadError: if no modification time is stored for
        ``file_path``, i.e. the path is unknown to this backend.
    '''
    # Assigning to a dict item never raises KeyError, so wrapping the
    # assignment in ``try/except KeyError`` could not detect unknown paths.
    # Check membership explicitly, mirroring the behaviour of get_stats().
    if file_path not in data_mtimes:
        raise FileLoadError(file_path)

    data_mtimes[file_path] = time.time()

39 

40 

# Recorded modification time per virtual file path; written by add_nuts()
# and touch(), read by get_stats().
data_mtimes = {}

# Nuts held in memory, grouped by virtual file path. Looking up an unknown
# path yields (and stores) an empty list.
data = defaultdict(list)

43 

44 

def add_nuts(nuts):
    '''
    Add nuts to the in-memory store, grouped by their ``file_path``.

    For every affected file path the stored nuts are kept sorted by
    ``(file_segment, file_element)`` and that pair must be unique within the
    path. The modification time of a path seen for the first time is set to
    the largest ``file_mtime`` among its nuts (missing values count as
    ``0``); for an already known path the stored time is incremented by one.

    The call is all-or-nothing: every affected path is validated before any
    module state is modified, so a failing call leaves ``data`` and
    ``data_mtimes`` untouched.

    :param nuts: iterable of objects providing ``file_path``,
        ``file_segment``, ``file_element`` and ``file_mtime`` attributes.
    :raises UniqueKeyRequired: if, after merging with the nuts already
        stored, a ``(file_segment, file_element)`` pair would occur more than
        once for a file path.
    '''
    def sort_key(nut):
        return nut.file_segment, nut.file_element

    incoming = defaultdict(list)
    for nut in nuts:
        incoming[nut.file_path].append(nut)

    # Phase 1: build and validate the merged lists without touching module
    # state. ``data.get`` avoids creating entries as a lookup side effect.
    merged_by_path = {}
    for file_path, new_nuts in incoming.items():
        # Stable sort of old + new: same ordering as appending, then sorting.
        merged = sorted(data.get(file_path, []) + new_nuts, key=sort_key)

        seen = set()
        for nut in merged:
            key = sort_key(nut)
            if key in seen:
                raise UniqueKeyRequired()

            seen.add(key)

        merged_by_path[file_path] = merged

    # Phase 2: commit. Nothing below is expected to fail.
    for file_path, merged in merged_by_path.items():
        # Update in place so existing references to the list stay valid.
        data[file_path][:] = merged

        old_mtime = data_mtimes.get(file_path, None)
        if old_mtime is None:
            data_mtimes[file_path] = max(
                nut.file_mtime or 0 for nut in merged)
        else:
            # Known path: bump the time so the change is noticeable.
            data_mtimes[file_path] = old_mtime + 1

68 

69 

def remove(file_paths):
    '''
    Discard the stored nuts of the given virtual files.

    Only the module-level ``data`` mapping is modified; entries in
    ``data_mtimes`` are left in place.

    :param file_paths: iterable of file path keys to drop from ``data``.
    :raises KeyError: if one of the paths has no entry in ``data``.
    '''
    for path in file_paths:
        # pop() without a default raises KeyError for unknown paths.
        data.pop(path)

73 

74 

def iload(format, file_path, segment, content):
    '''
    Yield the nuts stored for a virtual file.

    :param format: must be ``'virtual'`` (checked with an assertion).
    :param file_path: key into the module-level ``data`` mapping.
    :param segment: if ``None``, all nuts of the file are yielded; otherwise
        only those whose ``file_segment`` equals this value.
    :param content: not used by this implementation.
    :returns: generator over the matching nuts, in stored order.
    '''
    assert format == 'virtual'

    for candidate in data[file_path]:
        wanted = segment is None or segment == candidate.file_segment
        if not wanted:
            continue

        yield candidate