Source code for pyrocko.io.tdms_idas

import logging
import os.path as op
import struct
import datetime
import mmap
import numpy as num

from pyrocko import trace

logger = logging.getLogger(__name__)


def write_property_dict(prop_dict, out_file):
    from pprint import pformat

    f = open(out_file, "w")
    f.write("tdms_property_map=")
    f.write(pformat(prop_dict))
    f.close()


[docs]def type_not_supported(vargin): """Function raises a NotImplementedException.""" raise NotImplementedError("Reading of this tdsDataType is not implemented")
[docs]def parse_time_stamp(fractions, seconds): """ Convert time TDMS time representation to datetime fractions -- fractional seconds (2^-64) seconds -- The number of seconds since 1/1/1904 @rtype : datetime.datetime """ if ( fractions is not None and seconds is not None and fractions + seconds > 0 ): return datetime.timedelta( 0, fractions * 2 ** -64 + seconds ) + datetime.datetime(1904, 1, 1) else: return None
# Enum mapping TDM data types to description string, numpy type where exists # See Ref[2] for enum values TDS_DATA_TYPE = dict( { 0x00: "void", # tdsTypeVoid 0x01: "int8", # tdsTypeI8 0x02: "int16", # tdsTypeI16 0x03: "int32", # tdsTypeI32 0x04: "int64", # tdsTypeI64 0x05: "uint8", # tdsTypeU8 0x06: "uint16", # tdsTypeU16 0x07: "uint32", # tdsTypeU32 0x08: "uint64", # tdsTypeU64 0x09: "float32", # tdsTypeSingleFloat 0x0A: "float64", # tdsTypeDoubleFloat 0x0B: "float128", # tdsTypeExtendedFloat 0x19: "singleFloatWithUnit", # tdsTypeSingleFloatWithUnit 0x1A: "doubleFloatWithUnit", # tdsTypeDoubleFloatWithUnit 0x1B: "extendedFloatWithUnit", # tdsTypeExtendedFloatWithUnit 0x20: "str", # tdsTypeString 0x21: "bool", # tdsTypeBoolean 0x44: "datetime", # tdsTypeTimeStamp 0xFFFFFFFF: "raw", # tdsTypeDAQmxRawData } ) # Function mapping for reading TDMS data types TDS_READ_VAL = dict( { "void": lambda f: None, # tdsTypeVoid "int8": lambda f: struct.unpack("<b", f.read(1))[0], "int16": lambda f: struct.unpack("<h", f.read(2))[0], "int32": lambda f: struct.unpack("<i", f.read(4))[0], "int64": lambda f: struct.unpack("<q", f.read(8))[0], "uint8": lambda f: struct.unpack("<B", f.read(1))[0], "uint16": lambda f: struct.unpack("<H", f.read(2))[0], "uint32": lambda f: struct.unpack("<I", f.read(4))[0], "uint64": lambda f: struct.unpack("<Q", f.read(8))[0], "float32": lambda f: struct.unpack("<f", f.read(4))[0], "float64": lambda f: struct.unpack("<d", f.read(8))[0], "float128": type_not_supported, "singleFloatWithUnit": type_not_supported, "doubleFloatWithUnit": type_not_supported, "extendedFloatWithUnit": type_not_supported, "str": lambda f: f.read(struct.unpack("<i", f.read(4))[0]), "bool": lambda f: struct.unpack("<?", f.read(1))[0], "datetime": lambda f: parse_time_stamp( struct.unpack("<Q", f.read(8))[0], struct.unpack("<q", f.read(8))[0], ), "raw": type_not_supported, } ) DECIMATE_MASK = 0b00100000 LEAD_IN_LENGTH = 28 FILEINFO_NAMES = ( "file_tag", "toc", "version", "next_segment_offset", "raw_data_offset", )
[docs]class TdmsReader(object): """A TDMS file reader object for reading properties and data""" def __init__(self, filename): self._properties = None self._end_of_properties_offset = None self._data_type = None self._chunk_size = None self._raw_data = None self._raw_data2 = None # The mapped data in the 'Next Segment' self._raw_last_chunk = None self._raw2_last_chunk = None self.file_size = op.getsize(filename) self._channel_length = None self._seg1_length = None self._seg2_length = None # TODO: Error if file not big enough to hold header self._tdms_file = open(filename, "rb") # Read lead in (28 bytes): lead_in = self._tdms_file.read(LEAD_IN_LENGTH) # lead_in is 28 bytes: # [string of length 4][int32][int32][int64][int64] fields = struct.unpack("<4siiQQ", lead_in) if fields[0].decode() not in "TDSm": msg = "Not a TDMS file (TDSm tag not found)" raise (TypeError, msg) self.fileinfo = dict(zip(FILEINFO_NAMES, fields)) self.fileinfo["decimated"] = not bool( self.fileinfo["toc"] & DECIMATE_MASK ) # Make offsets relative to beginning of file: self.fileinfo["next_segment_offset"] += LEAD_IN_LENGTH self.fileinfo["raw_data_offset"] += LEAD_IN_LENGTH self.fileinfo["file_size"] = op.getsize(self._tdms_file.name) # TODO: Validate lead in: if self.fileinfo["next_segment_offset"] > self.file_size: self.fileinfo["next_segment_offset"] = self.file_size # raise(ValueError, "Next Segment Offset too large in TDMS header") def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self._tdms_file.close() @property def channel_length(self): if self._properties is None: self.get_properties() rdo = num.int64(self.fileinfo["raw_data_offset"]) nch = num.int64(self.n_channels) nso = self.fileinfo["next_segment_offset"] return num.int64( (nso - rdo) / nch / num.dtype(self._data_type).itemsize) @property def n_channels(self): if self._properties is None: self.get_properties() return self.fileinfo['n_channels']
[docs] def get_properties(self, mapped=False): """ Return a dictionary of properties. Read from file only if necessary. """ # Check if already hold properties in memory if self._properties is None: self._properties = self._read_properties() return self._properties
def _read_property(self): """ Read a single property from the TDMS file. Return the name, type and value of the property as a list. """ # Read length of object path: var = struct.unpack("<i", self._tdms_file.read(4))[0] # Read property name and type: name, data_type = struct.unpack( "<{0}si".format(var), self._tdms_file.read(var + 4) ) # Lookup function to read and parse property value based on type: value = TDS_READ_VAL[TDS_DATA_TYPE[data_type]](self._tdms_file) name = name.decode() if data_type == 32: value = value.decode() return name, data_type, value def _read_properties(self): """Read the properties from the file""" self._tdms_file.seek(LEAD_IN_LENGTH, 0) # Number of channels is total objects - file objects - group objects self.fileinfo["n_channels"] = ( struct.unpack("i", self._tdms_file.read(4))[0] - 2 ) # Read length of object path: var = struct.unpack("<i", self._tdms_file.read(4))[0] # skip over object path and raw data index: self._tdms_file.seek(var + 4, 1) # Read number of properties in this group: var = struct.unpack("<i", self._tdms_file.read(4))[0] # loop through and read each property properties = [self._read_property() for _ in range(var)] properties = {prop: value for (prop, type, value) in properties} self._end_of_properties_offset = self._tdms_file.tell() self._read_chunk_size() # TODO: Add number of channels to properties return properties def _read_chunk_size(self): """Read the data chunk size from the TDMS file header.""" if self._end_of_properties_offset is None: self._read_properties() self._tdms_file.seek(self._end_of_properties_offset, 0) # skip over Group Information: var = struct.unpack("<i", self._tdms_file.read(4))[0] self._tdms_file.seek(var + 8, 1) # skip over first channel path and length of index information: var = struct.unpack("<i", self._tdms_file.read(4))[0] self._tdms_file.seek(var + 4, 1) self._data_type = TDS_DATA_TYPE.get( struct.unpack("<i", self._tdms_file.read(4))[0] ) if self._data_type not in ("int16", "float32"): raise Exception("Unsupported TDMS data type: " + self._data_type) # Read Dimension of the raw data array (has to be 1): # dummy = struct.unpack("<i", self._tdms_file.read(4))[0] self._chunk_size = struct.unpack("<i", self._tdms_file.read(4))[0]
[docs] def get_data(self, first_ch=0, last_ch=None, first_s=0, last_s=None): """ Get a block of data from the TDMS file. first_ch -- The first channel to load last_ch -- The last channel to load first_s -- The first sample to load last_s -- The last sample to load """ if self._raw_data is None: self._initialise_data() if first_ch is None or first_ch < 0: first_ch = 0 if last_ch is None or last_ch >= self.n_channels: last_ch = self.n_channels else: last_ch += 1 if last_s is None or last_s > self._channel_length: last_s = self._channel_length else: last_s += 1 nch = num.int64(max(last_ch - first_ch, 0)) ns = num.int64(max(last_s - first_s, 0)) # Allocate output container data = num.empty((ns, nch), dtype=num.dtype(self._data_type)) if data.size == 0: return data # 1. Index first block & reshape? first_blk = first_s // self._chunk_size last_blk = last_s // self._chunk_size last_full_blk = min(last_blk + 1, self._raw_data.shape[1]) nchunk = min( max(last_full_blk - first_blk, 0), self._raw_data.shape[1] ) first_s_1a = max(first_s - first_blk * self._chunk_size, 0) last_s_1a = min( last_s - first_blk * self._chunk_size, nchunk * self._chunk_size ) ind_s = 0 ind_e = ind_s + max(last_s_1a - first_s_1a, 0) d = self._raw_data[:, first_blk:last_full_blk, first_ch:last_ch] d.shape = (self._chunk_size * nchunk, nch) d.reshape((self._chunk_size * nchunk, nch), order="F") data[ind_s:ind_e, :] = d[first_s_1a:last_s_1a, :] # 2. Index first additional samples first_s_1b = max( first_s - self._raw_data.shape[1] * self._chunk_size, 0 ) last_s_1b = min( last_s - self._raw_data.shape[1] * self._chunk_size, self._raw_last_chunk.shape[0], ) ind_s = ind_e ind_e = ind_s + max(last_s_1b - first_s_1b, 0) # data_1b = self._raw_last_chunk[first_s_1b:last_s_1b,first_ch:last_ch] if ind_e > ind_s: data[ind_s:ind_e, :] = self._raw_last_chunk[ first_s_1b:last_s_1b, first_ch:last_ch ] # 3. Index second block first_s_2 = max(first_s - self._seg1_length, 0) last_s_2 = last_s - self._seg1_length if (first_s_2 > 0 or last_s_2 > 0) and self._raw_data2 is not None: first_blk_2 = max(first_s_2 // self._chunk_size, 0) last_blk_2 = max(last_s_2 // self._chunk_size, 0) last_full_blk_2 = min(last_blk_2 + 1, self._raw_data2.shape[1]) nchunk_2 = min( max(last_full_blk_2 - first_blk_2, 0), self._raw_data2.shape[1] ) first_s_2a = max(first_s_2 - first_blk_2 * self._chunk_size, 0) last_s_2a = min( last_s_2 - first_blk_2 * self._chunk_size, nchunk_2 * self._chunk_size, ) ind_s = ind_e ind_e = ind_s + max(last_s_2a - first_s_2a, 0) # data_2a = self._raw_data2[:, first_blk_2:last_full_blk_2, # first_ch:last_ch]\ # .reshape((self._chunk_size*nchunk_2, nch), order='F')\ # [first_s_2a:last_s_2a, :] if ind_e > ind_s: data[ind_s:ind_e, :] = self._raw_data2[ :, first_blk_2:last_full_blk_2, first_ch:last_ch ].reshape((self._chunk_size * nchunk_2, nch), order="F")[ first_s_2a:last_s_2a, : ] # 4. Index second additional samples if ( first_s_2 > 0 or last_s_2 > 0 ) and self._raw2_last_chunk is not None: first_s_2b = max( first_s_2 - self._raw_data2.shape[1] * self._chunk_size, 0 ) last_s_2b = min( last_s_2 - self._raw_data2.shape[1] * self._chunk_size, self._raw2_last_chunk.shape[0], ) ind_s = ind_e ind_e = ind_s + max(last_s_2b - first_s_2b, 0) # data_2b = \ # self._raw2_last_chunk[first_s_2b:last_s_2b,first_ch:last_ch] if ind_e > ind_s: data[ind_s:ind_e, :] = self._raw2_last_chunk[ first_s_2b:last_s_2b, first_ch:last_ch ] # 5. Concatenate blocks # data = num.concatenate((data_1a, data_1b, data_2a, data_2b)) if data.size == 0: data = data.reshape(0, 0) return data
def _initialise_data(self): """Initialise the memory map for the data array.""" if self._chunk_size is None: self._read_chunk_size() dmap = mmap.mmap(self._tdms_file.fileno(), 0, access=mmap.ACCESS_READ) rdo = num.int64(self.fileinfo["raw_data_offset"]) nch = num.int64(self.n_channels) # TODO: Support streaming file type? # TODO: Is this a valid calculation for ChannelLength? nso = self.fileinfo["next_segment_offset"] self._seg1_length = num.int64( (nso - rdo) / nch / num.dtype(self._data_type).itemsize ) self._channel_length = self._seg1_length if self.fileinfo["decimated"]: n_complete_blk = num.int64(self._seg1_length / self._chunk_size) ax_ord = "C" else: n_complete_blk = 0 ax_ord = "F" self._raw_data = num.ndarray( (n_complete_blk, nch, self._chunk_size), dtype=self._data_type, buffer=dmap, offset=rdo, ) # Rotate the axes to [chunk_size, nblk, nch] self._raw_data = num.rollaxis(self._raw_data, 2) additional_samples = num.int64( self._seg1_length - n_complete_blk * self._chunk_size ) additional_samples_offset = ( rdo + n_complete_blk * nch * self._chunk_size * num.dtype(self._data_type).itemsize ) self._raw_last_chunk = num.ndarray( (nch, additional_samples), dtype=self._data_type, buffer=dmap, offset=additional_samples_offset, order=ax_ord, ) # Rotate the axes to [samples, nch] self._raw_last_chunk = num.rollaxis(self._raw_last_chunk, 1) if self.file_size == nso: self._seg2_length = 0 else: self._tdms_file.seek(nso + 12, 0) (seg2_nso, seg2_rdo) = struct.unpack( "<qq", self._tdms_file.read(2 * 8) ) self._seg2_length = ( (seg2_nso - seg2_rdo) / nch / num.dtype(self._data_type).itemsize ) if self.fileinfo["decimated"]: n_complete_blk2 = num.int64( self._seg2_length / self._chunk_size) else: n_complete_blk2 = num.int64(0) self._raw_data2 = num.ndarray( (n_complete_blk2, nch, self._chunk_size), dtype=self._data_type, buffer=dmap, offset=(nso + LEAD_IN_LENGTH + seg2_rdo), ) self._raw_data2 = num.rollaxis(self._raw_data2, 2) additional_samples = num.int64( self._seg2_length - n_complete_blk2 * self._chunk_size ) additional_samples_offset = ( nso + LEAD_IN_LENGTH + seg2_rdo + n_complete_blk2 * nch * self._chunk_size * num.dtype(self._data_type).itemsize ) self._raw2_last_chunk = num.ndarray( (nch, additional_samples), dtype=self._data_type, buffer=dmap, offset=additional_samples_offset, order=ax_ord, ) # Rotate the axes to [samples, nch] self._raw2_last_chunk = num.rollaxis(self._raw2_last_chunk, 1) if self._raw_data2.size != 0 or self._raw2_last_chunk.size != 0: pass # raise Exception('Second segment contains some data, \ # not currently supported') self._channel_length = self._seg1_length + self._seg2_length
# else: # print "Not decimated" # raise Exception('Reading file with decimated flag not set is not' # ' supported yet') META_KEYS = { 'measure_length': 'MeasureLength[m]', 'start_position': 'StartPosition[m]', 'spatial_resolution': 'SpatialResolution[m]', 'fibre_index': 'FibreIndex', 'unit_calibration': 'Unit Calibration (nm)', 'start_distance': 'Start Distance (m)', 'stop_distance': 'Stop Distance (m)', 'normalization': 'Normalization', 'decimation_filter': 'Decimation Filter', 'gauge_length': 'GaugeLength', 'norm_offset': 'Norm Offset', 'source_mode': 'Source Mode', 'time_decimation': 'Time Decimation', 'zero_offset': 'Zero Offset (m)', 'p_parameter': 'P', 'p_coefficients': 'P Coefficients', 'idas_version': 'iDASVersion', 'precice_sampling_freq': 'Precise Sampling Frequency (Hz)', 'receiver_gain': 'Receiver Gain', 'continuous_mode': 'Continuous Mode', 'geo_lat': 'SystemInfomation.GPS.Latitude', 'geo_lon': 'SystemInfomation.GPS.Longitude', 'geo_elevation': 'SystemInfomation.GPS.Altitude', 'channel': None, 'unit': None } def get_meta(tdms_properties): prop = tdms_properties deltat = 1. / prop['SamplingFrequency[Hz]'] tmin = prop['GPSTimeStamp'].timestamp() fibre_meta = {key: prop.get(key_map, -1) for key, key_map in META_KEYS.items() if key_map is not None} coeff = fibre_meta['p_coefficients'] try: coeff = tuple(map(float, coeff.split('\t'))) except ValueError: coeff = tuple(map(float, coeff.split(';'))) gain = fibre_meta['receiver_gain'] try: gain = tuple(map(float, gain.split('\t'))) except ValueError: gain = tuple(map(float, gain.split(';'))) fibre_meta['receiver_gain'] = coeff fibre_meta['unit'] = 'radians' return deltat, tmin, fibre_meta def iload(filename, load_data=True): tdms = TdmsReader(filename) deltat, tmin, meta = get_meta(tdms.get_properties()) data = tdms.get_data().T.copy() if load_data else None for icha in range(tdms.n_channels): meta_cha = meta.copy() assert icha < 99999 station = '%05i' % icha meta_cha['channel'] = icha tr = trace.Trace( network='DA', station=station, ydata=None, deltat=deltat, tmin=tmin, meta=meta_cha) if data is not None: tr.set_ydata(data[icha]) yield tr def detect(first512): return first512.startswith(b'TDSm.')