Coverage for mt940/models.py: 0%

177 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-02-23 05:07 +0000

1import re 

2import decimal 

3import datetime 

4 

5# python 3.8+ compatibility 

6try: # pragma: no cover 

7 from collections import abc 

8except ImportError: # pragma: no cover 

9 import collections as abc 

10 

11import mt940 

12 

13from . import _compat 

14from . import processors 

15 

16 

class Model(object):
    '''Minimal base class for the model objects in this module.

    It only contributes a default ``repr``: the name of the concrete class
    wrapped in angle brackets.
    '''

    def __repr__(self):
        # Look the name up on the runtime type so subclasses show their own
        class_name = type(self).__name__
        return '<{0}>'.format(class_name)

21 

22 

class FixedOffset(datetime.tzinfo):
    '''Timezone with a constant UTC offset and no daylight saving time

    Modelled after the example in the Python docs:
    https://docs.python.org/2/library/datetime.html#tzinfo-objects

    >>> offset = FixedOffset(60)
    >>> offset.utcoffset(None).total_seconds()
    3600.0
    >>> offset.dst(None)
    datetime.timedelta(0)
    >>> offset.tzname(None)
    '60'

    Args:
        offset (int or str): Offset from UTC in minutes; anything that is
            not already an `int` is converted with `int()`
        name (str): Timezone name, falls back to `str(offset)` when empty
    '''

    def __init__(self, offset=0, name=None):
        # The fallback name is derived from the offset exactly as passed in,
        # i.e. before any integer conversion
        if name:
            self._name = name
        else:
            self._name = str(offset)

        minutes = offset if isinstance(offset, int) else int(offset)
        self._offset = datetime.timedelta(minutes=minutes)

    def utcoffset(self, dt):
        '''Return the fixed offset, `dt` is ignored'''
        return self._offset

    def dst(self, dt):
        '''Return a zero timedelta, `dt` is ignored'''
        return datetime.timedelta(0)

    def tzname(self, dt):
        '''Return the configured name, `dt` is ignored'''
        return self._name

51 

52 

class DateTime(datetime.datetime, Model):
    '''Just a regular datetime object which supports dates given as strings

    >>> DateTime(year='2000', month='1', day='2', hour='3', minute='4',
    ...          second='5', microsecond='6')
    DateTime(2000, 1, 2, 3, 4, 5, 6)

    >>> DateTime(year='123', month='1', day='2', hour='3', minute='4',
    ...          second='5', microsecond='6')
    DateTime(2123, 1, 2, 3, 4, 5, 6)

    >>> DateTime(2000, 1, 2, 3, 4, 5, 6)
    DateTime(2000, 1, 2, 3, 4, 5, 6)

    >>> DateTime(year='123', month='1', day='2', hour='3', minute='4',
    ...          second='5', microsecond='6', tzinfo=FixedOffset('60'))
    DateTime(2123, 1, 2, 3, 4, 5, 6, tzinfo=<mt940.models.FixedOffset ...>)

    An explicit `tzinfo` wins over `offset`:

    >>> DateTime(year='2000', month='1', day='2', tzinfo=FixedOffset(60),
    ...          offset='120').utcoffset().total_seconds()
    3600.0

    When only positional arguments are given they are passed straight
    through to `datetime.datetime`. As soon as any keyword argument is given
    the fields below are read from the keyword arguments only.

    Args:
        year (str): Year, values below 1000 automatically get 2000 added
        month (str): Month
        day (str): Day
        hour (str): Hour
        minute (str): Minute
        second (str): Second
        microsecond (str): Microsecond
        tzinfo (tzinfo): Timezone information. Overwrites `offset`
        offset (str): Timezone offset in minutes, generates a tzinfo object
            with the given offset if no tzinfo is available.

    Raises:
        AssertionError: If keyword arguments are used and `year`, `month`
            or `day` is missing
    '''

    def __new__(cls, *args, **kwargs):
        if kwargs:
            values = dict(
                year=None,
                month=None,
                day=None,
                hour='0',
                minute='0',
                second='0',
                microsecond='0', )

            # The list makes sure this works in both Python 2 and 3
            for key, default in list(values.items()):
                # Fetch the value or the default
                value = kwargs.get(key, default)
                assert value is not None, '%s should not be None' % key
                # Convert the value to integer and force base 10 to make sure
                # it doesn't get recognized as octal
                if not isinstance(value, int):
                    value = int(value, 10)

                # Save the values again
                values[key] = value

            if values['year'] < 1000:
                values['year'] += 2000

            # As documented, an explicit tzinfo has priority; the offset is
            # only used to build a tzinfo when none was supplied. Previously
            # the offset silently replaced a given tzinfo.
            tzinfo = kwargs.get('tzinfo')
            offset = kwargs.get('offset')

            if tzinfo:
                values['tzinfo'] = tzinfo
            elif offset:
                values['tzinfo'] = FixedOffset(offset)
            else:
                values['tzinfo'] = None

            return datetime.datetime.__new__(cls, **values)
        else:
            return datetime.datetime.__new__(cls, *args, **kwargs)

