B3Mシリーズと大差ないと思いきや、コネクタが全く違う。さらに標準のケーブルがぶっとい。DXHUBにつなぎたかったので、コレを使ってケーブルを自作。
ひとまず電源は手持ちのACアダプタを使う事にしたが、結果的に市販の物では全く容量足りず。トラブルを避けるには、ガッツリ電流取れる電池か安定化電源用意した方が良いだろう。
聞くところによるとファームウェアは日々更新されているらしく、必ず新しいバージョンを適用して使うべしとの事。
確かにそのままでは後に紹介するスクリプトがエラーになる事があったので、有無も言わさず更新すべきかと。
上澄みだけさらった感触を書いてみる。
DXLやB3Mシリーズを使ったことがあれば運用は簡単。PICのイメージが色濃いKRSとは一線を画すので、それに慣れ切っていると難しい。
B3Mからの仕切り直し感を感じるので、種々のリクエストを吸収してもらえる事を大いに期待し、初期リリース版に触れて感じた点も書いておく。
ここら辺にある事は既知だが、それをあえて見ずに基本的なコマンドを関数にしたクラスを作った(Python3.9以降で動作する事を確認)。
ホントはDXL用のスクリプトを作ったついでにあつらえたのだが、メモリマップ上に配置された情報を読み書きするという概念は同じなので、あっさりと実現。
なおbytesを直接的に扱うのは避けたかったので、一部を除きできるだけint型とリストやタプルで記述できるようにした。使い方は末尾のデモコードを参考にすれば大半は理解できるかと思う。
なお連続で通信する場合は2バイト以上の間隔を置けと注意事項に記されているのだが、ここではそれを一切考慮していない。将来奇跡的にホスト側の処理スピードが速くなり、前後のパケットが近接するようになったら考える事にする。
#!/usr/bin/python3 # -*- coding: utf-8 -*- # # pyPMX.py # SPDX-License-Identifier: MIT # SPDX-FileCopyrightText: (C) 2024 mukyokyo import serial, threading, array, struct from typing import Union from collections import namedtuple from struct import pack, unpack, iter_unpack ########################################################## # Functionalized the part of converting int to bytes. # If specified as a tuple, it is converted to bytes at once. ########################################################## def B2Bs(d) --> bytes: if isinstance(d, list) or isinstance(d, tuple): return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) else: return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) def W2Bs(d) --> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) else: return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) def L2Bs(d) --> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) else: return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) ########################################################## # API for Kondo PMX ########################################################## class PMXProtocol: BROADCASTING_ID = 0xff CMD_MemREAD = 0xa0 CMD_MemWRITE = 0xa1 CMD_LOAD = 0xa2 CMD_SAVE = 0xa3 CMD_MotorREAD = 0xa4 CMD_MotorWRITE = 0xa5 CMD_SystemREAD = 0xbb CMD_SystemWRITE = 0xbc CMD_ReBoot = 0xbd CMD_FactoryReset = 0xbe CMD_SystemINIT = 0xbf (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8) (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3) (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9) __crc16_lutable = array.array('H') def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None): if isinstance(port, serial.Serial): self.__serial = port self.__baudrate = port.baudrate self.__timeout = port.timeout else: self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout) self.__baudrate = self.__serial.baudrate self.__timeout = self.__serial.timeout if lock == None: self.__lock = threading.Lock() else: self.__lock = lock self.__status = 0 poly = 0x1021 for i in range(256): nAccum = i << 8 for j in range(8): nAccum = (poly ^ (nAccum << 1) if nAccum & 0x8000 else nAccum << 1) & 0xffff self.__crc16_lutable.append(nAccum) def __del__(self): del self.__crc16_lutable[:] @property def lock(self): return self.__lock @property def baudrate(self): return self.__serial.baudrate @baudrate.setter def baudrate(self, baudrate): self.__baudrate = baudrate self.__serial.baudrate = baudrate @property def timeout(self): return self.__serial.timeout @timeout.setter def timeout(self, timeout): self.__timeout = timeout self.__serial.timeout = timeout def __reconfig(self): self.__serial.baudrate = self.__baudrate self.__serial.timeout = self.__timeout @property def status(self): return self.__status def __crc16(self, data : bytes) --> int: crc = 0 for d in data: crc = (crc << 8) ^ self.__crc16_lutable[((crc ->-> 8) ^ d) & 0xff] return crc & 0xffff def TxPacket(self, id : int, cmd : int, opt : int, param : bytes, echo = False) --> (bytes, bool): self.__reconfig() if ((id ->= 0 and id <= 239) or id == self.BROADCASTING_ID) and len(param) <= (256 - 8): txp = bytes([0xfe,0xfe,id,len(param) + 8,cmd,opt]) + bytes(param) txp += W2Bs(self.__crc16(txp)) self.__serial.reset_input_buffer() if echo: print('TX:', txp.hex(':')) self.__serial.write(txp) return txp, True return None, False def __rx(self, length) --> bytes: s = self.__serial.read(length) l = len(s) if l == length: return s else: r = s length -= l while self.__serial.in_waiting -> 0: s = self.__serial.read(length) r += s length -= len(s) if length <= 0: break return r def RxPacket(self, echo = False) --> (bytes, bool): rxp = self.__rx(6) if rxp: if len(rxp) == 6: if rxp[3] -> 0: l = rxp[3] - 6 if l -> 0: rxp += self.__rx(l) if len(rxp) == l + 6 and rxp[0] == 0xfe and rxp[1] == 0xfe and unpack('<H', rxp[-2:])[0] == self.__crc16(rxp[:-2]): self.__status = rxp[5] if echo: print('RX:', rxp.hex(':')) return bytes(rxp), True return None, False def MemWRITE(self, id : int, addr : int, data : bytes, echo = False) --> bool: with self.__lock: if ((id ->= 0 and id <= 239) or id == self.BROADCASTING_ID) and addr ->= 0 and addr <= 0x4ff: if self.TxPacket(id, self.CMD_MemWRITE, 1, W2Bs(addr) + data, echo)[1]: if id != self.BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return (d[2] == id) and (d[4] == 0x21) else: return True return False def MemWRITE8(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool: return self.MemWRITE(id, addr, B2Bs(data), echo) def MemWRITE16(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool: return self.MemWRITE(id, addr, W2Bs(data), echo) def MemWRITE32(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool: return self.MemWRITE(id, addr, L2Bs(data), echo) def MemREAD(self, id : int, addr : int, length : int, echo = False) --> bytes: with self.__lock: if addr ->= 0 and id <= 239 and addr ->= 0 and addr <= 0x4ff and length -> 0 and length -> 0 and length <= 247: if self.TxPacket(id, self.CMD_MemREAD, 0, W2Bs(addr) + B2Bs(length), echo)[1]: d, r = self.RxPacket(echo) if r: if d[4] == 0x20 and id == d[2] and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0: return bytes(d[6:-2]) return None def MemREAD8(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int: r = self.MemREAD(id, addr, length, echo) if r != None: n = sum(iter_unpack('b' if signed else 'B', r), ()) return n if length -> 1 else n[0] return None def MemREAD16(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int: r = self.MemREAD(id, addr, 2 * length, echo) if r != None: n = sum(iter_unpack('h' if signed else 'H', r), ()) return n if length -> 1 else n[0] return None def MemREAD32(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int: r = self.MemREAD(id, addr, 4 * length, echo) if r != None: n = sum(iter_unpack('i' if signed else 'I', r), ()) return n if length -> 1 else n[0] return None def LOAD(self, id : int, echo = False) --> bool: with self.__lock: if self.TxPacket(id, self.CMD_LOAD, 0, (), echo)[1]: if id != self.BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x22 else: return True; return False def SAVE(self, id : int, echo = False) --> bool: with self.__lock: if self.TxPacket(id, self.CMD_SAVE, 0, (), echo)[1]: if id != self.BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x23 else: return True; return False def MotorREAD(self, id : int, echo = False) --> tuple: with self.__lock: if self.TxPacket(id, self.CMD_MotorREAD, 0, (), echo): if id != self.BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) -> 0 and dat[4] == 0x24 else ()]) else: return None else: return () return None def MotorWRITE(self, id : int, opt : int, dat : (), echo = False) --> tuple: with self.__lock: if self.TxPacket(id, self.CMD_MotorWRITE, opt, W2Bs(dat), echo): if id != self.BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) -> 0 and dat[4] == 0x25 else ()]) else: return None else: return () return None def SystemREAD(self, id : int, echo = False) --> tuple: with self.__lock: if self.TxPacket(id, self.CMD_SystemREAD, 0, (), echo)[1]: d, r = self.RxPacket(echo) if r: if len(d[6:-2]) == 13 and d[4] == 0x3b: v = tuple(iter_unpack('<IIIB', d[6:-2]))[0] return tuple(v) return None def SystemWRITE(self, id : int, data : (), echo = False) --> bool: if data[0] ->= 0 and data[0] <= 239 and data[1] ->= 0 and data[1] <= 7 and data[2] ->= 0 and data[2] <= 2 and data[3] ->= 1 and data[3] <= 255: d = self.SystemREAD(id, echo) if d: with self.__lock: if self.TxPacket(id, self.CMD_SystemWRITE, 0xf, L2Bs(d[0]) + B2Bs(data), echo)[1]: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x3c and (d[5] & (4 + 8 + 0x10 + 0x20 + 0x40)) == 0 return False def ReBoot(self, id : int, echo = False) --> bool: with self.__lock: if id != self.BROADCASTING_ID: if self.TxPacket(id, self.CMD_ReBoot, 0, W2Bs(0), echo)[1]: d, r = self.RxPacket(echo) if r: return d[2] == id and d[4] == 0x3d return False def FactoryReset(self, id : int, echo = False) --> bool: if id != self.BROADCASTING_ID: d = self.SystemREAD(id, echo) if d != None: with self.__lock: if self.TxPacket(id, self.CMD_FactoryReset, 0, L2Bs(d[0]), echo)[1]: d, r = self.RxPacket(echo) if r: return d[2] == id and d[4] == 0x3e return False def FullScan(self) --> tuple: orgtimeout = self.timeout orgbaudrate = self.baudrate baud=(57600,115200,625000,1000000,1250000,1500000,2000000,3000000) self.timeout = 0.005 for b in baud: self.baudrate = b for id in range(240): r = self.MemREAD8(id, 0) print(f'baud:{b:7} id:{id:3} :', end='find\n' if r else 'none\r') self.timeout = orgtimeout self.baudrate = orgbaudrate ########################################################## # test code ########################################################## if __name__ == "__main__": import time from pyPMX import * try: pmx = PMXProtocol('\\\\.\\COM12', 3000000) except: pass else: ID = 0 # reboot print('ReBoot=', pmx.ReBoot(ID, echo=True)) time.sleep(0.5) ''' # factory reset print('FactoryReset=',pmx.FactoryReset(ID)) time.sleep(0.5) ''' # load print('LOAD=', pmx.LOAD(ID, echo=True)) # save print('SAVE=', pmx.SAVE(ID, echo=True)) ''' # sys write print('SysWrite=',pmx.SystemWRITE(ID, (ID, pmx.SYSW_BAUD_115_2k, pmx.SYSW_PARITY_NONE, 20))) time.sleep(0.5) ''' # sys read r = pmx.SystemREAD(ID) print('SysRead=', f'${r[0]:08x} ${r[1]:08x} ${r[2]:08x} {r[3]:d} ' if r else 'False') # dump memory print('dump') for addr in range(0, 800, 20): r = pmx.MemREAD(ID, addr, 20) print(f' MemREAD({addr}:{20})=', r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') # gain r = pmx.MemREAD(ID, 0, 64) print('gain\n MemREAD(0:64)=', *iter_unpack('<IIIIIIIIIIIIIIII', r) if r else '!!!!!!!!') # voltage limit r = pmx.MemREAD(ID, 76, 8) print('volt lim\n MemREAD(76:8)=', *iter_unpack('<HHHH', r) if r else '!!!!!!!!') # present value r = pmx.MemREAD(ID, 300, 24) print('present val\n MemREAD(300:20)=', *iter_unpack('<hhhhhhhhhHHH', r) if r else '!!!!!!!!') # error stat r = pmx.MemREAD(ID, 400, 6) print('err stat\n MemREAD(0:400:7)=', *iter_unpack('<BBBxH', r) if r else '!!!!!!!!') print('preparation for pos ctrl') # ctrl off print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) time.sleep(0.1) # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 1)) print(' MemWRITE8(502)=', pmx.MemWRITE8(ID, 502, 0b11111)) time.sleep(0.1) # goal angle print('start pos ctrl') print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) for ang in tuple(range(0, 32000, 500)) + tuple(range(32000, -32000, -500)) + tuple(range(-32000, 500, 500)): for n in range(10): r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (ang,)) if r != None: if len(r) -> 0: print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') pmx.MemREAD(ID, 400, 6) else: print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') break time.sleep(0.005) else: continue pass break else: print() time.sleep(0.1) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) time.sleep(0.1) # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 2)) time.sleep(0.1) # goal angle print('start speed ctrl') print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) for spd in tuple(range(0, 2000, 50)) + tuple(range(2000, -2000, -50)) + tuple(range(-2000, 50, 50)): for n in range(30): r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (spd,)) if r != None: if len(r) -> 0: print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') pmx.MemREAD(ID, 400, 6) else: print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') break time.sleep(0.005) else: continue pass break else: print() time.sleep(0.1) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) del pmx print('fin')
こちらでは主にここにあるWindows環境の未リリース版イメージで試しているが、基本的にはMSYS2をインストールしておけばその中で動かせる。もちろんLinuxやmacでも動くので、気楽に試してみてはどうだろう。
IDやボーレートが分からないと何も始められないので、まずは検索ができると何かと便利。
検索はライブラリに入れておいた機能を呼び出すだけなので、次のコードで簡単に利用できる。
#!/usr/bin/python3 import os, sys from pyPMX import PMXProtocol dev = '\\\\.\\COM12' if len(sys.argv) == 1 else sys.argv[1] if not os.path.exists(dev): dev = '' if dev != '': try: pmx = PMXProtocol(dev, 57600) pmx.FullScan() except: print(' ERR:There is some problem.') pass sys.stdout.write('\033[2K\033[1G') else: print(' usage: scan [line]')
ネットワーク上のPMXは各々個別のIDが割り振られている必要があり、同一のものが存在している事を許容していない。
次のコードは任意のIDへ変更するものだが、少しでも疑義が生じたらIDの変更を行わないようにしている。
#!/usr/bin/python3 import os, sys from pyPMX import PMXProtocol baudlist = { 57600 : 0, 115200 : 1, 625000 : 2, 1000000 : 3, 1250000 : 4, 1500000 : 5, 2000000 : 6, 3000000 : 7 } if len(sys.argv) == 4: arg = sys.argv[1:] dev = '\\\\.\\COM12' elif len(sys.argv) == 5: arg = sys.argv[2:] dev = sys.argv[1] else: arg = [] dev = '' if not os.path.exists(dev): dev = '' if dev != '': try: baud = int(arg[0]) baudind = baudlist[baud] previd = int(arg[1]) newid = int(arg[2]) pmx = PMXProtocol(dev, baud) except: print(' ERR:There is some problem.') else: if newid >= 0 and newid <= 239: if pmx.MemREAD8(newid, 0) == None: s = (newid, baudind, pmx.SYSW_PARITY_NONE, 20) if pmx.MotorWRITE(previd, pmx.MOTW_OPT_FREE, ()) != None: if pmx.SystemWRITE(previd, s): print(' OK') else: print(' NG') else: print(f' ERR: Device with ID:{previd} not found with BAUDRATE:{baud}') else: print(' ERR:Competing.') else: print(' ERR:There is some problem.') else: print(' usage: changeid [line] <baudrate> <prev id> <new id>')
次のコードは任意のボーレートへ変更するものだが、少しでも疑義が生じたらボーレートの変更を行わないようにしている。
#!/usr/bin/python3 import os, sys from pyPMX import PMXProtocol baudlist = { 57600 : 0, 115200 : 1, 625000 : 2, 1000000 : 3, 1250000 : 4, 1500000 : 5, 2000000 : 6, 3000000 : 7 } if len(sys.argv) == 4: arg = sys.argv[1:] dev = '\\\\.\\COM12' elif len(sys.argv) == 5: arg = sys.argv[2:] dev = sys.argv[1] else: arg = [] dev = '' if not os.path.exists(dev): dev = '' if dev != '': try: prevbaud = int(arg[0]) prevind = baudlist[prevbaud] id = int(arg[1]) newbaud = int(arg[2]) newind = baudlist[newbaud] pmx = PMXProtocol(dev, newbaud) except: print(' ERR: There is some problem.') else: if pmx.MemREAD8(id, 0) == None: pmx.baudrate = prevbaud if pmx.MotorWRITE(id, pmx.MOTW_OPT_FREE, ()) != None: s = (id, newind, pmx.SYSW_PARITY_NONE, 20) if pmx.SystemWRITE(id, s): print(' OK') else: print(' NG') else: print(f' ERR: Device with ID:{id} not found with BAUDRATE:{prevbaud}') else: print(' ERR: Competing.') else: print(' usage: changebaud [line] <baudrate> <id> <new baudrate>')
PicoSHIELD+Raspberry Pi Picoを対象にPythonのコードを作ってみた。
# pyPMX.py for MicroPython(RP2040) # SPDX-License-Identifier: MIT # SPDX-FileCopyrightText: (C) 2024 mukyokyo from machine import UART, Pin import time, array from collections import namedtuple from struct import pack, unpack ########################################################## # Functionalized the part of converting int to bytes. # If specified as a tuple, it is converted to bytes at once. ########################################################## @micropython.native def B2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) else: return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) @micropython.native def W2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) else: return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) @micropython.native def L2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) else: return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) ########################################################## # API for Kondo PMX ########################################################## class PMXProtocol: BROADCASTING_ID = const(0xff) CMD_MemREAD = const(0xa0) CMD_MemWRITE = const(0xa1) CMD_LOAD = const(0xa2) CMD_SAVE = const(0xa3) CMD_MotorREAD = const(0xa4) CMD_MotorWRITE = const(0xa5) CMD_SystemREAD = const(0xbb) CMD_SystemWRITE = const(0xbc) CMD_ReBoot = const(0xbd) CMD_FactoryReset = const(0xbe) CMD_SystemINIT = const(0xbf) (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8) (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3) (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9) __crc16_lutable = array.array('H',( 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x01ce0,0xcc1, 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 )) def __init__(self, baudrate = 115200, timeout_ms = 50): """ Initalize parameters ------------- baudrate : int Serial baudrate[bps] timeout_ms : float Read timeout[s] """ self.__Error = 0 self.__baudrate = baudrate self.__timeout = timeout_ms self.__status = 0 self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms) @property def baudrate(self): return self.__baudrate @baudrate.setter def baudrate(self, baudrate): self.__baudrate = baudrate self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) @property def timeout(self): return self.__timeout @timeout.setter def timeout(self, timeout_ms): self.__timeout = timeout_ms self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) @property def status(self): return self.__status @micropython.viper def __crc16(self, dat : ptr8, l:int) -> int: crc:int = 0 pt = ptr16(self.__crc16_lutable) for i in range(l): crc = (crc << 8) ^ pt[((crc >> 8) ^ dat[i]) & 0xff] return crc & 0xffff @micropython.native def TxPacket(self, id : int, cmd : int, opt : int, param : bytes, echo = False) -> (bytes, bool): if ((id >= 0 and id <= 239) or id == BROADCASTING_ID) and len(param) <= (256 - 8): txp = bytes([0xfe,0xfe,id,len(param) + 8,cmd,opt]) + bytes(param) txp += W2Bs(self.__crc16(txp, len(txp))) _ = self.__serial.read(self.__serial.any()) if echo: print('TX:', txp.hex(':')) self.__serial.write(txp) return txp, True return None, False @micropython.native def __rx(self, length) -> bytes: s = self.__serial.read(length) if s: l = len(s) if l == length: return s else: r = s length -= l while self.__serial.any() > 0: s = self.__serial.read(length) r += s length -= len(s) if length == 0: break return r else: return bytes() @micropython.native def RxPacket(self, echo = False) -> (bytes, bool): self.__serial.flush() rxp = self.__rx(6) if rxp: if len(rxp) == 6: l = rxp[3] - 6 rxp += self.__rx(l) if len(rxp) == l + 6 and rxp[0] == 0xfe and rxp[1] == 0xfe and unpack('<H', rxp[-2:])[0] == self.__crc16(rxp[:-2], len(rxp[:-2])): self.__status = rxp[5] if echo: print('RX:', rxp.hex(':')) return bytes(rxp), True return None, False @micropython.native def MemWRITE(self, id : int, addr : int, data : bytes, echo = False) -> bool: if ((id >= 0 and id <= 239) or id == BROADCASTING_ID) and addr >= 0 and addr <= 0x4ff: if self.TxPacket(id, CMD_MemWRITE, 1, W2Bs(addr) + data, echo)[1]: if id != BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return (d[2] == id) and (d[4] == 0x21) else: return True return False @micropython.native def MemWRITE8(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool: return self.MemWRITE(id, addr, B2Bs(data), echo) @micropython.native def MemWRITE16(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool: return self.MemWRITE(id, addr, W2Bs(data), echo) @micropython.native def MemWRITE32(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool: return self.MemWRITE(id, addr, L2Bs(data), echo) @micropython.native def MemREAD(self, id : int, addr : int, length : int, echo = False) -> bytes: if addr >= 0 and id <= 239 and addr >= 0 and addr <= 0x4ff and length > 0 and length > 0 and length <= 247: if self.TxPacket(id, CMD_MemREAD, 0, W2Bs(addr) + B2Bs(length), echo)[1]: d, r = self.RxPacket(echo) if r: if d[4] == 0x20 and id == d[2] and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0: return bytes(d[6:-2]) return None @micropython.native def MemREAD8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.MemREAD(id, addr, length, echo) if r != None: n = sum(iter_unpack('b' if signed else 'B', r), ()) return n if length > 1 else n[0] return None @micropython.native def MemREAD16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.MemREAD(id, addr, 2 * length, echo) if r != None: n = sum(iter_unpack('h' if signed else 'H', r), ()) return n if length > 1 else n[0] return None @micropython.native def MemREAD32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.MemREAD(id, addr, 4 * length, echo) if r != None: n = sum(iter_unpack('i' if signed else 'I', r), ()) return n if length > 1 else n[0] return None @micropython.native def LOAD(self, id : int, echo = False) -> bool: if self.TxPacket(id, CMD_LOAD, 0, (), echo)[1]: if id != BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x22 else: return True; return False @micropython.native def SAVE(self, id : int, echo = False) -> bool: if self.TxPacket(id, CMD_SAVE, 0, (), echo)[1]: if id != BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x23 else: return True; return False @micropython.native def MotorREAD(self, id : int, echo = False) -> tuple: if self.TxPacket(id, CMD_MotorREAD, 0, (), echo): if id != BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) > 0 and dat[4] == 0x24 else ()]) else: return None else: return () return None @micropython.native def MotorWRITE(self, id : int, opt : int, dat : (), echo = False) -> tuple: if self.TxPacket(id, CMD_MotorWRITE, opt, W2Bs(dat), echo): if id != BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return tuple([dat[6], unpack('<'+'h'*(len(dat[7:-2])>>1), dat[7:-2]) if len(dat[7:-2]) > 0 and dat[4] == 0x25 else ()]) else: return None else: return () return None @micropython.native def SystemREAD(self, id : int, echo = False) -> tuple: if self.TxPacket(id, CMD_SystemREAD, 0, (), echo)[1]: d, r = self.RxPacket(echo) if r: if len(d[6:-2]) == 13 and d[4] == 0x3b: return unpack('<IIIB', d[6:-2]) return None @micropython.native def SystemWRITE(self, id : int, data : (), echo = False) -> bool: if data[0] >= 0 and data[0] <= 239 and data[1] >= 0 and data[1] <= 7 and data[2] >= 0 and data[2] <= 2 and data[3] >= 1 and data[3] <= 255: d = self.SystemREAD(id, echo) if d: with self.__mutex: if self.TxPacket(id, CMD_SystemWRITE, 0xf, L2Bs(d[0]) + B2Bs(data), echo)[1]: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x3c and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0 return False @micropython.native def ReBoot(self, id : int, echo = False) -> bool: if id != BROADCASTING_ID: if self.TxPacket(id, CMD_ReBoot, 0, W2Bs(0), echo)[1]: d, r = self.RxPacket(echo) if r: return d[2] == id and d[4] == 0x3d return False @micropython.native def FactoryReset(self, id : int, echo = False) -> bool: if id != BROADCASTING_ID: d = self.SystemREAD(id, echo) if d != None: if self.TxPacket(id, CMD_FactoryReset, 0, L2Bs(d[0]), echo)[1]: d, r = self.RxPacket(echo) if r: return d[2] == id and d[4] == 0x3e return False @micropython.native def FullScan(self) -> tuple: orgtimeout = self.__timeout orgbaudrate = self.__baudrate #baud=(3000000,2000000,1500000,1250000,1000000,625000,115200,57600) baud=(57600,115200,625000,1000000,1250000,1500000,2000000,3000000) self.__serial.timeout = 0.001 for b in baud: for id in range(240): self.__serial.baudrate = b r = self.MemREAD8(id, 0) print(f'baud:{b:7} id:{id:3} :', end='find\n' if r else 'none\r') self.__serial.timeout = orgtimeout self.__baudrate = orgbaudrate ########################################################## # test code ########################################################## if __name__ == "__main__": import time from pyPMX import * machine.freq(250000000) # set the CPU frequency to 250 MHz try: pmx = PMXProtocol() except: pass else: ID = 0 # reboot print('ReBoot=', pmx.ReBoot(ID, echo=True)) time.sleep(0.5) ''' # factory reset print('FactoryReset=',pmx.FactoryReset(ID)) time.sleep(0.5) ''' # load print('LOAD=', pmx.LOAD(ID, echo=True)) # save print('SAVE=', pmx.SAVE(ID, echo=True)) ''' # sys write print('SysWrite=',pmx.SystemWRITE(ID, (ID, pmx.SYSW_BAUD_115_2k, pmx.SYSW_PARITY_NONE, 20))) time.sleep(0.5) ''' # sys read r = pmx.SystemREAD(ID, echo=True) print('SysRead=', f'${r[0]:08x} ${r[1]:08x} ${r[2]:08x} {r[3]:d} ' if r else 'False') # dump memory print('dump') for addr in range(0, 800, 20): r = pmx.MemREAD(ID, addr, 20) print(f' MemREAD({addr}:{20})=', r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') # gain r = pmx.MemREAD(ID, 0, 64) print('gain\n MemREAD(0:64)=', unpack('<IIIIIIIIIIIIIIII', r) if r else '!!!!!!!!') # voltage limit r = pmx.MemREAD(ID, 76, 8) print('volt lim\n MemREAD(76:8)=', unpack('<HHHH', r) if r else '!!!!!!!!') # present value r = pmx.MemREAD(ID, 300, 24) print('present val\n MemREAD(300:20)=', unpack('<hhhhhhhhhHHH', r) if r else '!!!!!!!!') # error stat r = pmx.MemREAD(ID, 400, 6) print('err stat\n MemREAD(0:400:7)=', unpack('<BBBxH', r) if r else '!!!!!!!!') print('preparation for pos ctrl') # ctrl off print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) time.sleep(0.1) # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 1)) print(' MemWRITE8(502)=', pmx.MemWRITE8(ID, 502, 0b11111)) time.sleep(0.1) # goal angle print('start pos ctrl') print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) for ang in tuple(range(0, 32000, 500)) + tuple(range(32000, -32000, -500)) + tuple(range(-32000, 500, 500)): for n in range(10): r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (ang,)) if r != None: if len(r) > 0: print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') pmx.MemREAD(ID, 400, 6) else: pass print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') break time.sleep(0.005) else: continue pass break else: print() time.sleep(0.1) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) time.sleep(0.1) # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 2)) time.sleep(0.1) # goal angle print('start speed ctrl') print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) for spd in tuple(range(0, 2000, 50)) + tuple(range(2000, -2000, -50)) + tuple(range(-2000, 50, 50)): for n in range(10): r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (spd,)) if r != None: if len(r) > 0: print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') pmx.MemREAD(ID, 400, 6) else: pass print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') break time.sleep(0.005) else: continue pass break else: print() time.sleep(0.1) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) del pmx print('fin')