ERROR STATUSをどのように成否判定に使うべきかが判然としないので無視。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# pyB3M.py
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: (C) 2024 mukyokyo

import array
import struct
import threading
from collections import namedtuple
from struct import pack, unpack, iter_unpack
from typing import Optional, Tuple, Union
from typing import overload

import serial


##########################################################
# Functionalized the part of converting int to bytes.
# If specified as a tuple, it is converted to bytes at once.
##########################################################
def _to_unsigned(value: int, bits: int) -> int:
    """Return the unsigned `bits`-wide field used to transmit `value`.

    Non-negative values are returned unchanged.  For a negative value the low
    ``bits - 1`` bits are kept and the top bit is forced to 1; for values
    inside the signed range of the field this is the two's-complement
    encoding.
    """
    if value < 0:
        sign = 1 << (bits - 1)
        return (value & (sign - 1)) | sign
    return value


def B2Bs(d: Union[int, list, tuple]) -> bytes:
    """Encode one int, or a list/tuple of ints, as one byte each."""
    if isinstance(d, (list, tuple)):
        return bytes(_to_unsigned(v, 8) for v in d)
    return pack('<B', _to_unsigned(d, 8))


def W2Bs(d: Union[int, list, tuple]) -> bytes:
    """Encode one int, or a list/tuple of ints, as little-endian 16-bit words."""
    if isinstance(d, (list, tuple)):
        return b''.join(pack('<H', _to_unsigned(v, 16)) for v in d)
    return pack('<H', _to_unsigned(d, 16))


def L2Bs(d: Union[int, list, tuple]) -> bytes:
    """Encode one int, or a list/tuple of ints, as little-endian 32-bit words."""
    if isinstance(d, (list, tuple)):
        return b''.join(pack('<I', _to_unsigned(v, 32)) for v in d)
    return pack('<I', _to_unsigned(d, 32))


