Mbed Host Tests
mbed_base.py
Go to the documentation of this file.
"""
mbed SDK
Copyright (c) 2011-2016 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
"""
19
20import json
21import os
22import mbed_lstools
23from time import sleep
24from mbed_host_tests import DEFAULT_BAUD_RATE
25import mbed_host_tests.host_tests_plugins as ht_plugins
26from mbed_host_tests.host_tests_logger import HtrunLogger
27
28
class Mbed:
    """! Base class for a host driven test
    @details This class stores information about things like disk, port, serial speed etc.
             Class is also responsible for manipulation of serial port between host and mbed device

             Attributes set by the constructor:
               options, logger, port, disk, target_id, image_path, copy_method, retry_copy,
               program_cycle_s, polling_timeout, serial_baud, serial_timeout, test_cfg
    """

    def __init__(self, options):
        """! Store copy / reset / serial settings read from the options object
        @param options Object exposing the attributes: port, disk, target_id, image_path,
                       copy_method, retry_copy, program_cycle_s, polling_timeout, baud_rate
                       and json_test_configuration
        @details If options.json_test_configuration is set, the file is parsed as JSON into
                 self.test_cfg. An I/O error while reading it is logged and test_cfg stays None;
                 any other error (e.g. malformed JSON) is logged and re-raised.
        """
        # For compatibility with old mbed. We can use command line options for Mbed object
        # or we can pass options directly.
        self.options = options
        self.logger = HtrunLogger('MBED')

        # Options related to copy / reset mbed device
        self.port = self.options.port
        self.disk = self.options.disk
        self.target_id = self.options.target_id
        # Image path may arrive wrapped in double quotes from the command line
        self.image_path = self.options.image_path.strip('"') if self.options.image_path is not None else ''
        self.copy_method = self.options.copy_method
        self.retry_copy = self.options.retry_copy
        # Delay applied after each copy attempt; falls back to 2.0 s when not given
        self.program_cycle_s = float(self.options.program_cycle_s if self.options.program_cycle_s is not None else 2.0)
        self.polling_timeout = self.options.polling_timeout

        # Serial port settings
        self.serial_baud = DEFAULT_BAUD_RATE
        # Default so the attribute always exists, even when the port string has no TIMEOUT field
        self.serial_timeout = 1

        # Users can use command to pass port speeds together with port name. E.g. COM4:115200:1
        # Format is PORT:SPEED:TIMEOUT
        port_config = self.port.split(':') if self.port else []
        if len(port_config) in (2, 3):
            # -p COM4:115200  or  -p COM4:115200:0.5
            self.port = port_config[0]
            self.serial_baud = int(port_config[1])
            if len(port_config) == 3:
                self.serial_timeout = float(port_config[2])

        # Overriding baud rate value with command line specified value
        if self.options.baud_rate:
            self.serial_baud = self.options.baud_rate

        # Test configuration in JSON format
        self.test_cfg = None
        if self.options.json_test_configuration is not None:
            # We need to normalize path before we open file
            json_test_configuration_path = self.options.json_test_configuration.strip("\"'")
            try:
                self.logger.prn_inf("Loading test configuration from '%s'..." % json_test_configuration_path)
                with open(json_test_configuration_path) as data_file:
                    self.test_cfg = json.load(data_file)
            except IOError as e:
                self.logger.prn_err("Test configuration JSON file '{0}' I/O error({1}): {2}"
                                    .format(json_test_configuration_path, e.errno, e.strerror))
            except Exception as e:
                # Bind the exception here: a name bound by the IOError clause above is not
                # available in this clause.
                self.logger.prn_err("Test configuration JSON Unexpected error: %s" % str(e))
                raise

    def _get_remount_count(self, disk_path, tries=2):
        """! Get the remount count from 'DETAILS.TXT' file
        @param disk_path Directory (mount point) expected to contain DETAILS.TXT
        @param tries Number of attempts made when listing / reading the disk fails
        @return Returns count, None if not-available (file missing, no 'Remount count:' line,
                unparsable value, or every attempt failed with an OS error)
        """
        for cur_try in range(1, tries + 1):
            try:
                # Match the name case-insensitively but open the entry as it is really spelled
                details_name = next((x for x in os.listdir(disk_path) if x.upper() == 'DETAILS.TXT'), None)
                if details_name is None:
                    # 'DETAILS.TXT' file not found
                    return None
                with open(os.path.join(disk_path, details_name), 'r') as details_txt:
                    for line in details_txt:
                        if 'Remount count:' in line:
                            try:
                                return int(line.split('Remount count:', 1)[1].strip())
                            except ValueError:
                                # Line present but value is not an integer
                                return None
                # Remount count not found in file
                return None
            except (OSError, IOError) as e:
                self.logger.prn_err("Failed to get remount count due to OSError. %s" % str(e))
                self.logger.prn_inf("Retrying in 1 second (try %s of %s)" % (cur_try, tries))
                sleep(1)
        # Failed to get remount count
        return None

    def _check_flash_error(self, target_id, disk, initial_remount_count):
        """! Check for flash errors
        @param target_id Target ID used to find the device in the mbed-ls listing
        @param disk Disk path used to re-read the remount count
        @param initial_remount_count Remount count read before the copy, or None
        @return Returns False if FAIL.TXT present, else True
        @details Polls up to 5 times with a 0.5 s delay. While the remount count is known and
                 unchanged the check is postponed to the next poll.
        """
        if not target_id:
            self.logger.prn_wrn("Target ID not found: Skipping flash check and retry")
            return True

        bad_files = set(['FAIL.TXT'])
        # Re-try at max 5 times with 0.5 sec in delay
        for _ in range(5):
            # mbed_lstools.create() should be done inside the loop. Otherwise it will loop on same data.
            mbeds = mbed_lstools.create()
            mbed_list = mbeds.list_mbeds()  # list of mbeds present
            # get first item in list with a matching target_id, if present
            mbed_target = next((x for x in mbed_list if x['target_id'] == target_id), None)

            if mbed_target is None or 'mount_point' not in mbed_target or mbed_target['mount_point'] is None:
                # Device not (yet) listed or not mounted
                sleep(0.5)
                continue
            mount_point = mbed_target['mount_point']

            if initial_remount_count is not None:
                new_remount_count = self._get_remount_count(disk)
                if new_remount_count is not None and new_remount_count == initial_remount_count:
                    # Remount count unchanged since before the copy: poll again
                    sleep(0.5)
                    continue

            try:
                # Keep the real entry names so the file can be opened on case-sensitive mounts
                found = [x for x in os.listdir(mount_point) if x.upper() in bad_files]
            except OSError:
                self.logger.prn_wrn("Failed to enumerate disk files, retrying")
                sleep(0.5)
                continue

            for bad_name in found:
                full_path = os.path.join(mount_point, bad_name)
                self.logger.prn_err("Found %s" % (full_path))
                bad_file_contents = "[failed to read bad file]"
                try:
                    with open(full_path, "r") as bad_file:
                        bad_file_contents = bad_file.read()
                except IOError as error:
                    self.logger.prn_err("Error opening '%s': %s" % (full_path, error))

                self.logger.prn_err("Error file contents:\n%s" % bad_file_contents)
            if found:
                return False
            sleep(0.5)
        return True

    def copy_image(self, image_path=None, disk=None, copy_method=None, port=None, retry_copy=5):
        """! Closure for copy_image_raw() method.
        @param image_path Path to the image file; falls back to self.image_path when falsy
        @param disk Destination disk; falls back to self.disk when falsy
        @param copy_method Copy plugin name; falls back to self.copy_method when falsy
        @param port Serial port; falls back to self.port when falsy
        @param retry_copy Maximum number of copy attempts; falls back to self.retry_copy when falsy
        @return Returns result from copy plugin / flash check of the last attempt.
                False when the image path is missing, the file does not exist, or no attempt
                was made (retry count resolves to zero).
        """
        # Set-up closure environment
        image_path = image_path or self.image_path
        disk = disk or self.disk
        copy_method = copy_method or self.copy_method
        port = port or self.port
        retry_copy = retry_copy or self.retry_copy
        target_id = self.target_id

        if not image_path:
            self.logger.prn_err("Error: image path not specified")
            return False

        if not os.path.isfile(image_path):
            self.logger.prn_err("Error: image file (%s) not found" % image_path)
            return False

        # Defined up-front so that zero attempts yields False instead of an unbound local
        result = False
        attempts = int(retry_copy) if retry_copy else 0
        for _ in range(attempts):
            initial_remount_count = self._get_remount_count(disk)
            # Call proper copy method
            result = self.copy_image_raw(image_path, disk, copy_method, port)
            sleep(self.program_cycle_s)
            if not result:
                continue
            result = self._check_flash_error(target_id, disk, initial_remount_count)
            if result:
                break
        return result

    def copy_image_raw(self, image_path=None, disk=None, copy_method=None, port=None):
        """! Copy file depending on method you want to use. Handles exception
             and return code from shell copy commands.
        @param image_path Where is binary with target's firmware
        @param disk Destination disk passed to the plugin as 'destination_disk'
        @param copy_method Copy plugin name; None and 'default' select 'shell'
        @param port Serial port passed to the plugin as 'serial'
        @return Returns result from copy plugin
        @details Method which is actually copying image to mbed
        """
        # Select copy_method
        # We override 'default' method with 'shell' method
        if copy_method in (None, 'default'):
            copy_method = 'shell'

        # NOTE(review): keyword is spelled 'pooling_timeout'; kept as-is because the plugin
        # side is not visible here - confirm which spelling the copy plugins read.
        result = ht_plugins.call_plugin('CopyMethod',
                                        copy_method,
                                        image_path=image_path,
                                        serial=port,
                                        destination_disk=disk,
                                        target_id=self.target_id,
                                        pooling_timeout=self.polling_timeout)
        return result

    def hw_reset(self):
        """! Performs hardware reset of target mbed device.
        @return Returns result from the 'power_cycle' reset plugin
        @details On success self.port and self.disk are refreshed from the 'serial_port' and
                 'mount_point' entries of device_info, which the plugin is expected to fill in.
        """
        device_info = {}
        result = ht_plugins.call_plugin('ResetMethod',
                                        'power_cycle',
                                        target_id=self.target_id,
                                        device_info=device_info)
        if result:
            self.port = device_info['serial_port']
            self.disk = device_info['mount_point']
        return result
237
Base class for a host driven test.
Definition mbed_base.py:29
copy_image_raw(self, image_path=None, disk=None, copy_method=None, port=None)
Copy file depending on method you want to use.
Definition mbed_base.py:198
copy_image(self, image_path=None, disk=None, copy_method=None, port=None, retry_copy=5)
Closure for copy_image_raw() method.
Definition mbed_base.py:87