from glob import glob import os from threading import Thread from time import sleep from smbus import SMBus from time import sleep import sys import socket import time from configparser import ConfigParser """ scp tank.py root@tank-465ce7: && ssh root@tank-465ce7 "python3 tank.py" """ os.chdir("/root/") config = ConfigParser() config.read("config.ini") enemy_bytes = config.get('tank','enemy_bytes').split(",") robot_id = hex(int(config.get('tank','robot_id'),10)) hit_delay = int(config.get('tank','hit_delay'),10) hit_delay = int(config.get('tank','hit_delay'),10) #robot_id = socket.gethostname().split("-")[-1] robot_shot_time = 0 robot_hit_timer = 0 bus = SMBus(0) hit = False #os.system("ifconfig wlan0 mtu 2304") GSTREAM = """gst-launch-1.0 -v v4l2src device=%(cam)s ! capsfilter caps="video/x-h264, width=%(width)d, height=%(height)s, framerate=20/1" ! rtph264pay config-interval=10 pt=96 ! udpsink host=%(remote)s port=5000 &""" sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP sock.settimeout(0.2) sock.bind(("", 37020)) print("Receiver started") #sockaddr = "192.168.2.255", 55555 #sock.sendto(b"hello", sockaddr) def filter_scale(axis): tresh = 3000 if abs(axis) < tresh: return 0 if axis > 0: return (axis-tresh) * 255 / (32768-tresh) else: return (axis+tresh) * 255 / (32768-tresh) class MotorDriver(object): MotorSpeedSet = 0x82 PWMFrequenceSet = 0x84 DirectionSet = 0xaa MotorSetA = 0xa1 MotorSetB = 0xa5 Nothing = 0x01 EnableStepper = 0x1a UnenableStepper = 0x1b Stepernu = 0x1c I2CMotorDriverAdd = 0x0f #Set the address of the I2CMotorDriver def __init__(self, bus, address=0x0f): self.I2CMotorDriverAdd=address #Maps speed from 0-100 to 0-255 def map_vals(self,value, leftMin, leftMax, rightMin, rightMax): #http://stackoverflow.com/questions/1969240/mapping-a-range-of-values-to-another # Figure out how 'wide' each range is leftSpan = leftMax - leftMin rightSpan = rightMax - rightMin # Convert the left range into a 0-1 range (float) valueScaled = float(value - leftMin) / float(leftSpan) # Convert the 0-1 range into a value in the right range. return int(rightMin + (valueScaled * rightSpan)) #Set motor speed def MotorSpeedSetAB(self,MotorSpeedA,MotorSpeedB): #MotorSpeedA=self.map_vals(MotorSpeedA,0,32767,0,255) #MotorSpeedB=self.map_vals(MotorSpeedB,0,32767,0,255) bus.write_i2c_block_data(self.I2CMotorDriverAdd, self.MotorSpeedSet, [MotorSpeedA,MotorSpeedB]) sleep(.02) #Set motor direction def MotorDirectionSet(self,Direction): bus.write_i2c_block_data(self.I2CMotorDriverAdd, self.DirectionSet, [Direction,0]) sleep(.02) bus = SMBus(0) m = MotorDriver(bus) oldDirection = None old_speed = 0, 0 addr = None class irReciver(Thread): def run(self): global hit, addr global robot_hit_timer while True: try: bytes=bus.read_i2c_block_data(0x10,0x02,5) byte = hex(bytes[1] << 24 | bytes[2] << 16 | bytes[3] << 8 | bytes[4]) if byte != "0x0": print(byte) #print(byte) # if code belong to enemy then hit if byte in enemy_bytes and byte !=robot_id and not hit: print("Cabuum") hit = True robot_hit_timer = time.time() bus.write_byte_data(0x10,0x06,0x02) except Exception as e: print(e) sleep(0.2) t = irReciver(daemon=True) t.start() while True: try: data, addr = sock.recvfrom(100) except OSError: m.MotorSpeedSetAB(0,0) continue buf = data.decode("ascii").split(":") cmd = buf[0] if hit: sock.sendto("hit".encode("ascii"),addr) m.MotorDirectionSet(0b0110) hit = False if time.time() - robot_hit_timer < 5: m.MotorDirectionSet(0b0110) m.MotorSpeedSetAB(250,250) continue else: #end led flashing bus.write_byte_data(0x10,0x06,0x01) if cmd == "play": remote, port = addr width, height = int(buf[1]), int(buf[2]) os.system("killall gst-launch-1.0") cam = sorted(glob("/dev/video*"))[-1] cmd = GSTREAM % locals() print("Launching:", cmd) os.system(cmd) elif cmd == "shoot": if time.time() - robot_shot_time > 1: #bus.write_byte_data(0x20,0x01,socket.gethostname().split("-")[-1]) bus.write_byte_data(0x20,0x01,int(robot_id,16)) robot_shot_time = time.time() elif cmd == "speed": forward_backward, left_right = int(buf[1]), int(buf[2]) axis1 = -filter_scale(forward_backward) axis2 = filter_scale(left_right) m1 = axis1 - axis2 m2 = axis1 + axis2 mx = max(abs(m1), abs(m2)) if mx > 255: m1, m2 = m1 * 255 / mx , m2 * 255 / mx d = (0b1000 if m2 < 0 else 0b0100) | (0b10 if m1 < 0 else 0b01) #print("axis1 %03d axis2 %03d m1 %03d m2 %03d d %s" % ( axis1, axis2, m1, m2, bin(d))) if d != oldDirection: oldDirection = d m.MotorSpeedSetAB(0,0) m.MotorDirectionSet(d) m1 = abs(m1) m2 = abs(m2) if (m1, m2) != old_speed: old_speed = m1, m2 m.MotorSpeedSetAB(int(m1),int(m2))