##########################################################
# API for Kondo B3M
##########################################################
class B3MProtocol:
    """Half-duplex command/response helper for Kondo B3M devices on a serial port.

    Packets built by this class have the layout
    ``[size, command, option, parameters..., checksum]`` where ``size`` is the
    total packet length and ``checksum`` is the low byte of the sum of all
    preceding bytes.  All bus transactions are serialized with ``lock``.
    """

    BROADCASTING_ID = 0xff

    # Despite the OPT_ prefix these values are sent as the *command* byte.
    OPT_LOAD = 1
    OPT_SAVE = 2
    OPT_READ = 3
    OPT_WRITE = 4
    OPT_RESET = 5
    OPT_POSITION = 6

    # Enumerations 0..7 and 0..2; not referenced elsewhere in this file.
    # NOTE(review): presumably selector values for the device's UART settings
    # - confirm against the B3M manual.
    (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M,
     SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8)
    (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3)
    # Values 0, 1, 2, 4 and 8; the `_` placeholders skip the unused numbers.
    (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE,
     _, _, _, MOTW_OPT_HOLD) = range(9)

    def __init__(self, port: Union[serial.Serial, str], baudrate=57600,
                 timeout=0.05, lock=None):
        """Attach to an open ``serial.Serial`` or open the named port.

        Args:
            port: an existing ``serial.Serial`` (its baudrate/timeout are
                adopted) or a device name to open with ``baudrate``/``timeout``.
            baudrate: baud rate used only when ``port`` is a device name.
            timeout: read timeout in seconds used only when ``port`` is a name.
            lock: lock shared with other users of the bus; a private
                ``threading.Lock`` is created when omitted.
        """
        if isinstance(port, serial.Serial):
            self.__serial = port
        else:
            self.__serial = serial.Serial(port, baudrate=baudrate, timeout=timeout)
        # Cache the settings so they can be re-applied before every transmit.
        self.__baudrate = self.__serial.baudrate
        self.__timeout = self.__serial.timeout
        self.__lock = threading.Lock() if lock is None else lock
        self.__status = 0

    def __del__(self):
        pass

    @property
    def lock(self):
        """Lock that guards every transaction issued through this object."""
        return self.__lock

    @property
    def baudrate(self):
        """Current baud rate of the underlying serial port."""
        return self.__serial.baudrate

    @baudrate.setter
    def baudrate(self, baudrate):
        self.__baudrate = baudrate
        self.__serial.baudrate = baudrate

    @property
    def timeout(self):
        """Current read timeout of the underlying serial port."""
        return self.__serial.timeout

    @timeout.setter
    def timeout(self, timeout):
        self.__timeout = timeout
        self.__serial.timeout = timeout

    def __reconfig(self):
        """Re-apply this instance's cached baud rate and timeout to the port."""
        self.__serial.baudrate = self.__baudrate
        self.__serial.timeout = self.__timeout

    @property
    def status(self):
        """Third byte of the most recent valid response packet (0 initially)."""
        return self.__status

    def TxPacket(self, cmd: int, opt: int, param: bytes,
                 echo=False) -> Tuple[Optional[bytes], bool]:
        """Build and transmit one packet.

        The input buffer is flushed before writing so that the next read only
        sees data that arrived after this packet was sent.

        Returns:
            ``(packet, True)`` when sent, ``(None, False)`` when ``cmd``/``opt``
            are not byte values or ``param`` is longer than 251 bytes.
        """
        self.__reconfig()
        if not (0 <= cmd <= 0xff and 0 <= opt <= 0xff and len(param) <= (256 - 5)):
            return None, False
        txp = bytes((len(param) + 4, cmd, opt)) + bytes(param)
        txp += B2Bs(sum(txp) & 0xff)
        self.__serial.reset_input_buffer()
        if echo:
            print('TX:', txp.hex(':'))
        self.__serial.write(txp)
        return txp, True

    def __rx(self, length: int) -> bytes:
        """Read up to ``length`` bytes; may return fewer if the port runs dry."""
        buf = self.__serial.read(length)
        remaining = length - len(buf)
        # A short read means the timeout expired; still collect whatever the
        # driver has buffered in the meantime.
        while remaining > 0 and self.__serial.in_waiting > 0:
            chunk = self.__serial.read(remaining)
            buf += chunk
            remaining -= len(chunk)
        return buf

    def RxPacket(self, echo=False) -> Tuple[Optional[bytes], bool]:
        """Receive and validate one response packet.

        A packet is accepted when it is complete (as long as its size byte
        says, minimum 5), its command byte is 0x81..0x86, its ID byte is at
        most 0xfe and its checksum matches.  On success the third byte is
        stored and exposed through ``status``.

        Returns:
            ``(packet, True)`` on success, otherwise ``(None, False)``.
        """
        rxp = self.__rx(1)
        if len(rxp) == 1 and rxp[0] >= 5:
            rxp += self.__rx(rxp[0] - 1)
            # Check completeness first: indexing a truncated reply would raise
            # IndexError instead of reporting a failed reception.
            if (len(rxp) == rxp[0]
                    and 0x81 <= rxp[1] <= 0x86
                    and rxp[3] <= 0xfe
                    and rxp[-1] == (sum(rxp[:-1]) & 0xff)):
                self.__status = rxp[2]
                if echo:
                    print('RX:', rxp.hex(':'))
                return bytes(rxp), True
        if echo:
            print('RX;', rxp.hex(';'), ' xxx')
        return None, False

    def WRITE(self, id: int, addr: int, data: bytes, reset=False,
              echo=False) -> bool:
        """Write ``data`` to address ``addr`` of device ``id``.

        Args:
            id: 0..255; 255 (``BROADCASTING_ID``) sends without waiting for a
                reply.
            addr: start address, 0..255.
            data: raw bytes to write (at most 249 bytes).
            reset: send option byte 0x80 instead of 0x00.
                NOTE(review): presumably the "clear status" option - confirm.
            echo: print the transmitted/received packets.

        Returns:
            True when the packet was sent and (for non-broadcast) a WRITE
            reply from ``id`` was received.
        """
        with self.__lock:
            if 0 <= id <= 255 and 0 <= addr <= 0xff and len(data) <= 256 - 7:
                payload = B2Bs(id) + data + B2Bs(addr) + B2Bs(1)
                if self.TxPacket(self.OPT_WRITE, 0x80 if reset else 0x00,
                                 payload, echo)[1]:
                    if id == self.BROADCASTING_ID:
                        return True
                    d, ok = self.RxPacket(echo)
                    if ok:
                        return (d[3] == id) and (d[1] == (self.OPT_WRITE | 0x80))
        return False

    def WRITEs(self, addr: int, iddatas: Union[list, tuple], reset=False,
               echo=False) -> bool:
        """Write to the same address of several devices with one packet.

        Args:
            addr: start address, 0..255.
            iddatas: list/tuple of equally long ``bytes`` items, each being the
                device ID (0..254) followed by that device's data.
            reset: send option byte 0x80 instead of 0x00.
            echo: print the transmitted packet.

        Returns:
            True when the packet was sent; no reply is read.
        """
        with self.__lock:
            if 0 <= addr <= 0xff and isinstance(iddatas, (list, tuple)):
                cnt = len(iddatas)
                if cnt > 0:
                    item_len = len(iddatas[0])
                    txd = bytes(0)
                    for iddata in iddatas:
                        if iddata[0] < 0 or iddata[0] > 254 or len(iddata) != item_len:
                            return False
                        txd += iddata
                    txd += B2Bs(addr) + B2Bs(cnt)
                    return self.TxPacket(self.OPT_WRITE, 0x80 if reset else 0x00,
                                         txd, echo)[1]
        return False

    def WRITE8(self, id: int, addr: int, data: Union[int, list, tuple],
               reset=False, echo=False) -> bool:
        """WRITE one or more values encoded as single bytes."""
        return self.WRITE(id, addr, B2Bs(data), reset, echo)

    def WRITE16(self, id: int, addr: int, data: Union[int, list, tuple],
                reset=False, echo=False) -> bool:
        """WRITE one or more values encoded as little-endian 16-bit words."""
        return self.WRITE(id, addr, W2Bs(data), reset, echo)

    def WRITE32(self, id: int, addr: int, data: Union[int, list, tuple],
                reset=False, echo=False) -> bool:
        """WRITE one or more values encoded as little-endian 32-bit words."""
        return self.WRITE(id, addr, L2Bs(data), reset, echo)

    def READ(self, id: int, addr: int, length: int, reset=False,
             echo=False) -> Optional[bytes]:
        """Read ``length`` bytes starting at ``addr`` from device ``id``.

        Args:
            id: 0..254 (broadcast is not allowed because a reply is required).
            addr: start address, 0..255.
            length: number of bytes, 1..250.
            reset: send option byte 0x80 instead of 0x00.
            echo: print the transmitted/received packets.

        Returns:
            The data bytes, or None on invalid arguments, timeout or a reply
            that does not match the request.
        """
        with self.__lock:
            # The lower bound on id matters: B2Bs() would encode a negative id
            # as some other ID (e.g. -1 becomes 0xff, the broadcast ID).
            if 0 <= id <= 254 and 0 <= addr <= 0xff and 1 <= length <= 250:
                payload = B2Bs(id) + B2Bs(addr) + B2Bs(length)
                if self.TxPacket(self.OPT_READ, 0x80 if reset else 0x00,
                                 payload, echo)[1]:
                    d, ok = self.RxPacket(echo)
                    if (ok and d[1] == (self.OPT_READ | 0x80) and id == d[3]
                            and len(d) == length + 5):
                        return bytes(d[4:-1])
        return None

    def __read_ints(self, id, addr, fmt, size, count, reset, echo):
        """READ ``count`` items of ``size`` bytes and decode them with ``fmt``."""
        raw = self.READ(id, addr, size * count, reset, echo)
        if raw is None:
            return None
        values = tuple(v for (v,) in iter_unpack(fmt, raw))
        return values if count > 1 else values[0]

    def READ8(self, id: int, addr: int, length=1, signed=False, reset=False,
              echo=False) -> Union[int, Tuple[int, ...], None]:
        """Read ``length`` 8-bit values.

        Returns a single int when ``length`` is 1, a tuple of ints when it is
        larger, or None when the read failed.
        """
        return self.__read_ints(id, addr, '<b' if signed else '<B', 1,
                                length, reset, echo)

    def READ16(self, id: int, addr: int, length=1, signed=False, reset=False,
               echo=False) -> Union[int, Tuple[int, ...], None]:
        """Read ``length`` little-endian 16-bit values.

        Returns a single int when ``length`` is 1, a tuple of ints when it is
        larger, or None when the read failed.
        """
        # Explicit '<' so decoding matches W2Bs() regardless of host byte order.
        return self.__read_ints(id, addr, '<h' if signed else '<H', 2,
                                length, reset, echo)

    def READ32(self, id: int, addr: int, length=1, signed=False, reset=False,
               echo=False) -> Union[int, Tuple[int, ...], None]:
        """Read ``length`` little-endian 32-bit values.

        Returns a single int when ``length`` is 1, a tuple of ints when it is
        larger, or None when the read failed.
        """
        # Explicit '<' so decoding matches L2Bs() regardless of host byte order.
        return self.__read_ints(id, addr, '<i' if signed else '<I', 4,
                                length, reset, echo)

    def __simple_command(self, cmd: int, id: int, echo: bool) -> bool:
        """Send an ID-only command and, unless broadcast, verify the 5-byte reply."""
        with self.__lock:
            if self.TxPacket(cmd, 0x0, B2Bs(id), echo)[1]:
                if id == self.BROADCASTING_ID:
                    return True
                d, ok = self.RxPacket(echo)
                if ok:
                    return id == d[3] and d[1] == (cmd | 0x80) and len(d) == 5
        return False

    def LOAD(self, id: int, echo=False) -> bool:
        """Send the LOAD command to ``id``; True on a matching reply (or broadcast)."""
        return self.__simple_command(self.OPT_LOAD, id, echo)

    def SAVE(self, id: int, echo=False) -> bool:
        """Send the SAVE command to ``id``; True on a matching reply (or broadcast)."""
        return self.__simple_command(self.OPT_SAVE, id, echo)

    def POSITION(self, id: int, pos: int, tim: int, reset=False,
                 echo=False) -> Optional[int]:
        """Send a POSITION command and return the position reported in the reply.

        Args:
            id: target device; 255 broadcasts and therefore yields no reply.
            pos: target position, -32000..32000.
            tim: time parameter, 0..65535.
            reset: send option byte 0x80 instead of 0x00.
            echo: print the transmitted/received packets.

        Returns:
            The signed 16-bit value carried in the reply, or None when the
            arguments are out of range, the command was broadcast, or no valid
            matching reply arrived.
        """
        with self.__lock:
            if -32000 <= pos <= 32000 and 0 <= tim <= 65535:
                payload = B2Bs(id) + W2Bs(pos) + W2Bs(tim)
                # TxPacket returns a (packet, ok) tuple, which is always truthy;
                # the ok flag itself has to be tested.
                sent = self.TxPacket(self.OPT_POSITION, 0x80 if reset else 0x00,
                                     payload, echo)[1]
                if sent and id != self.BROADCASTING_ID:
                    dat, ok = self.RxPacket(echo)
                    if (ok and dat[1] == (self.OPT_POSITION | 0x80)
                            and dat[3] == id and len(dat) == 7):
                        return unpack('<h', dat[4:6])[0]
        return None

    def POSITIONs(self, idposs: Union[list, tuple], tim: int, reset=False,
                  echo=False) -> bool:
        """Send one POSITION packet addressing several devices.

        Args:
            idposs: sequence of ``(id, pos)`` pairs with id 0..254 and
                pos -32000..32000.
            tim: time parameter shared by all devices, 0..65535.
            reset: send option byte 0x80 instead of 0x00.
            echo: print the transmitted packet.

        Returns:
            True when the packet was sent; no reply is read.
        """
        with self.__lock:
            if 0 <= tim <= 65535 and len(idposs) > 0:
                txd = bytes(0)
                for idpos in idposs:
                    if len(idpos) != 2:
                        return False
                    dev_id, pos = idpos
                    if not (0 <= dev_id <= 254 and -32000 <= pos <= 32000):
                        return False
                    txd += B2Bs(dev_id) + W2Bs(pos)
                txd += W2Bs(tim)
                return self.TxPacket(self.OPT_POSITION, 0x80 if reset else 0x00,
                                     txd, echo)[1]
        return False

    def RESET(self, id: int, tim: int, echo=False) -> bool:
        """Send the RESET command to ``id`` (0..255) with time byte ``tim`` (0..255).

        No reply is read; the return value only says whether the packet was sent.
        """
        with self.__lock:
            if 0 <= id <= 255 and 0 <= tim <= 255:
                return self.TxPacket(self.OPT_RESET, 0x0,
                                     B2Bs(id) + B2Bs(tim), echo)[1]
        return False

    def Ping(self, id: int, echo=False) -> bool:
        """Return True if device ``id`` (0..254) answers a 1-byte read of address 0.

        Up to three attempts are made.
        """
        if 0 <= id <= 254:
            for _ in range(3):
                if self.READ8(id, 0, 1, False, echo) is not None:
                    return True
        return False


