An error occurred while loading the file. Please try again.
-
Wan Xia authored4b5e9f80
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
import glob
import json
import os
import shutil
import subprocess
import sys
import time
from collections import defaultdict
from datetime import datetime
from ftplib import FTP
from multiprocessing import Process
from queue import Queue
from threading import Thread
import cv2
import numpy as np
import requests
import pickle
from socket import socket, AF_INET, SOCK_STREAM
# =================== sage utils======================================
def make_dir(target_path: str):
    """Create ``target_path`` together with any missing parent directories.

    Nothing happens when ``target_path`` is empty or already exists.

    Args:
        target_path: Directory to create. May be relative or absolute and may
            end with a path separator.
    """
    if not target_path or os.path.exists(target_path):
        return
    # exist_ok=True tolerates another process creating the directory between
    # the existence check above and this call.
    os.makedirs(target_path, exist_ok=True)
class ParamManager:
    """Load the JSON configuration file and expose its entries as attributes.

    Construction has side effects: the directories named by ``log_path`` and
    ``device_path`` are created when missing, and the process exits when the
    file is absent or a required key cannot be read.
    """

    @classmethod
    def check(cls, config_path):
        """Exit the process when ``config_path`` does not exist."""
        if not os.path.exists(config_path):
            my_print(f'{config_path} not existed!', 'ERROR')
            # NOTE(review): exits with status 0 even though this is a failure.
            sys.exit(0)

    @staticmethod
    def mkdir(dir_path):
        """Create ``dir_path`` (including parents) when it does not exist."""
        if not os.path.exists(dir_path):
            make_dir(dir_path)

    def __init__(self, config_path):
        """Read ``config_path`` and copy every expected key onto ``self``.

        Args:
            config_path: Path of a JSON file containing all keys read below.

        Raises:
            SystemExit: When the file is missing, or a key is missing or
                cannot be converted to the expected numeric type.
        """
        self.check(config_path)
        # The context manager closes the handle even when json.load raises.
        with open(config_path, 'r') as conf:
            param_dict = json.load(conf)
        try:
            self.log_path = param_dict['log_path']
            self.mkdir(self.log_path)
            self.device_path = param_dict['device_path']
            self.mkdir(self.device_path)
            # device_path is split into its parent directory and last component.
            self.target_path, self.device_id = os.path.split(self.device_path)
            self.limit_size = int(param_dict['limit_size'])
            self.root_path = param_dict['root_path']
            self.days = float(param_dict['days'])
            self.cam_id = param_dict['cam_id']
            self.base_port = int(param_dict['base_port'])
            self.buffer_size = int(param_dict['buffer_size'])
            self.default_width = int(param_dict['default_width'])
            self.default_height = int(param_dict['default_height'])
            self.mtx_file_path = param_dict['mtx_file_path']
            self.remote_host = param_dict['remote_host']
            self.port = int(param_dict['port'])
            self.tcp_port = int(param_dict['tcp_port'])
            self.username = param_dict['username']
            self.psw = param_dict['psw']
            self.remote_dir = param_dict['remote_dir']
            self.cam_exp = int(param_dict['cam_exp'])
            self.upload_max = int(param_dict['multi-thread(upload)'])
            # See the documentation site for the meaning of each parameter.
            self.ocr_post_address = param_dict['ocr_post_address']
            self.image_num_per_level = int(param_dict['image_num_per_level'])
            self.server_port = int(param_dict['server_port'])
            # Same config entry as ``cam_id``, exposed under a second name.
            self.level2cam = param_dict['cam_id']
        except Exception as e:
            my_print(f'load param from {config_path} failed!\n{e}', 'ERROR')
            sys.exit(0)
def get_stime():
    """Return the current local time as a 14-digit ``YYYYmmddHHMMSS`` string."""
    now = datetime.now()
    return now.strftime('%Y%m%d%H%M%S')
def my_print(msg, msg_type='INFO'):
    """Print ``msg`` prefixed with the current timestamp and ``msg_type`` tag."""
    stamp = get_stime()
    line = f'{stamp}||[{msg_type}]: {msg}'
    print(line)
class Logger(object):
    """File-like object that writes every message to a stream and a log file.

    The first 14 characters of the log file name are parsed as a
    ``YYYYmmddHHMMSS`` timestamp to decide when to start a new file.
    """

    def __init__(self, filepath, log_type, stream=sys.stdout):
        """Open ``filepath`` for appending and remember the wrapped stream.

        Args:
            filepath: Log file path; its base name must start with a
                ``YYYYmmddHHMMSS`` timestamp.
            log_type: Optional tag inserted before ``.log`` in rotated files.
            stream: Stream that also receives every message.
        """
        directory, base_name = os.path.split(filepath)
        self.terminal = stream
        self.log = open(filepath, 'a')
        if log_type:
            self.log_type = f'.{log_type}'
        else:
            self.log_type = log_type
        self.log_dir_path = directory
        self.filename = base_name
        self.file_date = base_name[:14]
        self.filepath = filepath

    def write(self, message):
        """Write ``message`` to the stream, then to the (possibly rotated) file."""
        out = self.terminal
        out.write(message)
        out.flush()
        self.check_log()
        self.log.write(message)
        self.log.flush()

    def check_log(self):
        """Switch to a new log file once the current one is more than a day old."""
        opened_at = datetime.strptime(self.file_date, '%Y%m%d%H%M%S')
        age = datetime.now() - opened_at
        if not age.days > 1:
            return
        self.log.close()
        self.filename = get_stime()
        new_base = os.path.join(self.log_dir_path, self.filename)
        self.filepath = f'{new_base}{self.log_type}.log'
        self.file_date = self.filename[:14]
        self.log = open(self.filepath, 'a')
        # Module-level check_log prunes the log directory after a rotation.
        check_log(self.log_dir_path)

    def flush(self):
        """No-op: ``write`` already flushes both targets."""
        pass