122 

123 

class Date(datetime.date, Model):
    '''A plain date object that can also be built from string values

    >>> Date(year='2000', month='1', day='2')
    Date(2000, 1, 2)

    >>> Date(year='123', month='1', day='2')
    Date(2123, 1, 2)

    Positional-only calls go straight to `datetime.date`. When keyword
    arguments are present the conversion is delegated to
    :py:class:`DateTime` and only its year, month and day are kept.

    Args:
        year (str): Year, handled the same way as by :py:class:`DateTime`
        month (str): Month
        day (str): Day
    '''

    def __new__(cls, *args, **kwargs):
        # Without keyword arguments there is nothing to convert
        if not kwargs:
            return datetime.date.__new__(cls, *args)

        # Let DateTime do the string parsing and keep only the date part
        parsed = DateTime(*args, **kwargs).date()
        year, month, day = parsed.year, parsed.month, parsed.day

        return datetime.date.__new__(cls, year, month, day)

146 

147 

class Amount(Model):
    '''Amount object containing currency and amount

    Args:
        amount (str): Amount using either a , or a . as decimal separator
        status (str): Either C or D for credit or debit respectively
        currency (str): A 3 letter currency (e.g. EUR)

    Any additional keyword arguments are accepted and ignored.

    >>> Amount('123.45', 'C', 'EUR')
    <123.45 EUR>
    >>> Amount('123.45', 'D', 'EUR')
    <-123.45 EUR>

    Comparing with something that is not amount-like is simply unequal:

    >>> Amount('123.45', 'C', 'EUR') == None
    False
    '''

    def __init__(self, amount, status, currency=None, **kwargs):
        # Normalise a decimal comma to a decimal point before parsing
        self.amount = decimal.Decimal(amount.replace(',', '.'))
        self.currency = currency

        # C = credit, D = debit

        if status == 'D':
            self.amount = -self.amount

    def __eq__(self, other):
        # Returning NotImplemented (instead of failing with an
        # AttributeError) lets Python fall back to its default comparison
        # for None and unrelated objects. Duck-typed objects that expose
        # both attributes are still compared as before.
        if not (hasattr(other, 'amount') and hasattr(other, 'currency')):
            return NotImplemented

        return self.amount == other.amount and self.currency == other.currency

    def __str__(self):
        return '%s %s' % (self.amount, self.currency)

    def __repr__(self):
        return '<%s>' % self

179 

180 

class SumAmount(Amount):
    '''Amount that additionally stores a `number`

    The `number` is shown in the repr as the count of "stmts".

    Args:
        number: Required keyword argument, stored as `self.number`

    All other positional and keyword arguments are forwarded unchanged to
    :py:class:`Amount`.

    Raises:
        KeyError: If the `number` keyword argument is missing
    '''

    def __init__(self, *args, **kwargs):
        # `number` has to be removed before delegating, the parent has no
        # use for it
        number = kwargs.pop('number')
        super(SumAmount, self).__init__(*args, **kwargs)
        self.number = number

    def __repr__(self):
        # The old format string carried a stray ")" before the closing ">"
        return '<%s %s in %s stmts>' % (self.amount, self.currency,
                                        self.number)