##########################################################
# test code
##########################################################
if __name__ == "__main__":
    import time

    try:
        b3m = B3MProtocol('/dev/ttyAMA0', 1000000, 1)
    except Exception as e:
        # Report why the port could not be opened instead of failing silently.
        print('err:open', e)
    else:
        ID = 0
        ec = True

        # r/w test
        print('reset=', b3m.RESET(ID, 0, echo=ec))
        time.sleep(0.5)
        print('load=', b3m.LOAD(ID, echo=ec))
        print('save=', b3m.SAVE(ID, echo=ec))
        print('read 5~9=', b3m.READ(ID, 0, 5, echo=ec).hex(','))
        print('read32 0=', b3m.READ32(ID, 0, echo=ec))
        print('read16 5=', b3m.READ16(ID, 5, echo=ec))
        print('read16 7=', b3m.READ16(ID, 7, echo=ec))
        print('read16 0x50=', b3m.READ16(ID, 0x50, echo=ec))

        # r/w pid gain
        gain = list(b3m.READ16(ID, 0x5e, 3, echo=ec))
        print('pid r=', gain)
        print('pid w=', b3m.WRITE16(ID, 0x5e, gain, echo=ec))
        print('pid r=', b3m.READ16(ID, 0x5e, 3, echo=ec))

        # write (multi)
        d = (B2Bs(ID) + W2Bs(gain), B2Bs(ID + 1) + W2Bs(gain),
             B2Bs(ID + 2) + W2Bs(gain), B2Bs(ID + 3) + W2Bs(gain),)
        print(d)
        print('writes=', b3m.WRITEs(0x5e, d, echo=ec))

        # motor ctrl
        # set free
        if b3m.WRITE8(ID, 0x28, 2, echo=ec):
            # sel pos mode
            if b3m.WRITE8(ID, 0x28, 2, echo=ec):
                # sel profile
                if b3m.WRITE8(ID, 0x29, 1, echo=ec):
                    # set normal
                    if b3m.WRITE8(ID, 0x28, 0, echo=ec):
                        # position
                        for ang in (0, 4500, 9000, 4500, 0, -4500, -9000, -4500, 0):
                            b3m.POSITION(ID, ang, 500)
                            # Monitor the 48 bytes at 0x27..0x56 for one second.
                            s = time.time() + 1
                            while s > time.time():
                                r = b3m.READ(ID, 0x27, 48)
                                if r is not None:
                                    n = tuple(iter_unpack('<BHBBhhhhhhHHHhIHhhHHHHiB', r))[0]
                                    print(f'READ(0x27~0x56)={ang:7d}{n[5]:7d}{n[8]:7d} {n[16]:5d}{n[17]:5d}{n[18]:7d}{n[19]:7d}', end='\r')
                                else:
                                    print('READ(0x27~0x56)=xxxxx', end='\r')
                    else:
                        print('err:set normal')
                else:
                    print('err:set profile')
            else:
                print('err:set pos mode')
        else:
            print('err:set free')
        print()
        # set free
        b3m.WRITE8(ID, 0x28, 2)
        del b3m
    print('fin')