class SaveTool:
    """Class-level holder of the current task id/path and the retention logic.

    All state lives in class attributes. ``pm`` must be assigned before any
    method is called.
    """
    task_id = None
    task_path = None
    pm: ParamManager = None

    @classmethod
    def check_res(cls):
        """Delete old task directories under ``pm.device_path``.

        Entries are sorted by the numeric value of the name part before the
        first dot, so every entry name must start with a number.

        * When ``pm.days >= 0.5`` the policy is age based: directories whose
          leading ``YYYYmmddHHMMSS`` timestamp is older than ``pm.days`` days
          are removed, oldest first, stopping at the first one young enough.
        * Otherwise, when at least two entries exist and ``pm.root_path`` is
          set, the lowest-sorted entry is removed repeatedly while the free
          space of ``pm.root_path`` is below ``pm.limit_size`` (GiB).
        """
        while True:
            vision_res = glob.glob(f'{cls.pm.device_path}/*')
            vision_res.sort(key=lambda x: float(os.path.split(x)[-1].split('.')[0]))
            if cls.pm.days >= 0.5:
                limit_hour = cls.pm.days * 24
                for video_dir_path in vision_res:
                    try:
                        video_dir = os.path.split(video_dir_path)[-1]
                        file_date = video_dir[:14]
                        d_datetime = (datetime.now() - datetime.strptime(file_date, '%Y%m%d%H%M%S'))
                        existed_hour = d_datetime.days * 24 + d_datetime.seconds / 3600
                        if existed_hour <= limit_hour:
                            # Sorted oldest first: everything after this is newer.
                            break
                        my_print(
                            f'{video_dir} exists {existed_hour / 24:.1f} days ')
                        shutil.rmtree(video_dir_path)
                        my_print(f'{video_dir_path} deleted')
                    except Exception as e:
                        my_print(f'delete {video_dir_path} error, {e}', 'ERROR')
                break
            if len(vision_res) < 2 or not cls.pm.root_path:
                break
            disk_info = os.statvfs(cls.pm.root_path)  # absolute path
            free_size_1 = disk_info.f_bavail * disk_info.f_frsize / 1024 / 1024 / 1024
            free_size_2 = disk_info.f_bavail * disk_info.f_bsize / 1024 / 1024 / 1024
            if free_size_1 >= cls.pm.limit_size and free_size_2 >= cls.pm.limit_size:
                break
            target_path = vision_res[0]
            try:
                shutil.rmtree(target_path)
                my_print(
                    f'free size 1 ={free_size_1}, free size 2 ={free_size_2}, delete {target_path}')
            except Exception as e:
                my_print(f'try to delete result dir={target_path}, {e}', 'ERROR')
                # The same directory would be picked again on the next pass,
                # so give up instead of spinning forever on an undeletable entry.
                break

    @classmethod
    def check_path(cls):
        """Create the current task directory if needed, then apply retention."""
        my_print(f'current task path = {cls.task_path}')
        if not os.path.exists(cls.task_path):
            my_print(f'try to create task path = {cls.task_path}')
            make_dir(cls.task_path)
        cls.check_res()

    @classmethod
    def set_task_path(cls):
        """Start a new task: derive a fresh id and directory from the clock.

        The id is the ``YYYYmmddHHMMSS`` timestamp followed by the last three
        digits of the current epoch time in milliseconds.
        """
        cls.task_id = f'{get_stime()}{str(time.time() * 1000).split(".")[0][-3:]}'
        cls.task_path = f'{cls.pm.device_path}/{cls.task_id}'
        cls.check_path()
def set_log(pm: ParamManager, log_type=''):
    """Redirect ``sys.stdout`` and ``sys.stderr`` to a timestamped log file.

    Also stores ``pm`` on ``SaveTool`` and prunes/creates the log directory.

    Args:
        pm: Loaded parameters; ``pm.log_path`` is the log directory.
        log_type: Optional tag inserted before ``.log`` in the file name.
    """
    SaveTool.pm = pm
    check_log(pm.log_path)
    suffix = f'.{log_type}' if log_type else log_type
    base = os.path.join(pm.log_path, get_stime())
    log_file = f'{base}{suffix}.log'
    # Both streams append to the same file through separate Logger objects.
    sys.stdout = Logger(log_file, log_type, sys.stdout)
    sys.stderr = Logger(log_file, log_type, sys.stderr)
