Difficult-Rocket/test_threading.py

60 lines
1.2 KiB
Python
Raw Normal View History

2021-01-15 21:22:34 +08:00
import threading
import time
import os
2021-01-16 22:21:41 +08:00
import sys
2021-01-15 21:22:34 +08:00
import random
2021-01-16 22:21:41 +08:00
2021-01-15 21:22:34 +08:00
class Aclass(threading.Thread):
    """Worker thread that polls a shared device object for its run flag.

    Every loop iteration reads ``dev.running[ID]`` while holding the
    cooperative ``dev.using`` busy flag, prints one status line and then
    sleeps for a second.  The loop ends once the value read is false.
    """

    def __init__(self, dev, ID):
        """Store the shared device and this worker's key, then print the key.

        Args:
            dev: Shared object exposing a boolean ``using`` attribute and a
                ``running`` mapping indexed by worker key.
            ID: Key of this worker's entry in ``dev.running``.
        """
        super().__init__()
        self.running = True
        self.value = 0
        self.dev = dev
        self.id = ID
        print(self.id)

    def run(self):
        """Poll the device's run flag once per second until it is false."""
        while self.running:
            # Cooperative busy flag: spin until nobody claims the device.
            # NOTE(review): the check below and the assignment after it are
            # two separate steps, so this is not a real mutex; a
            # threading.Lock owned by the device would be the robust choice.
            while self.dev.using:
                continue
            self.dev.using = True
            try:
                self.running = self.dev.running[self.id]
            finally:
                # Release the flag even if the lookup raises (e.g. unknown
                # key); otherwise every other worker would spin forever.
                self.dev.using = False
            print(self.id, os.getpid(), os.getppid(), self.running)
            time.sleep(1)

    def stop(self):
        """Ask the worker loop to finish; callable from any thread.

        ``sys.exit()`` only raises ``SystemExit`` in the thread that calls
        it, so it cannot stop this worker from outside.  Instead, clear the
        flag that ``run`` re-reads on every iteration.  The device entry is
        cleared first because ``run`` overwrites ``self.running`` from it.
        The loop then ends after its current iteration (up to about one
        second later); this method does not wait for that.
        """
        self.dev.running[self.id] = False
        self.running = False
2021-01-15 21:22:34 +08:00
class dev:
    """Plain holder for the state shared between the worker threads.

    Nothing in this class enforces locking; it only stores values.
    """

    def __init__(self):
        """Create the busy flag and the two per-key flag tables."""
        keys = ('a', 'b')
        # Busy flag, initially not claimed.
        self.using = False
        # One entry per key, all starting out False.
        self.gets = dict.fromkeys(keys, False)
        # One entry per key, all starting out True.
        self.running = dict.fromkeys(keys, True)
2021-01-16 22:21:41 +08:00
2021-01-15 21:22:34 +08:00
# Driver: start two workers that share one device object, then read console
# commands until "stop", Ctrl-C or end of input.
Dev = dev()
A = Aclass(Dev, 'a')
# Random pause (< 1 s) between constructing the two workers.
time.sleep(random.random())
B = Aclass(Dev, 'b')
A.start()
B.start()
print("hmmm")

try:
    while True:
        In = input()
        if In == "stop":
            break
        try:
            # SECURITY: eval() executes whatever expression is typed on the
            # console.  Acceptable only for an interactive debugging aid;
            # never feed it input from an untrusted source.
            eval(In)
        except Exception as exc:
            # Report the actual failure.  Catching only Exception lets
            # KeyboardInterrupt/SystemExit propagate to the outer handlers.
            print(repr(exc))
except (KeyboardInterrupt, EOFError):
    # Ctrl-C or closed stdin: fall through to the shutdown below.
    pass
finally:
    # The workers leave their loop only when their Dev.running entry is
    # false, so clear both entries on every exit path.  Otherwise the
    # non-daemon threads would keep the process alive.
    Dev.running['a'] = False
    Dev.running['b'] = False