rc-ir-tank/robot.py

192 lines
5.7 KiB
Python
Raw Permalink Normal View History

2019-04-07 21:58:35 +00:00
from glob import glob
import os
from threading import Thread
from time import sleep
from smbus import SMBus
from time import sleep
import sys
import socket
import time
from configparser import ConfigParser
"""
scp tank.py root@tank-465ce7: && ssh root@tank-465ce7 "python3 tank.py"
"""
os.chdir("/root/")
config = ConfigParser()
config.read("config.ini")
enemy_bytes = config.get('tank','enemy_bytes').split(",")
robot_id = hex(int(config.get('tank','robot_id'),10))
hit_delay = int(config.get('tank','hit_delay'),10)
hit_delay = int(config.get('tank','hit_delay'),10)
#robot_id = socket.gethostname().split("-")[-1]
robot_shot_time = 0
robot_hit_timer = 0
bus = SMBus(0)
hit = False
#os.system("ifconfig wlan0 mtu 2304")
GSTREAM = """gst-launch-1.0 -v v4l2src device=%(cam)s ! capsfilter caps="video/x-h264, width=%(width)d, height=%(height)s, framerate=20/1" ! rtph264pay config-interval=10 pt=96 ! udpsink host=%(remote)s port=5000 &"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
sock.settimeout(0.2)
sock.bind(("", 37020))
print("Receiver started")
#sockaddr = "192.168.2.255", 55555
#sock.sendto(b"hello", sockaddr)
def filter_scale(axis):
tresh = 3000
if abs(axis) < tresh:
return 0
if axis > 0:
return (axis-tresh) * 255 / (32768-tresh)
else:
return (axis+tresh) * 255 / (32768-tresh)
class MotorDriver(object):
MotorSpeedSet = 0x82
PWMFrequenceSet = 0x84
DirectionSet = 0xaa
MotorSetA = 0xa1
MotorSetB = 0xa5
Nothing = 0x01
EnableStepper = 0x1a
UnenableStepper = 0x1b
Stepernu = 0x1c
I2CMotorDriverAdd = 0x0f #Set the address of the I2CMotorDriver
def __init__(self, bus, address=0x0f):
self.I2CMotorDriverAdd=address
#Maps speed from 0-100 to 0-255
def map_vals(self,value, leftMin, leftMax, rightMin, rightMax):
#http://stackoverflow.com/questions/1969240/mapping-a-range-of-values-to-another
# Figure out how 'wide' each range is
leftSpan = leftMax - leftMin
rightSpan = rightMax - rightMin
# Convert the left range into a 0-1 range (float)
valueScaled = float(value - leftMin) / float(leftSpan)
# Convert the 0-1 range into a value in the right range.
return int(rightMin + (valueScaled * rightSpan))
#Set motor speed
def MotorSpeedSetAB(self,MotorSpeedA,MotorSpeedB):
#MotorSpeedA=self.map_vals(MotorSpeedA,0,32767,0,255)
#MotorSpeedB=self.map_vals(MotorSpeedB,0,32767,0,255)
bus.write_i2c_block_data(self.I2CMotorDriverAdd, self.MotorSpeedSet, [MotorSpeedA,MotorSpeedB])
sleep(.02)
#Set motor direction
def MotorDirectionSet(self,Direction):
bus.write_i2c_block_data(self.I2CMotorDriverAdd, self.DirectionSet, [Direction,0])
sleep(.02)
bus = SMBus(0)
m = MotorDriver(bus)
oldDirection = None
old_speed = 0, 0
addr = None
class irReciver(Thread):
def run(self):
global hit, addr
global robot_hit_timer
while True:
try:
bytes=bus.read_i2c_block_data(0x10,0x02,5)
byte = hex(bytes[1] << 24 | bytes[2] << 16 | bytes[3] << 8 | bytes[4])
if byte != "0x0":
print(byte)
#print(byte)
# if code belong to enemy then hit
if byte in enemy_bytes and byte !=robot_id and not hit:
print("Cabuum")
hit = True
robot_hit_timer = time.time()
bus.write_byte_data(0x10,0x06,0x02)
except Exception as e:
print(e)
sleep(0.2)
t = irReciver(daemon=True)
t.start()
while True:
try:
data, addr = sock.recvfrom(100)
except OSError:
m.MotorSpeedSetAB(0,0)
continue
buf = data.decode("ascii").split(":")
cmd = buf[0]
if hit:
sock.sendto("hit".encode("ascii"),addr)
m.MotorDirectionSet(0b0110)
hit = False
if time.time() - robot_hit_timer < 5:
m.MotorDirectionSet(0b0110)
m.MotorSpeedSetAB(250,250)
continue
else:
#end led flashing
bus.write_byte_data(0x10,0x06,0x01)
if cmd == "play":
remote, port = addr
width, height = int(buf[1]), int(buf[2])
os.system("killall gst-launch-1.0")
cam = sorted(glob("/dev/video*"))[-1]
cmd = GSTREAM % locals()
print("Launching:", cmd)
os.system(cmd)
elif cmd == "shoot":
if time.time() - robot_shot_time > 1:
#bus.write_byte_data(0x20,0x01,socket.gethostname().split("-")[-1])
bus.write_byte_data(0x20,0x01,int(robot_id,16))
robot_shot_time = time.time()
elif cmd == "speed":
forward_backward, left_right = int(buf[1]), int(buf[2])
axis1 = -filter_scale(forward_backward)
axis2 = filter_scale(left_right)
m1 = axis1 - axis2
m2 = axis1 + axis2
mx = max(abs(m1), abs(m2))
if mx > 255:
m1, m2 = m1 * 255 / mx , m2 * 255 / mx
d = (0b1000 if m2 < 0 else 0b0100) | (0b10 if m1 < 0 else 0b01)
#print("axis1 %03d axis2 %03d m1 %03d m2 %03d d %s" % ( axis1, axis2, m1, m2, bin(d)))
if d != oldDirection:
oldDirection = d
m.MotorSpeedSetAB(0,0)
m.MotorDirectionSet(d)
m1 = abs(m1)
m2 = abs(m2)
if (m1, m2) != old_speed:
old_speed = m1, m2
m.MotorSpeedSetAB(int(m1),int(m2))