def check_log(log_path):
    """Create the log directory, or prune it when it holds more than 50 files.

    When pruning, entries are ordered by the numeric value of the name part
    before the first dot and all but the last 20 are removed.

    Args:
        log_path: Directory that holds the log files.
    """
    if os.path.exists(log_path):
        entries = os.listdir(log_path)
        if len(entries) <= 50:
            return
        entries.sort(key=lambda name: float(name.split('.')[0]))
        for stale in entries[:-20]:
            stale_path = os.path.join(log_path, stale)
            try:
                os.remove(stale_path)
            except Exception as e:
                my_print(f'remove log={stale} fail, {e}', 'ERROR')
        return
    try:
        os.mkdir(log_path)
    except Exception as e:
        my_print(f'make dir = {log_path}, {e}', 'ERROR')
class CamControl:
    """Control one camera: open it, grab frames and save them on request."""

    def __init__(self, pm: ParamManager, cam_id):
        """Store the parameters; the camera itself is opened by ``open``.

        Args:
            pm: Loaded parameters.
            cam_id: Index into ``pm.cam_id`` selecting the capture device.
        """
        self.th = None
        self.cam_id = cam_id
        self.pm = pm
        self.open_flag = False
        self.cam_gen = None
        self.mtx = None
        self.dist = None
        self.mtx_flag = False  # True once calibration data has been loaded
        self.flag = False  # True while the capture device is considered usable
        self.cap = None
        self.img_gen = None
        self.buffsize = 1
        # Black frame saved whenever no real frame could be grabbed.
        self.default_img = np.zeros((pm.default_height, pm.default_width, 3), dtype=np.uint8)

    def open(self):
        """Load calibration data (if any) and open the capture device."""
        self.set_mtx()
        self.open_cam()

    def open_cam(self):
        """Open the V4L2 device, configure it and create the frame generator."""
        cap = cv2.VideoCapture(self.pm.cam_id[self.cam_id], cv2.CAP_V4L2)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 1)
        cap.set(cv2.CAP_PROP_EXPOSURE, self.pm.cam_exp)
        my_print(f'EXPOSURE:{cap.get(cv2.CAP_PROP_EXPOSURE)}')
        # Keep the handle: img_generator reads from self.cap. Without this
        # assignment every read failed and only the default image was saved.
        self.cap = cap
        self.flag = cap.isOpened()
        my_print(f'open_flag:{self.flag}')
        if self.flag:
            self.img_gen = self.img_generator()

    def img_generator(self):
        """Yield the newest frame on each ``next()``, or ``None`` on failure.

        ``buffsize + 1`` frames are read per request and only the last one is
        yielded. After a read raises, the camera is marked broken and every
        later request yields ``None``.
        """
        while True:
            if self.flag:
                buffsize_cnt = self.buffsize + 1
                while buffsize_cnt > 0:
                    try:
                        ret, f = self.cap.read()
                    except Exception as e:
                        my_print(f'cam{self.cam_id}-{self.pm.cam_id[self.cam_id]} broken, {e}', 'ERROR')
                        f = None
                        self.flag = False
                        break
                    buffsize_cnt -= 1
                my_print(f'cap read')
            else:
                f = None
            if not isinstance(f, np.ndarray):
                f = None
            yield f

    def set_mtx(self):
        """Load ``mtx``/``dist`` calibration data for this camera, if present.

        The file path is ``pm.mtx_file_path`` formatted with the device id.
        """
        mtx_file = self.pm.mtx_file_path.format(self.pm.cam_id[self.cam_id])
        if os.path.exists(mtx_file):
            # NOTE(review): pickle executes arbitrary code on load; the
            # calibration file must come from a trusted source.
            with open(mtx_file, 'rb') as f:
                dist_p = pickle.load(f)
            self.mtx = dist_p['mtx']
            self.dist = dist_p['dist']
            self.mtx_flag = True
            my_print(f'{mtx_file} found')
        else:
            my_print(f'{mtx_file} not existed')

    def save_img_msg(self, client_sock, msg):
        """Handle one capture request and reply on ``client_sock``.

        Args:
            client_sock: Connected socket used for the reply.
            msg: Request bytes made of six ``||``-separated fields:
                ``cam_idx||device_path||task_id||shelf_id||level_id||img_name``.
        """
        # Decode the payload rather than slicing the repr of the bytes object,
        # which mangles backslashes and non-ASCII characters.
        if isinstance(msg, (bytes, bytearray)):
            text = msg.decode('utf-8', errors='replace')
        else:
            text = str(msg)
        msg_params = text.split('||')
        if len(msg_params) == 6:
            cam_idx, device_path, task_id, shelf_id, level_id, img_name = msg_params
            try:
                img = None
                if self.img_gen is not None:
                    img = next(self.img_gen)
            except Exception as e:
                img = None
            if img is None:
                img = self.default_img
                my_print(f'img is None', 'ERROR')
            img_size = (img.shape[1], img.shape[0])
            if self.mtx_flag:
                new_camera_mtx, roi = cv2.getOptimalNewCameraMatrix(self.mtx, self.dist, img_size, 0, img_size)
                img = cv2.undistort(img, self.mtx, self.dist, None, new_camera_mtx)
            save_path = f'{device_path}/{task_id}/{shelf_id}/{level_id}'
            img_file = f'{img_name}.{task_id}.{shelf_id}.{level_id}.{cam_idx}.0.png'
            # cv2.imwrite returns False (it does not raise) when the target
            # directory is missing, so create it and check the result.
            os.makedirs(save_path, exist_ok=True)
            if not cv2.imwrite(f'{save_path}/{img_file}', img):
                my_print(f'write {save_path}/{img_file} failed', 'ERROR')
            my_print(f'cam{self.cam_id}-{self.pm.cam_id[self.cam_id]} save img done')
            client_sock.sendall(f'cam{self.cam_id}-{self.pm.cam_id[self.cam_id]} done'.encode())
        else:
            my_print(f'msg error', 'ERROR')
            client_sock.sendall(f'msg error'.encode())
