現: 2024-04-13 (土) 22:53:38 takaboo | |||
---|---|---|---|
Line 1: | Line 1: | ||
+ | TITLE:Tasting KO PMX | ||
+ | **PMXシリーズ [#wb9f3fea] | ||
+ | #ref(pmx_test.png) | ||
+ | 動かすだけならここだけおさえておけば良しという事で、公式サイトのリンクをブックマーク。~ | ||
+ | [[PMXシリーズ:https://kondo-robot.com/product-category/servomotor/pmx]]~ | ||
+ | [[PMXサーボシリーズオンラインマニュアル:https://www.kondokagaku.jp/wp_mn/archives/1144]] | ||
+ | ***下準備 [#ud298d0b] | ||
+ | B3Mシリーズと大差ないと思いきや、コネクタが全く違う。さらに標準のケーブルがぶっとい。[[DXHUB]]につなぎたかったので、[[コレ>https://kondo-robot.com/product/02257]]を使ってケーブルを自作。 | ||
+ | #ref(EH_DF1B.png) | ||
+ | ひとまず電源は手持ちのACアダプタを使う事にしたが、結果的に市販の物では全く容量足りず。トラブルを避けるには、ガッツリ電流取れる電池か安定化電源用意した方が良いだろう。 | ||
+ | |||
+ | 聞くところによるとファームウェアは日々更新されているらしく、必ず新しいバージョンを適用して使うべしとの事。~ | ||
+ | 確かにそのままでは後に紹介するスクリプトがエラーになる事があったので、有無も言わさず更新すべきかと。 | ||
+ | |||
+ | ***雑感 [#dce2fa5b] | ||
+ | 上澄みだけさらった感触を書いてみる。 | ||
+ | -KRS時代には無かった単純なPIDによる角度・速度・電流の制御 | ||
+ | -角度・速度・電流を組み合わせたカスケード制御が可能 | ||
+ | -分解能の高い非接触角度センサを装備し、個々に高精度なキャリブレーションがなされている | ||
+ | -一般的なUARTの通信プロトコル(8N1) | ||
+ | -シンプルな通信プロトコルで、大抵の機能へのアクセスはメモリマップ上に配置された情報の読み書きで済む | ||
+ | -B3M程の発熱は感じず | ||
+ | -メモリマップの読み書きだけでも運転可能 | ||
+ | -パケットの応答時間がどこぞやと異なり、かなり一定 | ||
+ | -フィールドでのファームウェアのパッチ当てが可能 | ||
+ | |||
+ | DXLやB3Mシリーズを使ったことがあれば運用は簡単。PICのイメージが色濃いKRSとは一線を画すので、それに慣れ切っていると難しい。~ | ||
+ | B3Mからの仕切り直し感を感じるので、種々のリクエストを吸収してもらえる事を大いに期待し、初期リリース版に触れて感じた点も書いておく。 | ||
+ | -3つの制御モード全てを組み合わせたカスケード制御は不可 | ||
+ | -ゲインにフィードフォワードが無い~ | ||
+ | 足回りなどで常時オフセットが生じる速度制御をメインで使う時には結構有用なのだが | ||
+ | -パケット上アドレスは16bit幅だがデータ長が8bit指定なため、全メモリの一括読み出しは不可~ | ||
+ | メモリマップ全てを仮想化するには分割して読むしかなく | ||
+ | -複数軸を同期させる方法の記述が無いが、いずれB3Mのマルチみたいなものが追加されるのか? | ||
+ | -エラーフラグを読み出さないといけないのが少々煩雑 | ||
+ | -そこそこ電源容量が必要なためACアダプタによる運用は注意が必要~ | ||
+ | 実際に[[秋月のACアダプタ>https://akizukidenshi.com/catalog/g/gM-06961/]]でも運用しきれなかった | ||
+ | -今後はBLDCなのかな。。。 | ||
+ | |||
+ | **テストコード [#a586a4ca] | ||
+ | Pythonばかり使っていると悪い頭が益々悪くなりそうだし鬼のように遅いが、多勢に無勢。あれやこれやと説明せずとも大抵の事はヌルッとイケるので、ここでも動作確認用に使う事にした。 | ||
+ | |||
+ | ***Python[#mdb668e2] | ||
+ | [[ここら辺>https://kondo-robot.com/faq/pmx-library-for-python]]にある事は既知だが、それをあえて見ずに基本的なコマンドを関数にしたクラスを作った(Python3.9以降で動作する事を確認)。~ | ||
+ | ホントは[[DXL用のスクリプト>DYNAMIXEL basic tutorial#k91ab321]]を作ったついでにあつらえたのだが、メモリマップ上に配置された情報を読み書きするという概念は同じなので、あっさりと実現。~ | ||
+ | なおbytesを直接的に扱うのは避けたかったので、一部を除きできるだけint型とリストやタプルで記述できるようにした。使い方は末尾のデモコードを参考にすれば大半は理解できるかと思う。~ | ||
+ | なお連続で通信する場合は2バイト以上の間隔を置けと注意事項に記されているのだが、ここではそれを一切考慮していない。将来奇跡的にホスト側の処理スピードが速くなり、前後のパケットが近接するようになったら考える事にする。 | ||
+ | |||
+ | #html{{ | ||
+ | <style type="text/css"> | ||
+ | .syntaxhighlighter { | ||
+ | overflow-y: auto !important; | ||
+ | overflow-x: auto !important; | ||
+ | max-height: 900px; | ||
+ | -webkit-text-size-adjust: 100%; | ||
+ | } | ||
+ | </style> | ||
+ | <pre class="brush: python;" title="pyPMX.py"> | ||
+ | #!/usr/bin/python3 | ||
+ | # -*- coding: utf-8 -*- | ||
+ | # | ||
+ | # pyPMX.py | ||
+ | # SPDX-License-Identifier: MIT | ||
+ | # SPDX-FileCopyrightText: (C) 2024 mukyokyo | ||
+ | |||
+ | import serial, threading, array, struct | ||
+ | from typing import Union | ||
+ | from collections import namedtuple | ||
+ | from struct import pack, unpack, iter_unpack | ||
+ | |||
+ | ########################################################## | ||
+ | # Functionalized the part of converting int to bytes. | ||
+ | # If specified as a tuple, it is converted to bytes at once. | ||
+ | ########################################################## | ||
+ | def B2Bs(d) --> bytes: | ||
+ | if isinstance(d, list) or isinstance(d, tuple): | ||
+ | return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) | ||
+ | else: | ||
+ | return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) | ||
+ | |||
+ | def W2Bs(d) --> bytes: | ||
+ | if isinstance(d, list) or isinstance(d, tuple): | ||
+ | return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) | ||
+ | else: | ||
+ | return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) | ||
+ | |||
+ | def L2Bs(d) --> bytes: | ||
+ | if isinstance(d, list) or isinstance(d, tuple): | ||
+ | return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) | ||
+ | else: | ||
+ | return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) | ||
+ | |||
+ | ########################################################## | ||
+ | # API for Kondo PMX | ||
+ | ########################################################## | ||
+ | class PMXProtocol: | ||
+ | BROADCASTING_ID = 0xff | ||
+ | CMD_MemREAD = 0xa0 | ||
+ | CMD_MemWRITE = 0xa1 | ||
+ | CMD_LOAD = 0xa2 | ||
+ | CMD_SAVE = 0xa3 | ||
+ | CMD_MotorREAD = 0xa4 | ||
+ | CMD_MotorWRITE = 0xa5 | ||
+ | CMD_SystemREAD = 0xbb | ||
+ | CMD_SystemWRITE = 0xbc | ||
+ | CMD_ReBoot = 0xbd | ||
+ | CMD_FactoryReset = 0xbe | ||
+ | CMD_SystemINIT = 0xbf | ||
+ | |||
+ | (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8) | ||
+ | (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3) | ||
+ | (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9) | ||
+ | |||
+ | __crc16_lutable = array.array('H') | ||
+ | |||
+ | def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None): | ||
+ | if isinstance(port, serial.Serial): | ||
+ | self.__serial = port | ||
+ | self.__baudrate = port.baudrate | ||
+ | self.__timeout = port.timeout | ||
+ | else: | ||
+ | self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout) | ||
+ | self.__baudrate = self.__serial.baudrate | ||
+ | self.__timeout = self.__serial.timeout | ||
+ | |||
+ | if lock == None: | ||
+ | self.__lock = threading.Lock() | ||
+ | else: | ||
+ | self.__lock = lock | ||
+ | self.__status = 0 | ||
+ | poly = 0x1021 | ||
+ | for i in range(256): | ||
+ | nAccum = i << 8 | ||
+ | for j in range(8): | ||
+ | nAccum = (poly ^ (nAccum << 1) if nAccum & 0x8000 else nAccum << 1) & 0xffff | ||
+ | self.__crc16_lutable.append(nAccum) | ||
+ | |||
+ | def __del__(self): | ||
+ | del self.__crc16_lutable[:] | ||
+ | |||
+ | @property | ||
+ | def lock(self): | ||
+ | return self.__lock | ||
+ | |||
+ | @property | ||
+ | def baudrate(self): | ||
+ | return self.__serial.baudrate | ||
+ | |||
+ | @baudrate.setter | ||
+ | def baudrate(self, baudrate): | ||
+ | self.__baudrate = baudrate | ||
+ | self.__serial.baudrate = baudrate | ||
+ | |||
+ | @property | ||
+ | def timeout(self): | ||
+ | return self.__serial.timeout | ||
+ | |||
+ | @timeout.setter | ||
+ | def timeout(self, timeout): | ||
+ | self.__timeout = timeout | ||
+ | self.__serial.timeout = timeout | ||
+ | |||
+ | def __reconfig(self): | ||
+ | self.__serial.baudrate = self.__baudrate | ||
+ | self.__serial.timeout = self.__timeout | ||
+ | |||
+ | @property | ||
+ | def status(self): | ||
+ | return self.__status | ||
+ | |||
+ | def __crc16(self, data : bytes) --> int: | ||
+ | crc = 0 | ||
+ | for d in data: | ||
+ | crc = (crc << 8) ^ self.__crc16_lutable[((crc ->-> 8) ^ d) & 0xff] | ||
+ | return crc & 0xffff | ||
+ | |||
+ | def TxPacket(self, id : int, cmd : int, opt : int, param : bytes, echo = False) --> (bytes, bool): | ||
+ | self.__reconfig() | ||
+ | if ((id ->= 0 and id <= 239) or id == self.BROADCASTING_ID) and len(param) <= (256 - 8): | ||
+ | txp = bytes([0xfe,0xfe,id,len(param) + 8,cmd,opt]) + bytes(param) | ||
+ | txp += W2Bs(self.__crc16(txp)) | ||
+ | self.__serial.reset_input_buffer() | ||
+ | if echo: print('TX:', txp.hex(':')) | ||
+ | self.__serial.write(txp) | ||
+ | return txp, True | ||
+ | return None, False | ||
+ | |||
+ | def __rx(self, length) --> bytes: | ||
+ | s = self.__serial.read(length) | ||
+ | l = len(s) | ||
+ | if l == length: | ||
+ | return s | ||
+ | else: | ||
+ | r = s | ||
+ | length -= l | ||
+ | while self.__serial.in_waiting -> 0: | ||
+ | s = self.__serial.read(length) | ||
+ | r += s | ||
+ | length -= len(s) | ||
+ | if length <= 0: | ||
+ | break | ||
+ | return r | ||
+ | |||
+ | def RxPacket(self, echo = False) --> (bytes, bool): | ||
+ | rxp = self.__rx(6) | ||
+ | if rxp: | ||
+ | if len(rxp) == 6: | ||
+ | if rxp[3] -> 0: | ||
+ | l = rxp[3] - 6 | ||
+ | if l -> 0: | ||
+ | rxp += self.__rx(l) | ||
+ | if len(rxp) == l + 6 and rxp[0] == 0xfe and rxp[1] == 0xfe and unpack('<H', rxp[-2:])[0] == self.__crc16(rxp[:-2]): | ||
+ | self.__status = rxp[5] | ||
+ | if echo: print('RX:', rxp.hex(':')) | ||
+ | return bytes(rxp), True | ||
+ | return None, False | ||
+ | |||
+ | def MemWRITE(self, id : int, addr : int, data : bytes, echo = False) --> bool: | ||
+ | with self.__lock: | ||
+ | if ((id ->= 0 and id <= 239) or id == self.BROADCASTING_ID) and addr ->= 0 and addr <= 0x4ff: | ||
+ | if self.TxPacket(id, self.CMD_MemWRITE, 1, W2Bs(addr) + data, echo)[1]: | ||
+ | if id != self.BROADCASTING_ID: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return (d[2] == id) and (d[4] == 0x21) | ||
+ | else: | ||
+ | return True | ||
+ | return False | ||
+ | |||
+ | def MemWRITE8(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool: | ||
+ | return self.MemWRITE(id, addr, B2Bs(data), echo) | ||
+ | |||
+ | def MemWRITE16(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool: | ||
+ | return self.MemWRITE(id, addr, W2Bs(data), echo) | ||
+ | |||
+ | def MemWRITE32(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool: | ||
+ | return self.MemWRITE(id, addr, L2Bs(data), echo) | ||
+ | |||
+ | def MemREAD(self, id : int, addr : int, length : int, echo = False) --> bytes: | ||
+ | with self.__lock: | ||
+ | if addr ->= 0 and id <= 239 and addr ->= 0 and addr <= 0x4ff and length -> 0 and length -> 0 and length <= 247: | ||
+ | if self.TxPacket(id, self.CMD_MemREAD, 0, W2Bs(addr) + B2Bs(length), echo)[1]: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | if d[4] == 0x20 and id == d[2] and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0: | ||
+ | return bytes(d[6:-2]) | ||
+ | return None | ||
+ | |||
+ | def MemREAD8(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int: | ||
+ | r = self.MemREAD(id, addr, length, echo) | ||
+ | if r != None: | ||
+ | n = sum(iter_unpack('b' if signed else 'B', r), ()) | ||
+ | return n if length -> 1 else n[0] | ||
+ | return None | ||
+ | |||
+ | def MemREAD16(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int: | ||
+ | r = self.MemREAD(id, addr, 2 * length, echo) | ||
+ | if r != None: | ||
+ | n = sum(iter_unpack('h' if signed else 'H', r), ()) | ||
+ | return n if length -> 1 else n[0] | ||
+ | return None | ||
+ | |||
+ | def MemREAD32(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int: | ||
+ | r = self.MemREAD(id, addr, 4 * length, echo) | ||
+ | if r != None: | ||
+ | n = sum(iter_unpack('i' if signed else 'I', r), ()) | ||
+ | return n if length -> 1 else n[0] | ||
+ | return None | ||
+ | |||
+ | def LOAD(self, id : int, echo = False) --> bool: | ||
+ | with self.__lock: | ||
+ | if self.TxPacket(id, self.CMD_LOAD, 0, (), echo)[1]: | ||
+ | if id != self.BROADCASTING_ID: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return id == d[2] and d[4] == 0x22 | ||
+ | else: | ||
+ | return True; | ||
+ | return False | ||
+ | |||
+ | def SAVE(self, id : int, echo = False) --> bool: | ||
+ | with self.__lock: | ||
+ | if self.TxPacket(id, self.CMD_SAVE, 0, (), echo)[1]: | ||
+ | if id != self.BROADCASTING_ID: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return id == d[2] and d[4] == 0x23 | ||
+ | else: | ||
+ | return True; | ||
+ | return False | ||
+ | |||
+ | def MotorREAD(self, id : int, echo = False) --> tuple: | ||
+ | with self.__lock: | ||
+ | if self.TxPacket(id, self.CMD_MotorREAD, 0, (), echo): | ||
+ | if id != self.BROADCASTING_ID: | ||
+ | dat, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) -> 0 and dat[4] == 0x24 else ()]) | ||
+ | else: | ||
+ | return None | ||
+ | else: | ||
+ | return () | ||
+ | return None | ||
+ | |||
+ | def MotorWRITE(self, id : int, opt : int, dat : (), echo = False) --> tuple: | ||
+ | with self.__lock: | ||
+ | if self.TxPacket(id, self.CMD_MotorWRITE, opt, W2Bs(dat), echo): | ||
+ | if id != self.BROADCASTING_ID: | ||
+ | dat, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) -> 0 and dat[4] == 0x25 else ()]) | ||
+ | else: | ||
+ | return None | ||
+ | else: | ||
+ | return () | ||
+ | return None | ||
+ | |||
+ | def SystemREAD(self, id : int, echo = False) --> tuple: | ||
+ | with self.__lock: | ||
+ | if self.TxPacket(id, self.CMD_SystemREAD, 0, (), echo)[1]: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | if len(d[6:-2]) == 13 and d[4] == 0x3b: | ||
+ | v = tuple(iter_unpack('<IIIB', d[6:-2]))[0] | ||
+ | return tuple(v) | ||
+ | return None | ||
+ | |||
+ | def SystemWRITE(self, id : int, data : (), echo = False) --> bool: | ||
+ | if data[0] ->= 0 and data[0] <= 239 and data[1] ->= 0 and data[1] <= 7 and data[2] ->= 0 and data[2] <= 2 and data[3] ->= 1 and data[3] <= 255: | ||
+ | d = self.SystemREAD(id, echo) | ||
+ | if d: | ||
+ | with self.__lock: | ||
+ | if self.TxPacket(id, self.CMD_SystemWRITE, 0xf, L2Bs(d[0]) + B2Bs(data), echo)[1]: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return id == d[2] and d[4] == 0x3c and (d[5] & (4 + 8 + 0x10 + 0x20 + 0x40)) == 0 | ||
+ | return False | ||
+ | |||
+ | def ReBoot(self, id : int, echo = False) --> bool: | ||
+ | with self.__lock: | ||
+ | if id != self.BROADCASTING_ID: | ||
+ | if self.TxPacket(id, self.CMD_ReBoot, 0, W2Bs(0), echo)[1]: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return d[2] == id and d[4] == 0x3d | ||
+ | return False | ||
+ | |||
+ | def FactoryReset(self, id : int, echo = False) --> bool: | ||
+ | if id != self.BROADCASTING_ID: | ||
+ | d = self.SystemREAD(id, echo) | ||
+ | if d != None: | ||
+ | with self.__lock: | ||
+ | if self.TxPacket(id, self.CMD_FactoryReset, 0, L2Bs(d[0]), echo)[1]: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return d[2] == id and d[4] == 0x3e | ||
+ | return False | ||
+ | |||
+ | def FullScan(self) --> tuple: | ||
+ | orgtimeout = self.timeout | ||
+ | orgbaudrate = self.baudrate | ||
+ | baud=(57600,115200,625000,1000000,1250000,1500000,2000000,3000000) | ||
+ | self.timeout = 0.005 | ||
+ | for b in baud: | ||
+ | self.baudrate = b | ||
+ | for id in range(240): | ||
+ | r = self.MemREAD8(id, 0) | ||
+ | print(f'baud:{b:7} id:{id:3} :', end='find\n' if r else 'none\r') | ||
+ | self.timeout = orgtimeout | ||
+ | self.baudrate = orgbaudrate | ||
+ | |||
+ | ########################################################## | ||
+ | # test code | ||
+ | ########################################################## | ||
+ | if __name__ == "__main__": | ||
+ | import time | ||
+ | from pyPMX import * | ||
+ | |||
+ | try: | ||
+ | pmx = PMXProtocol('\\\\.\\COM12', 3000000) | ||
+ | except: | ||
+ | pass | ||
+ | else: | ||
+ | ID = 0 | ||
+ | |||
+ | # reboot | ||
+ | print('ReBoot=', pmx.ReBoot(ID, echo=True)) | ||
+ | time.sleep(0.5) | ||
+ | ''' | ||
+ | # factory reset | ||
+ | print('FactoryReset=',pmx.FactoryReset(ID)) | ||
+ | time.sleep(0.5) | ||
+ | ''' | ||
+ | # load | ||
+ | print('LOAD=', pmx.LOAD(ID, echo=True)) | ||
+ | # save | ||
+ | print('SAVE=', pmx.SAVE(ID, echo=True)) | ||
+ | ''' | ||
+ | # sys write | ||
+ | print('SysWrite=',pmx.SystemWRITE(ID, (ID, pmx.SYSW_BAUD_115_2k, pmx.SYSW_PARITY_NONE, 20))) | ||
+ | time.sleep(0.5) | ||
+ | ''' | ||
+ | # sys read | ||
+ | r = pmx.SystemREAD(ID) | ||
+ | print('SysRead=', f'${r[0]:08x} ${r[1]:08x} ${r[2]:08x} {r[3]:d} ' if r else 'False') | ||
+ | # dump memory | ||
+ | print('dump') | ||
+ | for addr in range(0, 800, 20): | ||
+ | r = pmx.MemREAD(ID, addr, 20) | ||
+ | print(f' MemREAD({addr}:{20})=', r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') | ||
+ | # gain | ||
+ | r = pmx.MemREAD(ID, 0, 64) | ||
+ | print('gain\n MemREAD(0:64)=', *iter_unpack('<IIIIIIIIIIIIIIII', r) if r else '!!!!!!!!') | ||
+ | # voltage limit | ||
+ | r = pmx.MemREAD(ID, 76, 8) | ||
+ | print('volt lim\n MemREAD(76:8)=', *iter_unpack('<HHHH', r) if r else '!!!!!!!!') | ||
+ | # present value | ||
+ | r = pmx.MemREAD(ID, 300, 24) | ||
+ | print('present val\n MemREAD(300:20)=', *iter_unpack('<hhhhhhhhhHHH', r) if r else '!!!!!!!!') | ||
+ | # error stat | ||
+ | r = pmx.MemREAD(ID, 400, 6) | ||
+ | print('err stat\n MemREAD(0:400:7)=', *iter_unpack('<BBBxH', r) if r else '!!!!!!!!') | ||
+ | |||
+ | print('preparation for pos ctrl') | ||
+ | # ctrl off | ||
+ | |||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) | ||
+ | time.sleep(0.1) | ||
+ | # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time | ||
+ | print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 1)) | ||
+ | print(' MemWRITE8(502)=', pmx.MemWRITE8(ID, 502, 0b11111)) | ||
+ | time.sleep(0.1) | ||
+ | |||
+ | # goal angle | ||
+ | print('start pos ctrl') | ||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) | ||
+ | for ang in tuple(range(0, 32000, 500)) + tuple(range(32000, -32000, -500)) + tuple(range(-32000, 500, 500)): | ||
+ | for n in range(10): | ||
+ | r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (ang,)) | ||
+ | if r != None: | ||
+ | if len(r) -> 0: | ||
+ | print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') | ||
+ | pmx.MemREAD(ID, 400, 6) | ||
+ | else: | ||
+ | print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') | ||
+ | break | ||
+ | time.sleep(0.005) | ||
+ | else: | ||
+ | continue | ||
+ | pass | ||
+ | break | ||
+ | else: | ||
+ | print() | ||
+ | time.sleep(0.1) | ||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) | ||
+ | |||
+ | |||
+ | |||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) | ||
+ | time.sleep(0.1) | ||
+ | # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time | ||
+ | print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 2)) | ||
+ | time.sleep(0.1) | ||
+ | |||
+ | # goal angle | ||
+ | print('start speed ctrl') | ||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) | ||
+ | for spd in tuple(range(0, 2000, 50)) + tuple(range(2000, -2000, -50)) + tuple(range(-2000, 50, 50)): | ||
+ | for n in range(30): | ||
+ | r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (spd,)) | ||
+ | if r != None: | ||
+ | if len(r) -> 0: | ||
+ | print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') | ||
+ | pmx.MemREAD(ID, 400, 6) | ||
+ | else: | ||
+ | print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') | ||
+ | break | ||
+ | time.sleep(0.005) | ||
+ | else: | ||
+ | continue | ||
+ | pass | ||
+ | break | ||
+ | else: | ||
+ | print() | ||
+ | time.sleep(0.1) | ||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) | ||
+ | |||
+ | del pmx | ||
+ | |||
+ | print('fin') | ||
+ | </pre> | ||
+ | }} | ||
+ | こちらでは主に[[ここ>DYNAMIXEL basic tutorial#z879d754]]にあるWindows環境の未リリース版イメージで試しているが、基本的には[[MSYS2>https://www.msys2.org/]]をインストールしておけばその中で動かせる。もちろんLinuxやmacでも動くので、気楽に試してみてはどうだろう。 | ||
+ | |||
+ | ****全検索 [#w7b4050b] | ||
+ | IDやボーレートが分からないと何も始められないので、まずは検索ができると何かと便利。~ | ||
+ | 検索はライブラリに入れておいた機能を呼び出すだけなので、次のコードで簡単に利用できる。 | ||
+ | #html{{ | ||
+ | <pre class="brush: python;" title="scan.py"> | ||
+ | import os, sys | ||
+ | from pyPMX import PMXProtocol | ||
+ | |||
+ | dev = '\\\\.\\COM12' if len(sys.argv) == 1 else sys.argv[1] | ||
+ | if not os.path.exists(dev): | ||
+ | dev = '' | ||
+ | |||
+ | if dev != '': | ||
+ | try: | ||
+ | pmx = PMXProtocol(dev, 57600) | ||
+ | pmx.FullScan() | ||
+ | except: | ||
+ | print(' ERR:There is some problem.') | ||
+ | pass | ||
+ | sys.stdout.write('\033[2K\033[1G') | ||
+ | else: | ||
+ | print(' usage: scan [line]') | ||
+ | </pre> | ||
+ | }} | ||
+ | |||
+ | ****ID変更 [#e74d62c1] | ||
+ | ネットワーク上のPMXは各々個別のIDが割り振られている必要があり、同一のものが存在している事を許容していない。~ | ||
+ | 次のコードは任意のIDへ変更するものだが、少しでも疑義が生じたらIDの変更を行わないようにしている。 | ||
+ | #html{{ | ||
+ | <pre class="brush: python;" title="changeid.py"> | ||
+ | #!/usr/bin/python3 | ||
+ | import os, sys | ||
+ | from pyPMX import PMXProtocol | ||
+ | |||
+ | baudlist = { 57600 : 0, 115200 : 1, 625000 : 2, 1000000 : 3, 1250000 : 4, 1500000 : 5, 2000000 : 6, 3000000 : 7 } | ||
+ | |||
+ | if len(sys.argv) == 4: | ||
+ | arg = sys.argv[1:] | ||
+ | dev = '\\\\.\\COM12' | ||
+ | elif len(sys.argv) == 5: | ||
+ | arg = sys.argv[2:] | ||
+ | dev = sys.argv[1] | ||
+ | else: | ||
+ | arg = [] | ||
+ | dev = '' | ||
+ | |||
+ | if not os.path.exists(dev): | ||
+ | dev = '' | ||
+ | |||
+ | if dev != '': | ||
+ | try: | ||
+ | baud = int(arg[0]) | ||
+ | baudind = baudlist[baud] | ||
+ | previd = int(arg[1]) | ||
+ | newid = int(arg[2]) | ||
+ | pmx = PMXProtocol(dev, baud) | ||
+ | except: | ||
+ | print(' ERR:There is some problem.') | ||
+ | else: | ||
+ | if newid >= 0 and newid <= 239: | ||
+ | if pmx.MemREAD8(newid, 0) == None: | ||
+ | s = (newid, baudind, pmx.SYSW_PARITY_NONE, 20) | ||
+ | if pmx.MotorWRITE(previd, pmx.MOTW_OPT_FREE, ()) != None: | ||
+ | if pmx.SystemWRITE(previd, s): | ||
+ | print(' OK') | ||
+ | else: | ||
+ | print(' NG') | ||
+ | else: | ||
+ | print(f' ERR: Device with ID:{previd} not found with BAUDRATE:{baud}') | ||
+ | else: | ||
+ | print(' ERR:Competing.') | ||
+ | else: | ||
+ | print(' ERR:There is some problem.') | ||
+ | else: | ||
+ | print(' usage: changeid [line] <baudrate> <prev id> <new id>') | ||
+ | </pre> | ||
+ | }} | ||
+ | |||
+ | ****ボーレート変更 [#r468c92e] | ||
+ | 次のコードは任意のボーレートへ変更するものだが、少しでも疑義が生じたらボーレートの変更を行わないようにしている。 | ||
+ | #html{{ | ||
+ | <pre class="brush: python;" title="changebaud.py"> | ||
+ | #!/usr/bin/python3 | ||
+ | import os, sys | ||
+ | from pyPMX import PMXProtocol | ||
+ | |||
+ | baudlist = { 57600 : 0, 115200 : 1, 625000 : 2, 1000000 : 3, 1250000 : 4, 1500000 : 5, 2000000 : 6, 3000000 : 7 } | ||
+ | |||
+ | if len(sys.argv) == 4: | ||
+ | arg = sys.argv[1:] | ||
+ | dev = '\\\\.\\COM12' | ||
+ | elif len(sys.argv) == 5: | ||
+ | arg = sys.argv[2:] | ||
+ | dev = sys.argv[1] | ||
+ | else: | ||
+ | arg = [] | ||
+ | dev = '' | ||
+ | |||
+ | if not os.path.exists(dev): | ||
+ | dev = '' | ||
+ | |||
+ | if dev != '': | ||
+ | try: | ||
+ | prevbaud = int(arg[0]) | ||
+ | prevind = baudlist[prevbaud] | ||
+ | id = int(arg[1]) | ||
+ | newbaud = int(arg[2]) | ||
+ | newind = baudlist[newbaud] | ||
+ | pmx = PMXProtocol(dev, newbaud) | ||
+ | except: | ||
+ | print(' ERR: There is some problem.') | ||
+ | else: | ||
+ | if pmx.MemREAD8(id, 0) == None: | ||
+ | pmx.baudrate = prevbaud | ||
+ | if pmx.MotorWRITE(id, pmx.MOTW_OPT_FREE, ()) != None: | ||
+ | s = (id, newind, pmx.SYSW_PARITY_NONE, 20) | ||
+ | if pmx.SystemWRITE(id, s): | ||
+ | print(' OK') | ||
+ | else: | ||
+ | print(' NG') | ||
+ | else: | ||
+ | print(f' ERR: Device with ID:{id} not found with BAUDRATE:{prevbaud}') | ||
+ | else: | ||
+ | print(' ERR: Competing.') | ||
+ | else: | ||
+ | print(' usage: changebaud [line] <baudrate> <id> <new baudrate>') | ||
+ | </pre> | ||
+ | }} | ||
+ | |||
+ | ***MicroPython [#u62caf90] | ||
+ | [[PicoSHIELD]]+[[Raspberry Pi Pico>https://micropython.org/download/RPI_PICO/]]を対象に[[Python>#mdb668e2]]のコードを作ってみた。 | ||
+ | #html{{ | ||
+ | <pre class="brush: python;" title="pyPMX.py"> | ||
+ | # pyPMX.py for MicroPython(RP2040) | ||
+ | # SPDX-License-Identifier: MIT | ||
+ | # SPDX-FileCopyrightText: (C) 2024 mukyokyo | ||
+ | |||
+ | from machine import UART, Pin | ||
+ | import time, array | ||
+ | from collections import namedtuple | ||
+ | from struct import pack, unpack | ||
+ | |||
+ | ########################################################## | ||
+ | # Functionalized the part of converting int to bytes. | ||
+ | # If specified as a tuple, it is converted to bytes at once. | ||
+ | ########################################################## | ||
+ | @micropython.native | ||
+ | def B2Bs(d) -> bytes: | ||
+ | if isinstance(d, list) or isinstance(d, tuple): | ||
+ | return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) | ||
+ | else: | ||
+ | return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) | ||
+ | |||
+ | @micropython.native | ||
+ | def W2Bs(d) -> bytes: | ||
+ | if isinstance(d, list) or isinstance(d, tuple): | ||
+ | return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) | ||
+ | else: | ||
+ | return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) | ||
+ | |||
+ | @micropython.native | ||
+ | def L2Bs(d) -> bytes: | ||
+ | if isinstance(d, list) or isinstance(d, tuple): | ||
+ | return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) | ||
+ | else: | ||
+ | return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) | ||
+ | |||
+ | ########################################################## | ||
+ | # API for Kondo PMX | ||
+ | ########################################################## | ||
+ | class PMXProtocol: | ||
+ | BROADCASTING_ID = const(0xff) | ||
+ | CMD_MemREAD = const(0xa0) | ||
+ | CMD_MemWRITE = const(0xa1) | ||
+ | CMD_LOAD = const(0xa2) | ||
+ | CMD_SAVE = const(0xa3) | ||
+ | CMD_MotorREAD = const(0xa4) | ||
+ | CMD_MotorWRITE = const(0xa5) | ||
+ | CMD_SystemREAD = const(0xbb) | ||
+ | CMD_SystemWRITE = const(0xbc) | ||
+ | CMD_ReBoot = const(0xbd) | ||
+ | CMD_FactoryReset = const(0xbe) | ||
+ | CMD_SystemINIT = const(0xbf) | ||
+ | |||
+ | (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8) | ||
+ | (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3) | ||
+ | (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9) | ||
+ | |||
+ | __crc16_lutable = array.array('H',( | ||
+ | 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, | ||
+ | 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, | ||
+ | 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, | ||
+ | 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, | ||
+ | 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, | ||
+ | 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, | ||
+ | 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, | ||
+ | 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, | ||
+ | 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, | ||
+ | 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, | ||
+ | 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, | ||
+ | 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, | ||
+ | 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, | ||
+ | 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, | ||
+ | 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x01ce0,0xcc1, | ||
+ | 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 | ||
+ | )) | ||
+ | |||
+ | def __init__(self, baudrate = 115200, timeout_ms = 50): | ||
+ | """ | ||
+ | Initalize | ||
+ | |||
+ | parameters | ||
+ | ------------- | ||
+ | baudrate : int | ||
+ | Serial baudrate[bps] | ||
+ | timeout_ms : float | ||
+ | Read timeout[s] | ||
+ | """ | ||
+ | self.__Error = 0 | ||
+ | self.__baudrate = baudrate | ||
+ | self.__timeout = timeout_ms | ||
+ | self.__status = 0 | ||
+ | self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms) | ||
+ | |||
+ | @property | ||
+ | def baudrate(self): | ||
+ | return self.__baudrate | ||
+ | |||
+ | @baudrate.setter | ||
+ | def baudrate(self, baudrate): | ||
+ | self.__baudrate = baudrate | ||
+ | self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) | ||
+ | |||
+ | @property | ||
+ | def timeout(self): | ||
+ | return self.__timeout | ||
+ | |||
+ | @timeout.setter | ||
+ | def timeout(self, timeout_ms): | ||
+ | self.__timeout = timeout_ms | ||
+ | self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) | ||
+ | |||
+ | @property | ||
+ | def status(self): | ||
+ | return self.__status | ||
+ | |||
+ | @micropython.viper | ||
+ | def __crc16(self, dat : ptr8, l:int) -> int: | ||
+ | crc:int = 0 | ||
+ | pt = ptr16(self.__crc16_lutable) | ||
+ | for i in range(l): | ||
+ | crc = (crc << 8) ^ pt[((crc >> 8) ^ dat[i]) & 0xff] | ||
+ | return crc & 0xffff | ||
+ | |||
+ | @micropython.native | ||
+ | def TxPacket(self, id : int, cmd : int, opt : int, param : bytes, echo = False) -> (bytes, bool): | ||
+ | if ((id >= 0 and id <= 239) or id == BROADCASTING_ID) and len(param) <= (256 - 8): | ||
+ | txp = bytes([0xfe,0xfe,id,len(param) + 8,cmd,opt]) + bytes(param) | ||
+ | txp += W2Bs(self.__crc16(txp, len(txp))) | ||
+ | _ = self.__serial.read(self.__serial.any()) | ||
+ | if echo: print('TX:', txp.hex(':')) | ||
+ | self.__serial.write(txp) | ||
+ | return txp, True | ||
+ | return None, False | ||
+ | |||
+ | @micropython.native | ||
+ | def __rx(self, length) -> bytes: | ||
+ | s = self.__serial.read(length) | ||
+ | if s: | ||
+ | l = len(s) | ||
+ | if l == length: | ||
+ | return s | ||
+ | else: | ||
+ | r = s | ||
+ | length -= l | ||
+ | while self.__serial.any() > 0: | ||
+ | s = self.__serial.read(length) | ||
+ | r += s | ||
+ | length -= len(s) | ||
+ | if length == 0: | ||
+ | break | ||
+ | return r | ||
+ | else: | ||
+ | return bytes() | ||
+ | |||
+ | @micropython.native | ||
+ | def RxPacket(self, echo = False) -> (bytes, bool): | ||
+ | self.__serial.flush() | ||
+ | rxp = self.__rx(6) | ||
+ | if rxp: | ||
+ | if len(rxp) == 6: | ||
+ | l = rxp[3] - 6 | ||
+ | rxp += self.__rx(l) | ||
+ | if len(rxp) == l + 6 and rxp[0] == 0xfe and rxp[1] == 0xfe and unpack('<H', rxp[-2:])[0] == self.__crc16(rxp[:-2], len(rxp[:-2])): | ||
+ | self.__status = rxp[5] | ||
+ | if echo: print('RX:', rxp.hex(':')) | ||
+ | return bytes(rxp), True | ||
+ | return None, False | ||
+ | |||
+ | @micropython.native | ||
+ | def MemWRITE(self, id : int, addr : int, data : bytes, echo = False) -> bool: | ||
+ | if ((id >= 0 and id <= 239) or id == BROADCASTING_ID) and addr >= 0 and addr <= 0x4ff: | ||
+ | if self.TxPacket(id, CMD_MemWRITE, 1, W2Bs(addr) + data, echo)[1]: | ||
+ | if id != BROADCASTING_ID: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return (d[2] == id) and (d[4] == 0x21) | ||
+ | else: | ||
+ | return True | ||
+ | return False | ||
+ | |||
+ | @micropython.native | ||
+ | def MemWRITE8(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool: | ||
+ | return self.MemWRITE(id, addr, B2Bs(data), echo) | ||
+ | |||
+ | @micropython.native | ||
+ | def MemWRITE16(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool: | ||
+ | return self.MemWRITE(id, addr, W2Bs(data), echo) | ||
+ | |||
+ | @micropython.native | ||
+ | def MemWRITE32(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool: | ||
+ | return self.MemWRITE(id, addr, L2Bs(data), echo) | ||
+ | |||
+ | @micropython.native | ||
+ | def MemREAD(self, id : int, addr : int, length : int, echo = False) -> bytes: | ||
+ | if addr >= 0 and id <= 239 and addr >= 0 and addr <= 0x4ff and length > 0 and length > 0 and length <= 247: | ||
+ | if self.TxPacket(id, CMD_MemREAD, 0, W2Bs(addr) + B2Bs(length), echo)[1]: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | if d[4] == 0x20 and id == d[2] and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0: | ||
+ | return bytes(d[6:-2]) | ||
+ | return None | ||
+ | |||
+ | @micropython.native | ||
+ | def MemREAD8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: | ||
+ | r = self.MemREAD(id, addr, length, echo) | ||
+ | if r != None: | ||
+ | n = sum(iter_unpack('b' if signed else 'B', r), ()) | ||
+ | return n if length > 1 else n[0] | ||
+ | return None | ||
+ | |||
+ | @micropython.native | ||
+ | def MemREAD16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: | ||
+ | r = self.MemREAD(id, addr, 2 * length, echo) | ||
+ | if r != None: | ||
+ | n = sum(iter_unpack('h' if signed else 'H', r), ()) | ||
+ | return n if length > 1 else n[0] | ||
+ | return None | ||
+ | |||
+ | @micropython.native | ||
+ | def MemREAD32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: | ||
+ | r = self.MemREAD(id, addr, 4 * length, echo) | ||
+ | if r != None: | ||
+ | n = sum(iter_unpack('i' if signed else 'I', r), ()) | ||
+ | return n if length > 1 else n[0] | ||
+ | return None | ||
+ | |||
+ | @micropython.native | ||
+ | def LOAD(self, id : int, echo = False) -> bool: | ||
+ | if self.TxPacket(id, CMD_LOAD, 0, (), echo)[1]: | ||
+ | if id != BROADCASTING_ID: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return id == d[2] and d[4] == 0x22 | ||
+ | else: | ||
+ | return True; | ||
+ | return False | ||
+ | |||
+ | @micropython.native | ||
+ | def SAVE(self, id : int, echo = False) -> bool: | ||
+ | if self.TxPacket(id, CMD_SAVE, 0, (), echo)[1]: | ||
+ | if id != BROADCASTING_ID: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return id == d[2] and d[4] == 0x23 | ||
+ | else: | ||
+ | return True; | ||
+ | return False | ||
+ | |||
+ | @micropython.native | ||
+ | def MotorREAD(self, id : int, echo = False) -> tuple: | ||
+ | if self.TxPacket(id, CMD_MotorREAD, 0, (), echo): | ||
+ | if id != BROADCASTING_ID: | ||
+ | dat, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) > 0 and dat[4] == 0x24 else ()]) | ||
+ | else: | ||
+ | return None | ||
+ | else: | ||
+ | return () | ||
+ | return None | ||
+ | |||
+ | @micropython.native | ||
+ | def MotorWRITE(self, id : int, opt : int, dat : (), echo = False) -> tuple: | ||
+ | if self.TxPacket(id, CMD_MotorWRITE, opt, W2Bs(dat), echo): | ||
+ | if id != BROADCASTING_ID: | ||
+ | dat, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return tuple([dat[6], unpack('<'+'h'*(len(dat[7:-2])>>1), dat[7:-2]) if len(dat[7:-2]) > 0 and dat[4] == 0x25 else ()]) | ||
+ | else: | ||
+ | return None | ||
+ | else: | ||
+ | return () | ||
+ | return None | ||
+ | |||
+ | @micropython.native | ||
+ | def SystemREAD(self, id : int, echo = False) -> tuple: | ||
+ | if self.TxPacket(id, CMD_SystemREAD, 0, (), echo)[1]: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | if len(d[6:-2]) == 13 and d[4] == 0x3b: | ||
+ | return unpack('<IIIB', d[6:-2]) | ||
+ | return None | ||
+ | |||
+ | @micropython.native | ||
+ | def SystemWRITE(self, id : int, data : (), echo = False) -> bool: | ||
+ | if data[0] >= 0 and data[0] <= 239 and data[1] >= 0 and data[1] <= 7 and data[2] >= 0 and data[2] <= 2 and data[3] >= 1 and data[3] <= 255: | ||
+ | d = self.SystemREAD(id, echo) | ||
+ | if d: | ||
+ | with self.__mutex: | ||
+ | if self.TxPacket(id, CMD_SystemWRITE, 0xf, L2Bs(d[0]) + B2Bs(data), echo)[1]: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return id == d[2] and d[4] == 0x3c and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0 | ||
+ | return False | ||
+ | |||
+ | @micropython.native | ||
+ | def ReBoot(self, id : int, echo = False) -> bool: | ||
+ | if id != BROADCASTING_ID: | ||
+ | if self.TxPacket(id, CMD_ReBoot, 0, W2Bs(0), echo)[1]: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return d[2] == id and d[4] == 0x3d | ||
+ | return False | ||
+ | |||
+ | @micropython.native | ||
+ | def FactoryReset(self, id : int, echo = False) -> bool: | ||
+ | if id != BROADCASTING_ID: | ||
+ | d = self.SystemREAD(id, echo) | ||
+ | if d != None: | ||
+ | if self.TxPacket(id, CMD_FactoryReset, 0, L2Bs(d[0]), echo)[1]: | ||
+ | d, r = self.RxPacket(echo) | ||
+ | if r: | ||
+ | return d[2] == id and d[4] == 0x3e | ||
+ | return False | ||
+ | |||
+ | @micropython.native | ||
+ | def FullScan(self) -> tuple: | ||
+ | orgtimeout = self.__timeout | ||
+ | orgbaudrate = self.__baudrate | ||
+ | #baud=(3000000,2000000,1500000,1250000,1000000,625000,115200,57600) | ||
+ | baud=(57600,115200,625000,1000000,1250000,1500000,2000000,3000000) | ||
+ | self.__serial.timeout = 0.001 | ||
+ | for b in baud: | ||
+ | for id in range(240): | ||
+ | self.__serial.baudrate = b | ||
+ | r = self.MemREAD8(id, 0) | ||
+ | print(f'baud:{b:7} id:{id:3} :', end='find\n' if r else 'none\r') | ||
+ | self.__serial.timeout = orgtimeout | ||
+ | self.__baudrate = orgbaudrate | ||
+ | |||
+ | ########################################################## | ||
+ | # test code | ||
+ | ########################################################## | ||
+ | if __name__ == "__main__": | ||
+ | import time | ||
+ | from pyPMX import * | ||
+ | |||
+ | machine.freq(250000000) # set the CPU frequency to 250 MHz | ||
+ | |||
+ | try: | ||
+ | pmx = PMXProtocol() | ||
+ | except: | ||
+ | pass | ||
+ | else: | ||
+ | ID = 0 | ||
+ | |||
+ | # reboot | ||
+ | print('ReBoot=', pmx.ReBoot(ID, echo=True)) | ||
+ | time.sleep(0.5) | ||
+ | ''' | ||
+ | # factory reset | ||
+ | print('FactoryReset=',pmx.FactoryReset(ID)) | ||
+ | time.sleep(0.5) | ||
+ | ''' | ||
+ | # load | ||
+ | print('LOAD=', pmx.LOAD(ID, echo=True)) | ||
+ | # save | ||
+ | print('SAVE=', pmx.SAVE(ID, echo=True)) | ||
+ | ''' | ||
+ | # sys write | ||
+ | print('SysWrite=',pmx.SystemWRITE(ID, (ID, pmx.SYSW_BAUD_115_2k, pmx.SYSW_PARITY_NONE, 20))) | ||
+ | time.sleep(0.5) | ||
+ | ''' | ||
+ | # sys read | ||
+ | r = pmx.SystemREAD(ID, echo=True) | ||
+ | print('SysRead=', f'${r[0]:08x} ${r[1]:08x} ${r[2]:08x} {r[3]:d} ' if r else 'False') | ||
+ | # dump memory | ||
+ | print('dump') | ||
+ | for addr in range(0, 800, 20): | ||
+ | r = pmx.MemREAD(ID, addr, 20) | ||
+ | print(f' MemREAD({addr}:{20})=', r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') | ||
+ | # gain | ||
+ | r = pmx.MemREAD(ID, 0, 64) | ||
+ | print('gain\n MemREAD(0:64)=', unpack('<IIIIIIIIIIIIIIII', r) if r else '!!!!!!!!') | ||
+ | # voltage limit | ||
+ | r = pmx.MemREAD(ID, 76, 8) | ||
+ | print('volt lim\n MemREAD(76:8)=', unpack('<HHHH', r) if r else '!!!!!!!!') | ||
+ | # present value | ||
+ | r = pmx.MemREAD(ID, 300, 24) | ||
+ | print('present val\n MemREAD(300:20)=', unpack('<hhhhhhhhhHHH', r) if r else '!!!!!!!!') | ||
+ | # error stat | ||
+ | r = pmx.MemREAD(ID, 400, 6) | ||
+ | print('err stat\n MemREAD(0:400:7)=', unpack('<BBBxH', r) if r else '!!!!!!!!') | ||
+ | |||
+ | print('preparation for pos ctrl') | ||
+ | # ctrl off | ||
+ | |||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) | ||
+ | time.sleep(0.1) | ||
+ | # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time | ||
+ | print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 1)) | ||
+ | print(' MemWRITE8(502)=', pmx.MemWRITE8(ID, 502, 0b11111)) | ||
+ | time.sleep(0.1) | ||
+ | |||
+ | # goal angle | ||
+ | print('start pos ctrl') | ||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) | ||
+ | for ang in tuple(range(0, 32000, 500)) + tuple(range(32000, -32000, -500)) + tuple(range(-32000, 500, 500)): | ||
+ | for n in range(10): | ||
+ | r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (ang,)) | ||
+ | if r != None: | ||
+ | if len(r) > 0: | ||
+ | print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') | ||
+ | pmx.MemREAD(ID, 400, 6) | ||
+ | else: | ||
+ | pass | ||
+ | print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') | ||
+ | break | ||
+ | time.sleep(0.005) | ||
+ | else: | ||
+ | continue | ||
+ | pass | ||
+ | break | ||
+ | else: | ||
+ | print() | ||
+ | time.sleep(0.1) | ||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) | ||
+ | |||
+ | |||
+ | |||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) | ||
+ | time.sleep(0.1) | ||
+ | # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time | ||
+ | print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 2)) | ||
+ | time.sleep(0.1) | ||
+ | |||
+ | # goal angle | ||
+ | print('start speed ctrl') | ||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) | ||
+ | for spd in tuple(range(0, 2000, 50)) + tuple(range(2000, -2000, -50)) + tuple(range(-2000, 50, 50)): | ||
+ | for n in range(10): | ||
+ | r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (spd,)) | ||
+ | if r != None: | ||
+ | if len(r) > 0: | ||
+ | print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') | ||
+ | pmx.MemREAD(ID, 400, 6) | ||
+ | else: | ||
+ | pass | ||
+ | print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') | ||
+ | break | ||
+ | time.sleep(0.005) | ||
+ | else: | ||
+ | continue | ||
+ | pass | ||
+ | break | ||
+ | else: | ||
+ | print() | ||
+ | time.sleep(0.1) | ||
+ | print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) | ||
+ | |||
+ | del pmx | ||
+ | |||
+ | print('fin') | ||
+ | </pre> | ||
+ | }} |