190 

191 

class Balance(Model):
    '''Parse balance statement

    Args:
        status (str): Either C or D for credit or debit respectively
        amount (Amount): Object containing the amount and currency. Any
            other truthy value is wrapped in an :py:class:`Amount` using
            `status` and the optional `currency` keyword argument
        date (date): The balance date

    >>> balance = Balance('C', '0.00', Date(2010, 7, 22))
    >>> balance.status
    'C'
    >>> balance.amount.amount
    Decimal('0.00')
    >>> isinstance(balance.date, Date)
    True
    >>> balance.date.year, balance.date.month, balance.date.day
    (2010, 7, 22)

    >>> Balance()
    <None @ None>

    Comparing with something that is not balance-like is simply unequal:

    >>> Balance() == None
    False
    '''

    def __init__(self, status=None, amount=None, date=None, **kwargs):
        if amount and not isinstance(amount, Amount):
            amount = Amount(amount, status, kwargs.get('currency'))
        self.status = status
        self.amount = amount
        self.date = date

    def __eq__(self, other):
        # Only amount and status take part in the comparison, the date does
        # not. Objects lacking those attributes (None, unrelated types) are
        # handed back to Python instead of failing with an AttributeError.
        if not (hasattr(other, 'amount') and hasattr(other, 'status')):
            return NotImplemented

        return self.amount == other.amount and self.status == other.status

    def __repr__(self):
        return '<%s>' % self

    def __str__(self):
        return '%s @ %s' % (
            self.amount,
            self.date, )

231 

232 