def create_cap_process(pm: ParamManager):
    """Start one daemon ``cam_server`` process per entry of ``pm.cam_id``."""
    for i, device in enumerate(pm.cam_id):
        process_name = f'Process - cam{i}-{device}'
        worker = Process(target=cam_server, args=(pm, i,), daemon=True, name=process_name)
        worker.start()
        my_print(f'Process - cam{i}-{device} - created!')
def create_cap_socket(pm: ParamManager):
    """Connect to every local camera server and return the sockets.

    Each camera gets up to four connection attempts, three seconds apart, on
    ``127.0.0.1:(pm.base_port + index)``.

    Args:
        pm: Loaded parameters.

    Returns:
        dict mapping the camera index to a connected socket, or to ``None``
        when the server could not be reached.
    """
    cam_socket_dict = {}
    time.sleep(3)
    for cam_id in range(len(pm.cam_id)):
        address = ('127.0.0.1', pm.base_port + cam_id)
        cam_socket = None
        for _ in range(4):
            my_print(f'try to connect cam{cam_id} server', 'MAIN')
            # A socket whose connect() failed is in an unspecified state, so
            # use a fresh one per attempt and close it on failure (no leak).
            attempt = socket(AF_INET, SOCK_STREAM)
            try:
                attempt.connect(address)
                my_print(f'connect cam{cam_id} server successful')
                cam_socket = attempt
                break
            except Exception as e:
                my_print(f'connect cam{cam_id} server error, {e}')
                attempt.close()
            time.sleep(3)
        cam_socket_dict[cam_id] = cam_socket
    return cam_socket_dict
def cam_server(pm: ParamManager, cam_id: int):
    """Run the blocking TCP server that saves an image per received request.

    Sets up logging, opens the camera, frees the port if another process
    holds it, then serves clients one at a time. Never returns.

    Args:
        pm: Loaded parameters.
        cam_id: Index into ``pm.cam_id``; the server listens on
            ``127.0.0.1:(pm.base_port + cam_id)``.
    """

    def echo_handler(address, client_sock):
        """Serve one client until it disconnects or the connection breaks."""
        my_print('Got connection from {}'.format(address))
        try:
            while True:
                msg = client_sock.recv(8192)
                if not msg:
                    break
                my_print(f'cam{cam_id}-{pm.cam_id[cam_id]} received msg={msg}')
                cam_ctrl.save_img_msg(client_sock, msg)
        except OSError as e:
            # A reset or broken connection must not take the whole server down.
            my_print(f'cam{cam_id}-{pm.cam_id[cam_id]} connection error, {e}', 'ERROR')
        finally:
            client_sock.close()

    def echo_server(address, backlog=5):
        """Accept clients forever and handle them sequentially."""
        sock = socket(AF_INET, SOCK_STREAM)
        sock.bind(address)
        sock.listen(backlog)
        while True:
            client_sock, client_addr = sock.accept()
            echo_handler(client_addr, client_sock)

    cam_ctrl = CamControl(pm, cam_id)
    set_log(pm, f'cam{cam_id}-{pm.cam_id[cam_id]}')
    cam_ctrl.open()
    release_port(pm.base_port + cam_id)
    echo_server(('127.0.0.1', pm.base_port + cam_id))  # start the TCP server
def ftp_upload_connect(pm: ParamManager, img_path, ftp):
    """Connect and log in to the FTP server, then enter the image's remote directory.

    The remote directory mirrors the local layout:
    ``pm.remote_dir/<device_id>/<task_id>/<shelf_id>/<level_id>``, where the
    last four components are the last four directories of ``img_path``.
    Missing remote directories are created on the way down.

    Args:
        pm: Loaded parameters (host, port, credentials, remote root).
        img_path: Local path ``.../<device>/<task>/<shelf>/<level>/<file>``.
        ftp: An unconnected ``ftplib.FTP`` instance.

    Returns:
        ``(connect_flag, ftp)``. ``connect_flag`` is True only when the
        connection, the login and every directory change succeeded.
    """
    level_path, img_name = os.path.split(img_path)
    shelf_path, level_id = os.path.split(level_path)
    task_path, shelf_id = os.path.split(shelf_path)
    device_path, task_id = os.path.split(task_path)
    save_path, device_id = os.path.split(device_path)

    connected = False
    for _ in range(4):
        try:
            ftp.connect(pm.remote_host, pm.port)
            my_print('Connect to FTP successfully!!!', 'TH_UP_INFO')
            connected = True
            break
        except Exception as e:
            my_print(f'Fail to connect FTP!!!, {e}', 'TH_UP_ERROR')
        time.sleep(1)
    if not connected:
        return False, ftp

    try:
        ftp.login(pm.username, pm.psw)
    except Exception as e:
        my_print(f'ftp login Fail, {e}', 'TH_UP_ERROR')
        return False, ftp

    # (log label, directory name), walked top-down.
    remote_steps = (
        ('remote_dir=', pm.remote_dir),
        ('device_dir =', device_id),
        ('task_dir=', task_id),
        ('shelf_dir=', shelf_id),
        ('level_dir=', level_id),
    )
    for label, dir_name in remote_steps:
        try:
            ftp.mkd(dir_name)
        except Exception:
            # Best effort: the directory usually exists already; a real
            # problem surfaces in the cwd below.
            pass
        try:
            ftp.cwd(dir_name)
        except Exception as e:
            my_print(f'cd {label}{dir_name} Fail, {e}', 'TH_UP_ERROR')
            # Stop here: continuing would create the remaining directories
            # relative to the wrong location.
            return False, ftp
    return True, ftp