#!/usr/bin/python3
#
# scan.py
#
# Pings IDs 0..254 at each listed baud rate and prints the ones that answer.
# The serial device may be given as the single command-line argument;
# otherwise /dev/ttyAMA0 is used.

import sys, kbhit, time
from pyB3M import B3MProtocol

kb = kbhit.KBHit()
try:
    port = sys.argv[1] if len(sys.argv) == 2 else '/dev/ttyAMA0'
    b3m = B3MProtocol(port, 115200, 0.02)
    for b in (115200, 1000000, 1250000, 1500000, 2000000, 3000000):
        b3m.baudrate = b
        for servo_id in range(255):
            # Any key press is consumed and abandons the rest of this baud
            # rate; scanning then continues with the next one.
            if kb.kbhit():
                kb.getch()
                break
            # A hit ends the line with 'find'; a miss returns the cursor to
            # column 0 so the next probe overwrites the same line.
            print(f' baud:{b:7} id:{servo_id:3} ',
                  end='find\n' if b3m.Ping(servo_id) else '\r', flush=True)
        # NOTE(review): the original indentation was lost; the pause is assumed
        # to follow each baud rate and the line clear to follow the whole scan.
        time.sleep(0.5)
    # ANSI: erase the whole line and move the cursor to column 1.
    sys.stdout.write('\033[2K\033[1G')
