B3Mはパケット構成が緩いので、ファームウェアが想定していない条件で通信を行うと重篤な事態に陥りやすい。
WRITEとPOSITIONのみマルチに対応したコマンドとしてWRITEsとPOSITIONsを用意。
ERROR STATUSをどのように成否判定に使うべきかが判然としないので無視。
#!/usr/bin/python3 # -*- coding: utf-8 -*- # # pyB3M.py # SPDX-License-Identifier: MIT # SPDX-FileCopyrightText: (C) 2024 mukyokyo import serial, threading, array, struct from typing import Union from collections import namedtuple from struct import pack, unpack, iter_unpack from typing import overload ########################################################## # Functionalized the part of converting int to bytes. # If specified as a tuple, it is converted to bytes at once. ########################################################## def B2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) else: return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) def W2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) else: return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) def L2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) else: return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) ########################################################## # API for Kondo B3M ########################################################## class B3MProtocol: BROADCASTING_ID = 0xff OPT_LOAD = 1 OPT_SAVE = 2 OPT_READ = 3 OPT_WRITE = 4 OPT_RESET = 5 OPT_POSITION = 6 (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8) (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3) (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9) def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None): if isinstance(port, serial.Serial): self.__serial = port self.__baudrate = port.baudrate self.__timeout = port.timeout else: self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout) self.__baudrate = self.__serial.baudrate self.__timeout = self.__serial.timeout if lock == None: self.__lock = threading.Lock() else: self.__lock = lock self.__status = 0 def __del__(self): pass @property def lock(self): return self.__lock @property def baudrate(self): return self.__serial.baudrate @baudrate.setter def baudrate(self, baudrate): self.__baudrate = baudrate self.__serial.baudrate = baudrate @property def timeout(self): return self.__serial.timeout @timeout.setter def timeout(self, timeout): self.__timeout = timeout self.__serial.timeout = timeout def __reconfig(self): self.__serial.baudrate = self.__baudrate self.__serial.timeout = self.__timeout @property def status(self): return self.__status def TxPacket(self, cmd : int, opt : int, param : bytes, echo = False) -> (bytes, bool): self.__reconfig() if cmd >= 0 and cmd <= 0xff and opt >= 0 and opt <= 0xff and len(param) <= (256 - 5): txp = bytes((len(param) + 4, cmd, opt)) + bytes(param) txp += B2Bs(sum(txp) & 0xff) self.__serial.reset_input_buffer() if echo: print('TX:', txp.hex(':')) self.__serial.write(txp) return txp, True return None, False def __rx(self, length) -> bytes: s = self.__serial.read(length) l = len(s) if l == length: return s else: r = s length -= l while self.__serial.in_waiting > 0: s = self.__serial.read(length) r += s length -= len(s) if length <= 0: break return r def RxPacket(self, echo = False) -> (bytes, bool): rxp = self.__rx(1) if rxp: if len(rxp) == 1: if rxp[0] >= 5: l = rxp[0] - 1 rxp += self.__rx(l) if rxp[1] >= 0x81 and rxp[1] <= 0x86 and rxp[3] <= 0xfe and rxp[-1] == (sum(rxp[:-1]) & 0xff): self.__status = rxp[2] if echo: print('RX:', rxp.hex(':')) return bytes(rxp), True if echo: print('RX;', rxp.hex(';'), ' xxx') return None, False def WRITE(self, id : int, addr : int, data : bytes, reset = False, echo = False) -> bool: with self.__lock: if id >= 0 and id <= 255 and addr >= 0 and addr <= 0xff and len(data) <= 256 - 7: if self.TxPacket(self.OPT_WRITE, 0x80 if reset else 0x00, B2Bs(id) + data + B2Bs(addr) + B2Bs(1), echo)[1]: if id != self.BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return (d[3] == id) and (d[1] == (self.OPT_WRITE | 0x80)) else: return True return False def WRITEs(self, addr : int, iddatas : Union[list, tuple], reset = False, echo = False) -> bool: with self.__lock: if addr >= 0 and addr <= 0xff and (isinstance(iddatas, list) or isinstance(iddatas, tuple)): cnt = len(iddatas) if cnt > 0: txd = bytes(0) l = len(iddatas[0]) for iddata in iddatas: if iddata[0] < 0 or iddata[0] > 254 or len(iddata) != l: return False txd += iddata txd += B2Bs(addr) + B2Bs(cnt) return self.TxPacket(self.OPT_WRITE, 0x80 if reset else 0x00, txd, echo)[1] return False def WRITE8(self, id : int, addr : int, data : Union[int, list, tuple], reset = False, echo = False) -> bool: return self.WRITE(id, addr, B2Bs(data), reset, echo) def WRITE16(self, id : int, addr : int, data : Union[int, list, tuple], reset = False, echo = False) -> bool: return self.WRITE(id, addr, W2Bs(data), reset, echo) def WRITE32(self, id : int, addr : int, data : Union[int, list, tuple], reset = False, echo = False) -> bool: return self.WRITE(id, addr, L2Bs(data), reset, echo) def READ(self, id : int, addr : int, length : int, reset = False, echo = False) -> bytes: with self.__lock: if addr >= 0 and id <= 254 and addr >= 0 and addr <= 0xff and length >= 1 and length <= 250: if self.TxPacket(self.OPT_READ, 0x80 if reset else 0x00, B2Bs(id) + B2Bs(addr) + B2Bs(length), echo)[1]: d, r = self.RxPacket(echo) if r: if d[1] == (self.OPT_READ | 0x80) and id == d[3] and len(d) == length + 5: return bytes(d[4:-1]) return None def READ8(self, id : int, addr : int, length = 1, signed = False, reset = False, echo = False) -> int: r = self.READ(id, addr, length, reset, echo) if r != None: n = sum(iter_unpack('b' if signed else 'B', r), ()) return n if length > 1 else n[0] return None def READ16(self, id : int, addr : int, length = 1, signed = False, reset = False, echo = False) -> int: r = self.READ(id, addr, 2 * length, reset, echo) if r != None: n = sum(iter_unpack('h' if signed else 'H', r), ()) return n if length > 1 else n[0] return None def READ32(self, id : int, addr : int, length = 1, signed = False, reset = False, echo = False) -> int: r = self.READ(id, addr, 4 * length, reset, echo) if r != None: n = sum(iter_unpack('i' if signed else 'I', r), ()) return n if length > 1 else n[0] return None def LOAD(self, id : int, echo = False) -> bool: with self.__lock: if self.TxPacket(self.OPT_LOAD, 0x0, B2Bs(id), echo)[1]: if id != self.BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return id == d[3] and d[1] == (self.OPT_LOAD | 0x80) and len(d) == 5 else: return True; return False def SAVE(self, id : int, echo = False) -> bool: with self.__lock: if self.TxPacket(self.OPT_SAVE, 0x0, B2Bs(id), echo)[1]: if id != self.BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return id == d[3] and d[1] == (self.OPT_SAVE| 0x80) and len(d) == 5 else: return True; return False def POSITION(self, id : int, pos : int, tim : int, reset = False, echo = False) -> int: with self.__lock: if pos >= -32000 and pos <= 32000 and tim >= 0 and tim <= 65535: if self.TxPacket(self.OPT_POSITION, 0x80 if reset else 0x00, B2Bs(id) + W2Bs(pos) + W2Bs(tim), echo): if id != self.BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: if dat[3] == id and len(dat) == 7: return tuple(n[0] for n in iter_unpack('<h', dat[4:6]))[0] else: return None else: return None return None def POSITIONs(self, idposs : Union[list, tuple], tim : int, reset = False, echo = False) -> bool: with self.__lock: if tim >= 0 and tim <= 65535: cnt = len(idposs) if cnt > 0: txd = bytes(0) l = len(idposs[0]) for idpos in idposs: if len(idpos) != 2: return False if idpos[0] < 0 or idpos[0] > 254 or len(idpos) != l or idpos[1] < -32000 or idpos[1] > 32000: return False txd += B2Bs(idpos[0]) + W2Bs(idpos[1]) txd += W2Bs(tim) return self.TxPacket(self.OPT_POSITION, 0x80 if reset else 0x00, txd, echo)[1] return False def RESET(self, id : int, tim : int, echo = False) -> bool: with self.__lock: if ((id >= 0 and id <= 255) or id == self.BROADCASTING_ID) and tim >= 0 and tim <= 255: if self.TxPacket(self.OPT_RESET, 0x0, B2Bs(id) + B2Bs(tim), echo)[1]: return True return False def Ping(self, id : int, echo = False) -> bool: if id >= 0 and id <= 254: for i in range(3): r = self.READ8(id, 0, 1, False, echo) if r != None: return True return False ########################################################## # test code ########################################################## if __name__ == "__main__": import time from pyB3M import * try: b3m = B3MProtocol('/dev/ttyAMA0', 1000000, 1) except: pass else: ID = 0 ec = True # r/w test print('reset=',b3m.RESET(ID, 0, echo = ec)) time.sleep(0.5) print('load=',b3m.LOAD(ID, echo = ec)) print('save=',b3m.SAVE(ID, echo = ec)) print('read 5~9=',b3m.READ(ID, 0, 5, echo = ec).hex(',')) print('read32 0=',b3m.READ32(ID, 0, echo = ec)) print('read16 5=',b3m.READ16(ID, 5, echo = ec)) print('read16 7=',b3m.READ16(ID, 7, echo = ec)) print('read16 0x50=',b3m.READ16(ID, 0x50, echo = ec)) # r/w pid gain gain = list(b3m.READ16(ID, 0x5e, 3, echo = ec)) print('pid r=',gain) print('pid w=',b3m.WRITE16(ID, 0x5e, gain, echo = ec)) print('pid r=',b3m.READ16(ID, 0x5e, 3, echo = ec)) # write (multi) d = (B2Bs(ID)+W2Bs(gain), B2Bs(ID+1)+W2Bs(gain), B2Bs(ID+2)+W2Bs(gain), B2Bs(ID+3)+W2Bs(gain),) print(d) print('writes=',b3m.WRITEs(0x5e, d, echo = ec)) # motor ctrl # set free if b3m.WRITE8(ID, 0x28, 2, echo = ec): # sel pos mode if b3m.WRITE8(ID, 0x28, 2, echo = ec): # sel profile if b3m.WRITE8(ID, 0x29, 1, echo = ec): # set normal if b3m.WRITE8(ID, 0x28, 0, echo = ec): # position for ang in (0, 4500, 9000, 4500, 0, -4500, -9000, -4500, 0): b3m.POSITION(ID, ang, 500) s = time.time() + 1 while s > time.time(): r = b3m.READ(ID, 0x27, 48) if r != None: n = tuple(iter_unpack('<BHBBhhhhhhHHHhIHhhHHHHiB', r))[0] print(f'READ(0x27~0x56)={ang:7d}{n[5]:7d}{n[8]:7d} {n[16]:5d}{n[17]:5d}{n[18]:7d}{n[19]:7d}', end='\r') else: print('READ(0x27~0x56)=xxxxx', end='\r') else: print('err:set normal') else: print('err:set profile') else: print('err:set pos mode') else: print('err:set free') print() # set free b3m.WRITE8(ID, 0x28, 2) del b3m print('fin')
IDやボーレートが分からない場合の検索。kbhit要。
B3M側の設定と異なるボーレートで疎通確認を行うため、B3Mの内部エラーが誘発されるのと、B3Mがパケットを誤認して設定が変わってしまうなどのリスクがあるので、あまり使うべきではない。
#!/usr/bin/python3 # # scan.py import sys, kbhit, time from pyB3M import B3MProtocol kb = kbhit.KBHit() try: b3m = B3MProtocol(sys.argv[1] if len(sys.argv) == 2 else '/dev/ttyAMA0', 115200, 0.02) for b in (115200, 1000000, 1250000, 1500000, 2000000, 3000000): b3m.baudrate = b for id in range(255): if kb.kbhit(): k = kb.getch() break print(f' baud:{b:7} id:{id:3} ', end='find\n' if b3m.Ping(id) else '\r', flush=True) time.sleep(0.5) sys.stdout.write('\033[2K\033[1G') except: print(' ERR:There is some problem.')
任意のIDへ変更。
#!/usr/bin/python3 # # changeid.py # Note that since processing begins after LOAD is performed, # various parameters are the same as immediately after startup. import os, sys from pyB3M import B3MProtocol baudlist = { 115200 : 0, 1000000 : 1, 1250000 : 2, 1500000 : 3, 2000000 : 4, 3000000 : 5 } if len(sys.argv) == 4: arg = sys.argv[1:] dev = '/dev/ttyAMA0' elif len(sys.argv) == 5: arg = sys.argv[2:] dev = sys.argv[1] else: arg = [] dev = '' if not os.path.exists(dev): dev = '' if dev != '': try: baud = int(arg[0]) baudind = baudlist[baud] previd = int(arg[1]) newid = int(arg[2]) b3m = B3MProtocol(dev, baud, timeout = 0.1) except: print(' ERR:There is some problem.') else: if newid >= 0 and newid <= 254 and previd >= 0 and previd <= 254: # Spell to eliminate the habit of poor readout of B3M b3m.Ping(previd) # Directs LOAD to the previous ID regardless of its presence or absence. b3m.LOAD(previd) # Confirm that the ID to be changed does not exist if not b3m.Ping(newid): # I don't know if it's necessary, but I'll put it in a free state. if b3m.WRITE8(previd, 0x28, 2): # Write new ID (It is an error because the response is changed with the new ID.) dum = b3m.WRITE8(previd, 0, newid) if b3m.Ping(newid): # SAVE if b3m.SAVE(newid): print(' OK') else: print(' NG -1') else: print(' NG -2') else: print(f' ERR: Device with ID:{previd} not found with BAUDRATE:{baud}') else: print(' ERR:Competing.') else: print(' ERR:There is some problem.') else: print(' usage: changeid [line] <baudrate> <prev id> <new id>')
任意のボーレートへ変更。
セオリー通りの手順なのだが、手持ちのB3Mでは保存されないことが頻出して嫌気がさしている。
#!/usr/bin/python3 # # changebaud.py # Note that since processing begins after LOAD is performed, # various parameters are the same as immediately after startup. import os, sys from pyB3M import B3MProtocol baudlist = { 115200 : 0, 1000000 : 1, 1250000 : 2, 1500000 : 3, 2000000 : 4, 3000000 : 5 } if len(sys.argv) == 4: arg = sys.argv[1:] dev = '/dev/ttyAMA0' elif len(sys.argv) == 5: arg = sys.argv[2:] dev = sys.argv[1] else: arg = [] dev = '' if not os.path.exists(dev): dev = '' if dev != '': try: prevbaud = int(arg[0]) prevind = baudlist[prevbaud] id = int(arg[1]) newbaud = int(arg[2]) newind = baudlist[newbaud] b3m = B3MProtocol(dev, newbaud, timeout = 0.1) except: print(' ERR: There is some problem.') else: # Verify that no devices exist at the changed baud rate if not b3m.Ping(id): b3m.baudrate = prevbaud # Spell to eliminate the habit of poor readout of B3M b3m.Ping(id) # Directs LOAD to the previous ID regardless of its presence or absence. b3m.LOAD(id) # I don't know if it's necessary, but I'll put it in a free state. if b3m.WRITE8(id, 0x28, 2): # Write new baudrate dum = b3m.WRITE32(id, 1, newbaud) if b3m.Ping(id): # SAVE if b3m.SAVE(id): b3m.RESET(id, 0) b3m.baudrate = newbaud if b3m.Ping(id): print(' OK') else: print(' NG -1') else: print(' NG -2') else: print(' NG -3') else: print(f' ERR: Device with ID:{id} not found with BAUDRATE:{prevbaud}') else: print(' ERR: Competing.') else: print(' usage: changebaud [line] <baudrate> <id> <new baudrate>')