def upload_img(pm: ParamManager, img_path, ftp: FTP): # worker thread that uploads one image
    """Advance the upload of one local image by one step over FTP.

    The local file name must have exactly seven dot-separated parts:
    ``name.task.shelf.level.cam.<upload flag>.suffix``. Remotely the file is
    stored as ``name.task.shelf.level.cam.0.0.suffix`` while in progress and
    renamed to ``...cam.0.1.suffix`` once complete.

    One call performs one of these steps, chosen by comparing sizes:

    * remote file missing or empty: upload the whole file;
    * remote file smaller than local: resume from the remote size;
    * remote file larger than local: delete the remote file and return;
    * sizes equal: rename the local file to upload flag ``1``, rename the
      remote file to its final name and, once the remote directory holds
      ``pm.image_num_per_level`` files ending in ``.1.png``, rename the
      remote level directory to ``<level>.0``.

    Uploading and confirming are separate steps, so the function presumably
    relies on being called again for the same image (TODO confirm in caller).

    NOTE(review): indentation was reconstructed from a flattened source; the
    nesting of the final "uploaded" log line and of the closing ``ftp.quit()``
    is the most plausible reading, not a verified one.

    Args:
        pm: Loaded parameters.
        img_path: Local path of the image to upload.
        ftp: Unconnected ``ftplib.FTP`` instance; it is connected here and
            quit before returning.

    Raises:
        ValueError: When the file name does not have seven dot-separated parts.
    """
    my_print(f'uploading ==> {img_path}', 'TH_UP_IMG_INFO')
    level_path, img_name = os.path.split(img_path)
    # shelf_path, level_id = os.path.split(level_path)
    i_name, i_task_id, i_shelf_id, i_level_id, i_cam_id, i_upload, i_suffix = img_name.split('.')
    # Local name with the upload flag set to 1.
    new_img_name = '.'.join([i_name, i_task_id, i_shelf_id, i_level_id, i_cam_id, '1', i_suffix])
    new_img_path = f'{level_path}/{new_img_name}'
    connect_flag, ftp = ftp_upload_connect(pm, img_path, ftp)
    if connect_flag:
        local_img_size = os.path.getsize(img_path)
        if local_img_size <= 0:
            # An empty local file is replaced by a black default-sized image.
            os.remove(img_path)
            cv2.imwrite(img_path, np.zeros((pm.default_height, pm.default_width, 3), dtype=np.uint8))
            local_img_size = os.path.getsize(img_path)
        buf_size = 4096
        # In-progress and final remote names (eight dot-separated parts).
        remote_filename = '.'.join([i_name, i_task_id, i_shelf_id, i_level_id, i_cam_id, '0', '0', i_suffix])
        remote_filename_final = '.'.join([i_name, i_task_id, i_shelf_id, i_level_id, i_cam_id, '0', '1', i_suffix])
        # remote_filename_list = [f'{i_time}.{i_device_id}.{i_task_id}.{i}.1.{i_suffix}' for i in range(3)]
        try:
            remote_img_size = ftp.size(remote_filename) # try to get the remote file size
            if remote_img_size == 0:
                try:
                    ftp.delete(remote_filename)
                except Exception as e:
                    my_print(f'try to delete remote_filename={remote_filename} with 0 size, {e}', 'TH_UP_IMG_ERROR')
        except Exception as e:
            # Any failure of SIZE is treated as "no remote file yet".
            remote_img_size = 0
            pass
        my_print(f'local_size={local_img_size} remote_size={remote_img_size}', 'TH_UP_IMG_INFO')
        if local_img_size != remote_img_size: # sizes differ between server and local: full upload or resume
            fp = open(img_path, 'rb')
            if remote_img_size == 0:
                try:
                    ftp.storbinary("STOR {}".format(remote_filename), fp, buf_size) # full upload
                except Exception as e:
                    my_print(f'ftp storbinary Fail', 'TH_UP_IMG_ERROR')
                    pass
                fp.close()
            elif remote_img_size > local_img_size:
                # The remote copy cannot be a prefix of the local file: drop it.
                try:
                    ftp.delete(remote_filename)
                    my_print(f'remote file size > local size, has been deleted', 'TH_UP_IMG')
                except Exception as e:
                    my_print(f'remote file size which > local size cannot be deleted, {e}', 'TH_UP_IMG_ERROR')
                fp.close()
                try:
                    ftp.quit()
                    time.sleep(0.01)
                except Exception as e:
                    pass
                return
            elif remote_img_size < local_img_size:
                # Resume: skip the bytes the server already has.
                fp.seek(remote_img_size)
                datasock = ''
                esize = ''
                try:
                    ftp.voidcmd('TYPE I')
                    datasock, esize = ftp.ntransfercmd("STOR {}".format(remote_filename), remote_img_size) # resume
                except Exception as e:
                    my_print(f'ftp ntransfercmd Fail', 'TH_UP_IMG_ERROR')
                    try:
                        ftp.quit()
                        time.sleep(0.01)
                    except Exception as e:
                        my_print(f'ntransfercmd exception ftp quit Fail', 'TH_UP_IMG_ERROR')
                        pass
                    fp.close()
                    return
                cmpsize = remote_img_size
                while True: # resume
                    buf = fp.read(buf_size * 1024)
                    if not len(buf):
                        my_print(f'no data [break]', 'TH_UP_IMG_INFO')
                        break
                    try:
                        datasock.sendall(buf)
                    except Exception as e:
                        my_print(f'data send Fail', 'TH_UP_IMG_ERROR')
                        break
                    cmpsize += len(buf)
                    if cmpsize == local_img_size:
                        my_print(f'file size equal [break]', 'TH_UP_IMG_INFO')
                        break
                try:
                    datasock.close()
                except Exception as e:
                    my_print(f'datasock close Fail', 'TH_UP_IMG_ERROR')
                    pass
                # Two reads from the control connection follow: the reply to
                # NOOP and one further pending reply.
                try:
                    ftp.voidcmd('NOOP')
                except Exception as e:
                    my_print(f'ftp NOOP Fail', 'TH_UP_IMG_ERROR')
                    pass
                try:
                    ftp.voidresp()
                except Exception as e:
                    my_print(f'ftp voidresp Fail', 'TH_UP_IMG_ERROR')
                    pass
                fp.close()
        else: # sizes match on both ends: flip the upload-status placeholder in the file names
            os.rename(img_path, new_img_path)
            my_print(f'rename {img_path} ==> {new_img_path} done', 'TH_UP_IMG_INFO')
            try:
                ftp.rename(remote_filename, remote_filename_final)
                my_print(f'ftp rename {remote_filename} ==> {remote_filename_final} done', 'TH_UP_IMG_INFO')
                rename_flag = True
            except Exception as e:
                # Roll the local rename back so both sides stay consistent.
                my_print(f'ftp rename failed, {e}', 'TH_UP_IMG_ERROR')
                os.rename(new_img_path, img_path)
                my_print(f'rename back {new_img_path} ==> {img_path} done', 'TH_UP_IMG_INFO')
                rename_flag = False
            # Mark the parent folder as fully uploaded. The original comment
            # says a ".1" suffix is added; the code below appends ".0".
            try:
                if rename_flag:
                    done_flag = True
                    existed_files = ftp.nlst()
                    cnt = 0
                    for existed_file in existed_files:
                        if existed_file.endswith('.1.png'):
                            cnt += 1
                    if cnt != pm.image_num_per_level: # check all images per level have been uploaded fully
                        done_flag = False
                    if done_flag:
                        my_print(f'upload done, remote existed files={existed_files}, done_flag={done_flag}',
                                 'TH_UP_IMG_INFO')
                        ftp.cwd('..')
                        existed_dirs = ftp.nlst()
                        flag1 = (f'{i_level_id}.0' not in existed_dirs)
                        flag2 = (f'{i_level_id}' in existed_dirs)
                        if flag1 and flag2:
                            ftp.sendcmd(f'RNFR {i_level_id}')
                            ftp.voidcmd(f'RNTO {i_level_id}.0')
                            my_print(f'ftp rename {level_path} ==> {level_path}.0 done', 'TH_UP_IMG_INFO')
                        else:
                            my_print(f'{level_path}.0 not in existed dirs is {flag1}', 'TH_UP_IMG_INFO')
                            my_print(f'{level_path} in existed dirs is {flag2}', 'TH_UP_IMG_INFO')
            except Exception as e:
                pass
            my_print(f'{img_path} uploaded', 'TH_UP_IMG_INFO')
    # Always try to close the control connection; failures are ignored.
    try:
        ftp.quit()
        time.sleep(0.01)
    except Exception as e:
        # my_print(f'ftp quit Fail', 'TH_UP_IMG_ERROR')
        pass