except Exception:
    # Only real errors are reported here; Ctrl-C and SystemExit propagate
    # instead of being mislabelled as a problem.
    print(' ERR:There is some problem.')
#!/usr/bin/python3
#
# changeid.py
# Note that since processing begins after LOAD is performed,
# various parameters are the same as immediately after startup.
#
# usage: changeid [line] <baudrate> <prev id> <new id>

import os, sys
from pyB3M import B3MProtocol

# Accepted baud rates; only membership is used by this script.
baudlist = { 115200 : 0, 1000000 : 1, 1250000 : 2, 1500000 : 3, 2000000 : 4, 3000000 : 5 }

# Three arguments use the default device; four arguments name the device first.
if len(sys.argv) == 4:
    arg = sys.argv[1:]
    dev = '/dev/ttyAMA0'
elif len(sys.argv) == 5:
    arg = sys.argv[2:]
    dev = sys.argv[1]
else:
    arg = []
    dev = ''

# A device path that does not exist is treated like a missing argument.
if not os.path.exists(dev):
    dev = ''

if dev != '':
    try:
        baud = int(arg[0])
        # The lookup exists for validation: an unsupported rate raises KeyError.
        baudind = baudlist[baud]
        previd = int(arg[1])
        newid = int(arg[2])
        b3m = B3MProtocol(dev, baud, timeout = 0.1)
    except Exception:
        # Bad numbers, unsupported baud rate or a port that cannot be opened.
        # Ctrl-C and SystemExit are deliberately not caught.
        print(' ERR:There is some problem.')
    else:
        if 0 <= newid <= 254 and 0 <= previd <= 254:
            # Spell to eliminate the habit of poor readout of B3M
            b3m.Ping(previd)
            # Directs LOAD to the previous ID regardless of its presence or absence.
            b3m.LOAD(previd)
            # Confirm that the ID to be changed does not exist
            if not b3m.Ping(newid):
                # I don't know if it's necessary, but I'll put it in a free state.
                if b3m.WRITE8(previd, 0x28, 2):
                    # Write new ID. The result is ignored on purpose: the device
                    # answers with its new ID, so WRITE8 reports a failure.
                    b3m.WRITE8(previd, 0, newid)
                    if b3m.Ping(newid):
                        # SAVE
                        if b3m.SAVE(newid):
                            print(' OK')
                        else:
                            print(' NG -1')
                    else:
                        print(' NG -2')
                else:
                    print(f' ERR: Device with ID:{previd} not found with BAUDRATE:{baud}')
            else:
                print(' ERR:Competing.')
        else:
            print(' ERR:There is some problem.')