class Transactions(abc.Sequence):
    '''
    Collection of :py:class:`Transaction` objects with global properties such
    as begin and end balance

    The object behaves as a read-only sequence over `self.transactions`;
    values from tags scoped to the collection itself end up in `self.data`.

    Args:
        processors (dict): Optional mapping of processor slot name to a list
            of callables, replaces the matching :py:attr:`DEFAULT_PROCESSORS`
            entries
        tags (dict): Optional mapping merged over the default tags
    '''

    #: Using the processors you can pre-process data before creating objects
    #: and modify them after creating the objects
    DEFAULT_PROCESSORS = dict(
        pre_account_identification=[],
        post_account_identification=[],
        pre_available_balance=[],
        post_available_balance=[],
        pre_closing_balance=[],
        post_closing_balance=[],
        pre_intermediate_closing_balance=[],
        post_intermediate_closing_balance=[],
        pre_final_closing_balance=[],
        post_final_closing_balance=[],
        pre_forward_available_balance=[],
        post_forward_available_balance=[],
        pre_opening_balance=[],
        post_opening_balance=[],
        pre_intermediate_opening_balance=[],
        post_intermediate_opening_balance=[],
        pre_final_opening_balance=[],
        post_final_opening_balance=[],
        pre_related_reference=[],
        post_related_reference=[],
        pre_statement=[processors.date_fixup_pre_processor],
        post_statement=[
            processors.date_cleanup_post_processor,
            processors.transactions_to_transaction('transaction_reference'),
        ],
        pre_statement_number=[],
        post_statement_number=[],
        pre_non_swift=[],
        post_non_swift=[],
        pre_transaction_details=[],
        post_transaction_details=[
            processors.transaction_details_post_processor
            # processors.transaction_details_post_processor_with_space
        ],
        pre_transaction_reference_number=[],
        post_transaction_reference_number=[],
        pre_floor_limit_indicator=[],
        post_floor_limit_indicator=[],
        pre_date_time_indication=[],
        post_date_time_indication=[],
        pre_sum_credit_entries=[],
        post_sum_credit_entries=[],
        pre_sum_debit_entries=[],
        post_sum_debit_entries=[])

    def __getstate__(self):  # pragma: no cover
        # Processors are not always safe to dump so ignore them entirely
        state = self.__dict__.copy()
        del state['processors']
        return state

    def __init__(self, processors=None, tags=None):
        # Copy every list as well: a plain dict.copy() would share the list
        # objects with DEFAULT_PROCESSORS, so an in-place change on one
        # instance would leak into the class and all other instances
        self.processors = dict(
            (slot, list(functions))
            for slot, functions in self.DEFAULT_PROCESSORS.items())
        self.tags = Transactions.defaultTags().copy()

        if processors:
            self.processors.update(processors)
        if tags:
            self.tags.update(tags)

        self.transactions = []
        self.data = {}

    @property
    def currency(self):
        '''Currency of the first available balance or floor limit

        Returns `None` when none of the looked up entries is set.
        '''
        balance = mt940.utils.coalesce(
            self.data.get('final_opening_balance'),
            self.data.get('opening_balance'),
            self.data.get('intermediate_opening_balance'),
            self.data.get('available_balance'),
            self.data.get('forward_available_balance'),
            self.data.get('final_closing_balance'),
            self.data.get('closing_balance'),
            self.data.get('intermediate_closing_balance'),
            self.data.get('c_floor_limit'),
            self.data.get('d_floor_limit'), )

        if balance:
            # The entry is either an Amount itself or an object wrapping one
            if isinstance(balance, Amount):
                return balance.currency

            return balance.amount.currency

    @staticmethod
    def defaultTags():
        '''Return the default tag mapping (`mt940.tags.TAG_BY_ID`)'''
        return mt940.tags.TAG_BY_ID

    @classmethod
    def strip(cls, lines):
        '''Yield the cleaned up, non-empty lines

        Carriage returns and trailing whitespace are removed; separator
        lines consisting of a single `-` and empty lines are dropped.

        Args:
            lines (iterable of str): The raw lines
        '''
        for line in lines:
            # We don't like carriage returns in case of Windows files so let's
            # just replace them with nothing
            line = line.replace('\r', '')

            # Strip trailing whitespace from lines since they cause incorrect
            # files
            line = line.rstrip()

            # Skip separators

            if line.strip() == '-':
                continue

            # Return actual lines

            if line:
                yield line

    @classmethod
    def normalize_tag_id(cls, tag_id):
        '''Convert an all-digit tag id to `int`, leave others untouched'''
        # Since non-digit tags exist, make the conversion optional
        if tag_id.isdigit():
            tag_id = int(tag_id)

        return tag_id

    def sanitize_tag_id_matches(self, matches):
        '''Yield only the regex matches that start a real tag

        Content of a transaction details tag can be wrapped badly so that a
        line looks like the start of a tag. Such tag-like matches with an
        unknown id that follow a transaction details tag are rejected.

        Args:
            matches (list): Regex matches having a `tag` group

        Raises:
            AssertionError: If a match that was not rejected has an unknown
                tag id
        '''
        i_next = 0
        for i, match in enumerate(matches):
            # match was rejected
            if i < i_next:
                continue

            # next match would be
            i_next = i + 1

            # normalize tag id
            tag_id = self.normalize_tag_id(match.group('tag'))

            # tag should be known
            assert tag_id in self.tags, 'Unknown tag %r ' \
                'in line: %r' % (tag_id, match.group(0))

            # special treatment for long tag content with possible
            # bad line wrap which produces tag_id like line beginnings
            # seen with :86: tag
            if tag_id == mt940.tags.Tags.TRANSACTION_DETAILS.value.id:
                # search subsequent tags for unknown tag ids
                # these lines likely belong to the previous tag
                for j in range(i_next, len(matches)):
                    next_tag_id = self.normalize_tag_id(
                        matches[j].group('tag'))
                    if next_tag_id in self.tags:
                        # this one is the next valid match
                        i_next = j
                        break
                    # else reject match
                else:
                    # No known tag follows at all, so every remaining match
                    # is wrapped content of this tag. Without this the first
                    # of them would be treated as a real tag and trip the
                    # assertion above.
                    i_next = len(matches)

            # a valid match
            yield match

    def parse(self, data):
        '''Parses mt940 data, expects a string with data

        The results are accumulated on the instance, calling this method
        again continues with the transactions that are already present.

        Args:
            data (str): The MT940 data

        Returns: :py:class:`list` of :py:class:`Transaction`
        '''
        # Remove extraneous whitespace and such
        data = '\n'.join(self.strip(data.split('\n')))

        # The pattern is a bit annoying to match by regex, even with a greedy
        # match it's difficult to get both the beginning and the end so we're
        # working around it in a safer way to get everything.
        tag_re = re.compile(
            r'^:\n?(?P<full_tag>(?P<tag>[0-9]{2}|NS)(?P<sub_tag>[A-Z])?):',
            re.MULTILINE)
        matches = list(tag_re.finditer(data))

        # identify valid matches
        valid_matches = list(self.sanitize_tag_id_matches(matches))

        # Start from the most recent transaction if there is one. Previously
        # the name was only bound while `self.transactions` was still empty,
        # so parsing a second time with the same instance failed with an
        # UnboundLocalError.
        transaction = self.transactions[-1] if self.transactions else None

        for i, match in enumerate(valid_matches):
            tag_id = self.normalize_tag_id(match.group('tag'))

            # get tag instance corresponding to tag id
            tag = self.tags.get(match.group('full_tag')) \
                or self.tags[tag_id]

            # Nice trick to get all the text that is part of this tag, python
            # regex matches have a `end()` and `start()` to indicate the start
            # and end index of the match.

            if valid_matches[i + 1:i + 2]:
                tag_data = \
                    data[match.end():valid_matches[i + 1].start()].strip()
            else:
                tag_data = data[match.end():].strip()

            tag_dict = tag.parse(self, tag_data)

            # Preprocess data before creating the object

            for processor in self.processors.get('pre_%s' % tag.slug, []):
                tag_dict = processor(self, tag, tag_dict)

            result = tag(self, tag_dict)

            # Postprocess the object

            for processor in self.processors.get('post_%s' % tag.slug, []):
                result = processor(self, tag, tag_dict, result)

            # Creating a new transaction for :20: and :61: tags allows the
            # tags from :20: to :61: to be captured as part of the transaction.

            if isinstance(tag, mt940.tags.Statement):
                # Transactions only get a Transaction Reference Code ID from a
                # :61: tag which is why a new transaction is created if the
                # 'id' has a value.

                if not self.transactions:
                    transaction = Transaction(self)
                    self.transactions.append(transaction)

                if transaction.data.get('id'):
                    transaction = Transaction(self, result)
                    self.transactions.append(transaction)
                else:
                    transaction.data.update(result)
            elif issubclass(tag.scope, Transaction) and self.transactions:
                # Combine multiple results together as one string, Rabobank has
                # multiple :86: tags for a single transaction

                for k, v in _compat.iteritems(result):
                    if k in transaction.data and hasattr(v, 'strip'):
                        transaction.data[k] += '\n%s' % v.strip()
                    else:
                        transaction.data[k] = v

            elif issubclass(tag.scope, Transactions):  # pragma: no branch
                self.data.update(result)

        return self.transactions

    def __getitem__(self, key):
        return self.transactions[key]

    def __len__(self):
        return len(self.transactions)

    def __repr__(self):
        # Only the entries whose key ends in "balance" are shown
        return '<%s[%s]>' % (
            self.__class__.__name__,
            ']['.join('%s: %s' % (k.replace('_balance', ''), v)
                      for k, v in _compat.iteritems(self.data)
                      if k.endswith('balance')))

491 

492 

class Transaction(Model):
    '''A single transaction, its values are kept in the `data` dict

    Args:
        transactions: The object this transaction is created for, stored as
            `self.transactions`
        data (dict): Optional initial values for `self.data`
    '''

    def __init__(self, transactions, data=None):
        self.data = {}
        self.transactions = transactions
        self.update(data)

    def update(self, data):
        '''Merge `data` into `self.data`, empty values are ignored'''
        if not data:
            return

        self.data.update(data)

    def __repr__(self):
        class_name = self.__class__.__name__
        date = self.data.get('date')
        amount = self.data.get('amount')

        return '<%s[%s] %s>' % (class_name, date, amount)