def upload_image(pm: ParamManager):
    """Main upload loop: keep uploading not-yet-uploaded images in batches.

    Each round globs ``{pm.device_path}/<task>/<shelf>/<level>/*.0.png``,
    sorts the matches by the numeric leading part of the file name (largest
    first), starts one ``upload_img`` thread per image for at most
    ``pm.upload_max`` images, and waits for all of them before the next round.
    Never returns.

    Args:
        pm: Loaded parameters.
    """
    my_print(f'main_up is starting', 'TH_UP_INFO')
    while True:
        # The try sits inside the loop so that one failing round is logged
        # and retried instead of ending the uploader for good.
        try:
            img_path_list = glob.glob(f'{pm.device_path}/*/*/*/*.0.png')
            if not img_path_list:
                my_print(f'Found none caped img', 'TH_UP_INFO')
                time.sleep(0.1)
                continue
            my_print(f'Found {len(img_path_list)} caped images', 'TH_UP_INFO')
            img_path_list.sort(key=lambda x: float(os.path.split(x)[-1].split('.')[0]), reverse=True)
            if len(img_path_list) > pm.upload_max:
                img_path_list = img_path_list[:pm.upload_max]
            workers = []
            for img_path in img_path_list:
                worker = Thread(target=upload_img, args=(pm, img_path, FTP(),), daemon=True)
                worker.start()
                workers.append(worker)
                time.sleep(0.1)  # stagger the connection attempts
            for worker in workers:
                worker.join()
            if not workers:
                # A non-positive upload_max can empty the batch; avoid spinning.
                time.sleep(0.1)
        except Exception as e:
            my_print(f'upload_image error, {e}', 'TH_UP_ERROR')
            time.sleep(1)