else:
    print(' usage: changeid [line] <baudrate> <prev id> <new id>')
#!/usr/bin/python3
#
# changebaud.py
# Note that since processing begins after LOAD is performed,
# various parameters are the same as immediately after startup.
#
# usage: changebaud [line] <baudrate> <id> <new baudrate>

import os, sys
from pyB3M import B3MProtocol

# Accepted baud rates; only membership is used by this script.
baudlist = { 115200 : 0, 1000000 : 1, 1250000 : 2, 1500000 : 3, 2000000 : 4, 3000000 : 5 }

# Three arguments use the default device; four arguments name the device first.
if len(sys.argv) == 4:
    arg = sys.argv[1:]
    dev = '/dev/ttyAMA0'
elif len(sys.argv) == 5:
    arg = sys.argv[2:]
    dev = sys.argv[1]
else:
    arg = []
    dev = ''

# A device path that does not exist is treated like a missing argument.
if not os.path.exists(dev):
    dev = ''

if dev != '':
    try:
        prevbaud = int(arg[0])
        # The lookups exist for validation: an unsupported rate raises KeyError.
        prevind = baudlist[prevbaud]
        servo_id = int(arg[1])
        newbaud = int(arg[2])
        newind = baudlist[newbaud]
        b3m = B3MProtocol(dev, newbaud, timeout = 0.1)
    except Exception:
        # Bad numbers, unsupported baud rate or a port that cannot be opened.
        # Ctrl-C and SystemExit are deliberately not caught.
        print(' ERR: There is some problem.')
    else:
        if not 0 <= servo_id <= 254:
            # Without this check ID 255 would be accepted: Ping() rejects it
            # (so it looks "absent"), while WRITE8/WRITE32 treat it as the
            # broadcast ID and would send the mode and baud-rate writes to
            # every device on the bus.
            print(' ERR: There is some problem.')
        # Verify that no devices exist at the changed baud rate
        elif not b3m.Ping(servo_id):
            b3m.baudrate = prevbaud
            # Spell to eliminate the habit of poor readout of B3M
            b3m.Ping(servo_id)
            # Directs LOAD to the previous ID regardless of its presence or absence.
            b3m.LOAD(servo_id)
            # I don't know if it's necessary, but I'll put it in a free state.
            if b3m.WRITE8(servo_id, 0x28, 2):
                # Write new baudrate; success is judged by the Ping that
                # follows, so the return value is not used.
                b3m.WRITE32(servo_id, 1, newbaud)
                if b3m.Ping(servo_id):
                    # SAVE
                    if b3m.SAVE(servo_id):
                        # Reset the device, then look for it at the new rate.
                        b3m.RESET(servo_id, 0)
                        b3m.baudrate = newbaud
                        if b3m.Ping(servo_id):
                            print(' OK')
                        else:
                            print(' NG -1')
                    else:
                        print(' NG -2')
                else:
                    print(' NG -3')
            else:
                print(f' ERR: Device with ID:{servo_id} not found with BAUDRATE:{prevbaud}')
        else:
            print(' ERR: Competing.')
else:
    print(' usage: changebaud [line] <baudrate> <id> <new baudrate>')