192 lines
5.7 KiB
Python
192 lines
5.7 KiB
Python
|
from glob import glob
|
||
|
import os
|
||
|
from threading import Thread
|
||
|
from time import sleep
|
||
|
from smbus import SMBus
|
||
|
from time import sleep
|
||
|
import sys
|
||
|
import socket
|
||
|
import time
|
||
|
from configparser import ConfigParser
|
||
|
"""
|
||
|
scp tank.py root@tank-465ce7: && ssh root@tank-465ce7 "python3 tank.py"
|
||
|
"""
|
||
|
|
||
|
os.chdir("/root/")
|
||
|
|
||
|
config = ConfigParser()
|
||
|
config.read("config.ini")
|
||
|
enemy_bytes = config.get('tank','enemy_bytes').split(",")
|
||
|
robot_id = hex(int(config.get('tank','robot_id'),10))
|
||
|
hit_delay = int(config.get('tank','hit_delay'),10)
|
||
|
hit_delay = int(config.get('tank','hit_delay'),10)
|
||
|
|
||
|
#robot_id = socket.gethostname().split("-")[-1]
|
||
|
robot_shot_time = 0
|
||
|
robot_hit_timer = 0
|
||
|
|
||
|
bus = SMBus(0)
|
||
|
hit = False
|
||
|
|
||
|
|
||
|
#os.system("ifconfig wlan0 mtu 2304")
|
||
|
|
||
|
GSTREAM = """gst-launch-1.0 -v v4l2src device=%(cam)s ! capsfilter caps="video/x-h264, width=%(width)d, height=%(height)s, framerate=20/1" ! rtph264pay config-interval=10 pt=96 ! udpsink host=%(remote)s port=5000 &"""
|
||
|
|
||
|
|
||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
|
||
|
sock.settimeout(0.2)
|
||
|
sock.bind(("", 37020))
|
||
|
print("Receiver started")
|
||
|
|
||
|
#sockaddr = "192.168.2.255", 55555
|
||
|
#sock.sendto(b"hello", sockaddr)
|
||
|
|
||
|
def filter_scale(axis):
|
||
|
tresh = 3000
|
||
|
if abs(axis) < tresh:
|
||
|
return 0
|
||
|
if axis > 0:
|
||
|
return (axis-tresh) * 255 / (32768-tresh)
|
||
|
else:
|
||
|
return (axis+tresh) * 255 / (32768-tresh)
|
||
|
|
||
|
class MotorDriver(object):
|
||
|
|
||
|
MotorSpeedSet = 0x82
|
||
|
PWMFrequenceSet = 0x84
|
||
|
DirectionSet = 0xaa
|
||
|
MotorSetA = 0xa1
|
||
|
MotorSetB = 0xa5
|
||
|
Nothing = 0x01
|
||
|
EnableStepper = 0x1a
|
||
|
UnenableStepper = 0x1b
|
||
|
Stepernu = 0x1c
|
||
|
I2CMotorDriverAdd = 0x0f #Set the address of the I2CMotorDriver
|
||
|
|
||
|
def __init__(self, bus, address=0x0f):
|
||
|
self.I2CMotorDriverAdd=address
|
||
|
|
||
|
#Maps speed from 0-100 to 0-255
|
||
|
def map_vals(self,value, leftMin, leftMax, rightMin, rightMax):
|
||
|
#http://stackoverflow.com/questions/1969240/mapping-a-range-of-values-to-another
|
||
|
# Figure out how 'wide' each range is
|
||
|
leftSpan = leftMax - leftMin
|
||
|
rightSpan = rightMax - rightMin
|
||
|
|
||
|
# Convert the left range into a 0-1 range (float)
|
||
|
valueScaled = float(value - leftMin) / float(leftSpan)
|
||
|
|
||
|
# Convert the 0-1 range into a value in the right range.
|
||
|
return int(rightMin + (valueScaled * rightSpan))
|
||
|
|
||
|
#Set motor speed
|
||
|
def MotorSpeedSetAB(self,MotorSpeedA,MotorSpeedB):
|
||
|
#MotorSpeedA=self.map_vals(MotorSpeedA,0,32767,0,255)
|
||
|
#MotorSpeedB=self.map_vals(MotorSpeedB,0,32767,0,255)
|
||
|
bus.write_i2c_block_data(self.I2CMotorDriverAdd, self.MotorSpeedSet, [MotorSpeedA,MotorSpeedB])
|
||
|
sleep(.02)
|
||
|
|
||
|
#Set motor direction
|
||
|
def MotorDirectionSet(self,Direction):
|
||
|
bus.write_i2c_block_data(self.I2CMotorDriverAdd, self.DirectionSet, [Direction,0])
|
||
|
sleep(.02)
|
||
|
|
||
|
bus = SMBus(0)
|
||
|
m = MotorDriver(bus)
|
||
|
oldDirection = None
|
||
|
old_speed = 0, 0
|
||
|
addr = None
|
||
|
|
||
|
class irReciver(Thread):
|
||
|
def run(self):
|
||
|
global hit, addr
|
||
|
global robot_hit_timer
|
||
|
while True:
|
||
|
try:
|
||
|
bytes=bus.read_i2c_block_data(0x10,0x02,5)
|
||
|
byte = hex(bytes[1] << 24 | bytes[2] << 16 | bytes[3] << 8 | bytes[4])
|
||
|
if byte != "0x0":
|
||
|
print(byte)
|
||
|
#print(byte)
|
||
|
# if code belong to enemy then hit
|
||
|
if byte in enemy_bytes and byte !=robot_id and not hit:
|
||
|
print("Cabuum")
|
||
|
hit = True
|
||
|
robot_hit_timer = time.time()
|
||
|
bus.write_byte_data(0x10,0x06,0x02)
|
||
|
except Exception as e:
|
||
|
print(e)
|
||
|
sleep(0.2)
|
||
|
|
||
|
|
||
|
|
||
|
t = irReciver(daemon=True)
|
||
|
t.start()
|
||
|
|
||
|
while True:
|
||
|
try:
|
||
|
data, addr = sock.recvfrom(100)
|
||
|
except OSError:
|
||
|
m.MotorSpeedSetAB(0,0)
|
||
|
continue
|
||
|
buf = data.decode("ascii").split(":")
|
||
|
|
||
|
cmd = buf[0]
|
||
|
|
||
|
if hit:
|
||
|
sock.sendto("hit".encode("ascii"),addr)
|
||
|
m.MotorDirectionSet(0b0110)
|
||
|
hit = False
|
||
|
|
||
|
if time.time() - robot_hit_timer < 5:
|
||
|
m.MotorDirectionSet(0b0110)
|
||
|
m.MotorSpeedSetAB(250,250)
|
||
|
continue
|
||
|
else:
|
||
|
#end led flashing
|
||
|
bus.write_byte_data(0x10,0x06,0x01)
|
||
|
|
||
|
|
||
|
|
||
|
if cmd == "play":
|
||
|
remote, port = addr
|
||
|
width, height = int(buf[1]), int(buf[2])
|
||
|
|
||
|
|
||
|
os.system("killall gst-launch-1.0")
|
||
|
cam = sorted(glob("/dev/video*"))[-1]
|
||
|
cmd = GSTREAM % locals()
|
||
|
print("Launching:", cmd)
|
||
|
os.system(cmd)
|
||
|
elif cmd == "shoot":
|
||
|
if time.time() - robot_shot_time > 1:
|
||
|
#bus.write_byte_data(0x20,0x01,socket.gethostname().split("-")[-1])
|
||
|
bus.write_byte_data(0x20,0x01,int(robot_id,16))
|
||
|
robot_shot_time = time.time()
|
||
|
elif cmd == "speed":
|
||
|
forward_backward, left_right = int(buf[1]), int(buf[2])
|
||
|
axis1 = -filter_scale(forward_backward)
|
||
|
axis2 = filter_scale(left_right)
|
||
|
m1 = axis1 - axis2
|
||
|
m2 = axis1 + axis2
|
||
|
|
||
|
mx = max(abs(m1), abs(m2))
|
||
|
if mx > 255:
|
||
|
m1, m2 = m1 * 255 / mx , m2 * 255 / mx
|
||
|
|
||
|
d = (0b1000 if m2 < 0 else 0b0100) | (0b10 if m1 < 0 else 0b01)
|
||
|
#print("axis1 %03d axis2 %03d m1 %03d m2 %03d d %s" % ( axis1, axis2, m1, m2, bin(d)))
|
||
|
|
||
|
if d != oldDirection:
|
||
|
oldDirection = d
|
||
|
m.MotorSpeedSetAB(0,0)
|
||
|
m.MotorDirectionSet(d)
|
||
|
|
||
|
m1 = abs(m1)
|
||
|
m2 = abs(m2)
|
||
|
|
||
|
if (m1, m2) != old_speed:
|
||
|
old_speed = m1, m2
|
||
|
m.MotorSpeedSetAB(int(m1),int(m2))
|