# def send_cmd_to_cam(pm: ParamManager, cam_socket_dict):
# for i in range(pm.cam_id):
# msg = f'{i}||{pm.target_path}||{pm.device_id}||{SaveTool.task_id}||{i}'
# cam_socket_dict[i].send(msg.encode())
# my_print(f'cap msg={msg} send!')
def get_pid_by_port(port):
    """Return the PID (as a string) of a process using ``port``, or ``None``.

    Runs ``lsof -i:<port>`` through sudo and reads the second column of the
    first row after the header.

    Args:
        port: TCP/UDP port number to look up.

    Returns:
        The PID string of the first listed process, or ``None`` when lsof
        printed nothing usable.
    """
    pid = None
    # NOTE(review): the sudo password is hard-coded in the command line and
    # the command goes through a shell; `port` must never be untrusted input.
    res = subprocess.run(f'echo "sagetech" | sudo -S lsof -i:{port}', shell=True, capture_output=True, encoding='utf-8')
    my_print(f'search port={port} \n==>\n{res.stdout}')
    rows = res.stdout.splitlines()
    if len(rows) > 1:
        # lsof pads its columns with a variable number of spaces, so split on
        # any whitespace run rather than on single spaces.
        fields = rows[1].split()
        if len(fields) > 1:
            pid = fields[1]
    return pid
def kill_pid(pid):
    """Send SIGKILL to ``pid`` through ``sudo kill -9``; the outcome is ignored."""
    kill_cmd = f'echo "sagetech" | sudo -S kill -9 {pid}'
    subprocess.run(kill_cmd, shell=True, capture_output=True, encoding='utf-8')
def release_port(existed_port):
    """Kill the process currently using ``existed_port``, if there is one."""
    owner_pid = get_pid_by_port(existed_port)
    my_print(f'existed_port={existed_port} belong to process=[{owner_pid}]')
    if owner_pid is None:
        return
    kill_pid(owner_pid)
    my_print(f'kill pid={owner_pid}')
    # Short pause after the kill before the caller goes on to use the port.
    time.sleep(0.1)
def post_res(pm: ParamManager, task_id_shm):
    """Poll for finished OCR results of the current task and forward them.

    Every five seconds, for each ``.post`` marker found under
    ``{pm.device_path}/<current task>/<shelf>/<level>/`` whose directory
    contains at least one ``*.*.png`` file:

    1. query ``http://{pm.remote_host}:8773/query`` for that level;
    2. when the reply has ``flag == 1``, post its ``data`` to
       ``pm.ocr_post_address``;
    3. on HTTP 200, rename the marker from ``.post`` to ``.posted``.

    Never returns.

    Args:
        pm: Loaded parameters.
        task_id_shm: Object whose ``.value`` holds the current task directory
            name; ``'19700101000000'`` means "no task yet".
    """
    while True:
        if task_id_shm.value != '19700101000000':
            current_task_dir = task_id_shm.value
            my_print(f'current task dir={current_task_dir}', 'POST_RES_INFO')
            pending_markers = glob.glob(f'{pm.device_path}/{current_task_dir}/*/*/.post')
            for post_path in pending_markers:
                level_path = os.path.dirname(post_path)
                if not glob.glob(f'{level_path}/*.*.png'):
                    continue
                shelf_path, level_id = os.path.split(level_path)
                task_path, shelf_id = os.path.split(shelf_path)
                task_id = os.path.basename(task_path)
                try:
                    response = requests.post(f'http://{pm.remote_host}:8773/query',
                                             headers={'content-type': 'application/json'},
                                             data=json.dumps({
                                                 'device_id': pm.device_id,
                                                 'task_id': task_id,
                                                 'shelf_id': shelf_id,
                                                 'level_id': level_id
                                             }),
                                             timeout=2.)
                    if response.status_code != 200:
                        my_print(f'response status code != 200, query ocr res failed', 'ERROR')
                        continue
                    res_content = json.loads(response.content)
                    if res_content['flag'] != 1:
                        # Result not prepared yet; try again on a later poll.
                        continue
                    post_data = res_content['data']
                    try:
                        response = requests.post(pm.ocr_post_address,
                                                 headers={'content-type': 'application/json'},
                                                 data=json.dumps(post_data),
                                                 timeout=2.)
                        try:
                            if response.status_code == 200:
                                posted_path = os.path.join(level_path, '.posted')
                                my_print(
                                    f'ocr res of {level_path} post success, response content = {response.content}')
                                os.rename(post_path, posted_path)
                                my_print(
                                    f'rename {post_path} ==> {os.path.join(level_path, ".posted")}')
                            else:
                                my_print(
                                    f'response status code != 200, ocr res of {level_path} post failed',
                                    'ERROR')
                        except Exception as e:
                            my_print(f'process response, {e}', 'ERROR')
                    except Exception as e:
                        my_print(f'ocr res of {level_path} is prepared, but post failed, {e}', 'ERROR')
                except Exception as e:
                    my_print(f'query ocr res failed, {e}', 'ERROR')
        time.sleep(5)
def check_is_over(device_path, task_id, level_dict):
    """Tell whether every captured level of a task has been posted.

    A level counts as pending when its directory
    ``{device_path}/{task_id}/{shelf}/{level}`` exists, contains at least one
    ``*.png`` file and has no ``.posted`` marker.

    Args:
        device_path: Root directory of the device's results.
        task_id: Task directory name.
        level_dict: Mapping of shelf id to an iterable of iterables of level ids.

    Returns:
        False as soon as one pending level is found, otherwise True.
    """
    for shelf_id, level_groups in level_dict.items():
        flat_levels = [lvl for group in level_groups for lvl in group]
        # Keep only the first occurrence of each level id, in order.
        unique_levels = [lvl for pos, lvl in enumerate(flat_levels) if lvl not in flat_levels[:pos]]
        for lvl in unique_levels:
            level_path = f'{device_path}/{task_id}/{shelf_id}/{lvl}'
            if not os.path.exists(level_path):
                continue
            has_images = bool(glob.glob(f'{level_path}/*.png'))
            if has_images and not os.path.exists(f'{level_path}/.posted'):
                return False
    return True
def upload_task(pm: ParamManager, upload_task_q: Queue):
    """Notify the OCR endpoint once per task when all its levels are posted.

    Tasks are taken from ``upload_task_q`` as ``(task_id, level_dict)``. A
    task that has not been notified yet is put back on the queue after each
    check; a task already notified is dropped. Never returns.

    Args:
        pm: Loaded parameters.
        upload_task_q: Queue of ``(task_id, level_dict)`` tuples.
    """
    notified = []
    over_url = f'{pm.ocr_post_address}/over'
    while True:
        while not upload_task_q.empty():
            task_id, level_dict = upload_task_q.get()
            already_notified = task_id in notified
            if not already_notified:
                if check_is_over(pm.device_path, task_id, level_dict):
                    try:
                        response = requests.get(over_url, timeout=1.)
                        my_print(f'response.status_code={response.status_code}')
                        if response.status_code == 200:
                            my_print(f'response.content={response.content}')
                            notified.append(task_id)
                    except Exception as e:
                        my_print(f'ocr res of {task_id} is over, but notification failed', 'ERROR')
                # Requeued even right after a successful notification; the
                # task is then dropped the next time it is dequeued.
                upload_task_q.put((task_id, level_dict))
            time.sleep(5)
        time.sleep(1)
def write_end(device_path, timestamp, shelf_dict: defaultdict):
    """Drop ``.post`` and ``.end`` marker files into each level directory of a task.

    For every level id that converts to a positive integer and whose directory
    ``{device_path}/{timestamp}/{shelf_id}/{level_id}`` exists:

    * ``.post`` (content ``0``) is written unless a file with ``post`` in its
      name, hidden or not, is already there (e.g. ``.post`` or ``.posted``);
    * ``.end`` (content ``1``) is written unless it already exists.

    Args:
        device_path: Root directory of the device's results.
        timestamp: Task directory name; ``None`` and ``'19700101000000'`` are
            ignored.
        shelf_dict: Mapping of shelf id to an iterable of level ids. Shelf ids
            ``None`` and ``'None'`` are skipped.
    """
    if timestamp is None or timestamp == '19700101000000':
        return
    if not shelf_dict:
        return
    for shelf_id, level_list in shelf_dict.items():
        if shelf_id is None or shelf_id == 'None':
            continue
        for level_id in level_list:
            if not level_id:
                continue
            try:
                l_id = int(level_id)
            except (TypeError, ValueError, OverflowError):
                continue
            if l_id <= 0:
                continue
            level_path = f'{device_path}/{timestamp}/{shelf_id}/{level_id}'
            if not os.path.exists(level_path):
                # Nothing was captured for this level; writing a marker here
                # would raise FileNotFoundError.
                continue
            # glob's "*" never matches a leading dot, so the hidden markers
            # need their own pattern; the plain pattern is kept as well.
            has_post_marker = (glob.glob(f'{level_path}/*post*')
                               or glob.glob(f'{level_path}/.*post*'))
            if not has_post_marker:
                with open(f'{level_path}/.post', 'wt') as f:
                    f.write('0')
            if not os.path.exists(f'{level_path}/.end'):
                try:
                    with open(f'{level_path}/.end', 'wt') as f:
                        f.write('1')
                except Exception:
                    # Best effort, as before: a missing .end marker is tolerated.
                    pass