|
一覧
検索
最新
ページへ戻る
履歴
KONDO PMX
のソース
knowledge
:
KONDO PMX
のソース
差分
を表示
KONDO PMX
へ行く。
TITLE:Tasting KO PMX **PMXシリーズ [#wb9f3fea] #ref(pmx_test.png) 動かすだけならここだけおさえておけば良しという事で、公式サイトのリンクをブックマーク。~ [[PMXシリーズ:https://kondo-robot.com/product-category/servomotor/pmx]]~ [[PMXサーボシリーズオンラインマニュアル:https://www.kondokagaku.jp/wp_mn/archives/1144]] ***下準備 [#ud298d0b] B3Mシリーズと大差ないと思いきや、コネクタが全く違う。さらに標準のケーブルがぶっとい。[[DXHUB]]につなぎたかったので、[[コレ>https://kondo-robot.com/product/02257]]を使ってケーブルを自作。 #ref(EH_DF1B.png) ひとまず電源は手持ちのACアダプタを使う事にしたが、結果的に市販の物では全く容量足りず。トラブルを避けるには、ガッツリ電流取れる電池か安定化電源用意した方が良いだろう。 聞くところによるとファームウェアは日々更新されているらしく、必ず新しいバージョンを適用して使うべしとの事。~ 確かにそのままでは後に紹介するスクリプトがエラーになる事があったので、有無も言わさず更新すべきかと。 ***雑感 [#dce2fa5b] 上澄みだけさらった感触を書いてみる。 -KRS時代には無かった単純なPIDによる角度・速度・電流の制御 -角度・速度・電流を組み合わせたカスケード制御が可能 -分解能の高い非接触角度センサを装備し、個々に高精度なキャリブレーションがなされている -一般的なUARTの通信プロトコル(8N1) -シンプルな通信プロトコルで、大抵の機能へのアクセスはメモリマップ上に配置された情報の読み書きで済む -B3M程の発熱は感じず -メモリマップの読み書きだけでも運転可能 -パケットの応答時間がどこぞやと異なり、かなり一定 -フィールドでのファームウェアのパッチ当てが可能 DXLやB3Mシリーズを使ったことがあれば運用は簡単。PICのイメージが色濃いKRSとは一線を画すので、それに慣れ切っていると難しい。~ B3Mからの仕切り直し感を感じるので、種々のリクエストを吸収してもらえる事を大いに期待し、初期リリース版に触れて感じた点も書いておく。 -3つの制御モード全てを組み合わせたカスケード制御は不可 -ゲインにフィードフォワードが無い~ 足回りなどで常時オフセットが生じる速度制御をメインで使う時には結構有用なのだが -パケット上アドレスは16bit幅だがデータ長が8bit指定なため、全メモリの一括読み出しは不可~ メモリマップ全てを仮想化するには分割して読むしかなく -複数軸を同期させる方法の記述が無いが、いずれB3Mのマルチみたいなものが追加されるのか? -エラーフラグを読み出さないといけないのが少々煩雑 -そこそこ電源容量が必要なためACアダプタによる運用は注意が必要~ 実際に[[秋月のACアダプタ>https://akizukidenshi.com/catalog/g/gM-06961/]]でも運用しきれなかった -今後はBLDCなのかな。。。 **テストコード [#a586a4ca] Pythonばかり使っていると悪い頭が益々悪くなりそうだし鬼のように遅いが、多勢に無勢。あれやこれやと説明せずとも大抵の事はヌルッとイケるので、ここでも動作確認用に使う事にした。 ***Python[#mdb668e2] [[ここら辺>https://kondo-robot.com/faq/pmx-library-for-python]]にある事は既知だが、それをあえて見ずに基本的なコマンドを関数にしたクラスを作った(Python3.9以降で動作する事を確認)。~ ホントは[[DXL用のスクリプト>DYNAMIXEL basic tutorial#k91ab321]]を作ったついでにあつらえたのだが、メモリマップ上に配置された情報を読み書きするという概念は同じなので、あっさりと実現。~ なおbytesを直接的に扱うのは避けたかったので、一部を除きできるだけint型とリストやタプルで記述できるようにした。使い方は末尾のデモコードを参考にすれば大半は理解できるかと思う。~ なお連続で通信する場合は2バイト以上の間隔を置けと注意事項に記されているのだが、ここではそれを一切考慮していない。将来奇跡的にホスト側の処理スピードが速くなり、前後のパケットが近接するようになったら考える事にする。 [[ソースへの直リンク>https://github.com/mukyokyo/pyPMX/raw/main/pyPMX.py]] #html{{ <style type="text/css"> .syntaxhighlighter { overflow-y: auto !important; overflow-x: auto !important; max-height: 900px; -webkit-text-size-adjust: 100%; } </style> <pre class="brush: python;" title="pyPMX.py"> #!/usr/bin/python3 # -*- coding: utf-8 -*- # # pyPMX.py # SPDX-License-Identifier: MIT # SPDX-FileCopyrightText: (C) 2024 mukyokyo import serial, threading, array, struct from typing import Union from collections import namedtuple from struct import pack, unpack, iter_unpack ########################################################## # Functionalized the part of converting int to bytes. # If specified as a tuple, it is converted to bytes at once. ########################################################## def B2Bs(d) --> bytes: if isinstance(d, list) or isinstance(d, tuple): return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) else: return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) def W2Bs(d) --> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) else: return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) def L2Bs(d) --> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) else: return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) ########################################################## # API for Kondo PMX ########################################################## class PMXProtocol: BROADCASTING_ID = 0xff CMD_MemREAD = 0xa0 CMD_MemWRITE = 0xa1 CMD_LOAD = 0xa2 CMD_SAVE = 0xa3 CMD_MotorREAD = 0xa4 CMD_MotorWRITE = 0xa5 CMD_SystemREAD = 0xbb CMD_SystemWRITE = 0xbc CMD_ReBoot = 0xbd CMD_FactoryReset = 0xbe CMD_SystemINIT = 0xbf (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8) (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3) (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9) __crc16_lutable = array.array('H') def __init__(self, port : Union[serial.Serial, str], baudrate = 57600, timeout = 0.05, lock = None): if isinstance(port, serial.Serial): self.__serial = port self.__baudrate = port.baudrate self.__timeout = port.timeout else: self.__serial = serial.Serial(port, baudrate = baudrate, timeout = timeout) self.__baudrate = self.__serial.baudrate self.__timeout = self.__serial.timeout if lock == None: self.__lock = threading.Lock() else: self.__lock = lock self.__status = 0 poly = 0x1021 for i in range(256): nAccum = i << 8 for j in range(8): nAccum = (poly ^ (nAccum << 1) if nAccum & 0x8000 else nAccum << 1) & 0xffff self.__crc16_lutable.append(nAccum) def __del__(self): del self.__crc16_lutable[:] @property def lock(self): return self.__lock @property def baudrate(self): return self.__serial.baudrate @baudrate.setter def baudrate(self, baudrate): self.__baudrate = baudrate self.__serial.baudrate = baudrate @property def timeout(self): return self.__serial.timeout @timeout.setter def timeout(self, timeout): self.__timeout = timeout self.__serial.timeout = timeout def __reconfig(self): self.__serial.baudrate = self.__baudrate self.__serial.timeout = self.__timeout @property def status(self): return self.__status def __crc16(self, data : bytes) --> int: crc = 0 for d in data: crc = (crc << 8) ^ self.__crc16_lutable[((crc ->-> 8) ^ d) & 0xff] return crc & 0xffff def TxPacket(self, id : int, cmd : int, opt : int, param : bytes, echo = False) --> (bytes, bool): self.__reconfig() if ((id ->= 0 and id <= 239) or id == self.BROADCASTING_ID) and len(param) <= (256 - 8): txp = bytes([0xfe,0xfe,id,len(param) + 8,cmd,opt]) + bytes(param) txp += W2Bs(self.__crc16(txp)) self.__serial.reset_input_buffer() if echo: print('TX:', txp.hex(':')) self.__serial.write(txp) return txp, True return None, False def __rx(self, length) --> bytes: s = self.__serial.read(length) l = len(s) if l == length: return s else: r = s length -= l while self.__serial.in_waiting -> 0: s = self.__serial.read(length) r += s length -= len(s) if length <= 0: break return r def RxPacket(self, echo = False) --> (bytes, bool): rxp = self.__rx(6) if rxp: if len(rxp) == 6: if rxp[3] -> 0: l = rxp[3] - 6 if l -> 0: rxp += self.__rx(l) if len(rxp) == l + 6 and rxp[0] == 0xfe and rxp[1] == 0xfe and unpack('<H', rxp[-2:])[0] == self.__crc16(rxp[:-2]): self.__status = rxp[5] if echo: print('RX:', rxp.hex(':')) return bytes(rxp), True return None, False def MemWRITE(self, id : int, addr : int, data : bytes, echo = False) --> bool: with self.__lock: if ((id ->= 0 and id <= 239) or id == self.BROADCASTING_ID) and addr ->= 0 and addr <= 0x4ff: if self.TxPacket(id, self.CMD_MemWRITE, 1, W2Bs(addr) + data, echo)[1]: if id != self.BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return (d[2] == id) and (d[4] == 0x21) else: return True return False def MemWRITE8(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool: return self.MemWRITE(id, addr, B2Bs(data), echo) def MemWRITE16(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool: return self.MemWRITE(id, addr, W2Bs(data), echo) def MemWRITE32(self, id : int, addr : int, data : Union[int, list, tuple], echo = False) --> bool: return self.MemWRITE(id, addr, L2Bs(data), echo) def MemREAD(self, id : int, addr : int, length : int, echo = False) --> bytes: with self.__lock: if addr ->= 0 and id <= 239 and addr ->= 0 and addr <= 0x4ff and length -> 0 and length -> 0 and length <= 247: if self.TxPacket(id, self.CMD_MemREAD, 0, W2Bs(addr) + B2Bs(length), echo)[1]: d, r = self.RxPacket(echo) if r: if d[4] == 0x20 and id == d[2] and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0: return bytes(d[6:-2]) return None def MemREAD8(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int: r = self.MemREAD(id, addr, length, echo) if r != None: n = sum(iter_unpack('b' if signed else 'B', r), ()) return n if length -> 1 else n[0] return None def MemREAD16(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int: r = self.MemREAD(id, addr, 2 * length, echo) if r != None: n = sum(iter_unpack('h' if signed else 'H', r), ()) return n if length -> 1 else n[0] return None def MemREAD32(self, id : int, addr : int, length = 1, signed = False, echo = False) --> int: r = self.MemREAD(id, addr, 4 * length, echo) if r != None: n = sum(iter_unpack('i' if signed else 'I', r), ()) return n if length -> 1 else n[0] return None def LOAD(self, id : int, echo = False) --> bool: with self.__lock: if self.TxPacket(id, self.CMD_LOAD, 0, (), echo)[1]: if id != self.BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x22 else: return True; return False def SAVE(self, id : int, echo = False) --> bool: with self.__lock: if self.TxPacket(id, self.CMD_SAVE, 0, (), echo)[1]: if id != self.BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x23 else: return True; return False def MotorREAD(self, id : int, echo = False) --> tuple: with self.__lock: if self.TxPacket(id, self.CMD_MotorREAD, 0, (), echo): if id != self.BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) -> 0 and dat[4] == 0x24 else ()]) else: return None else: return () return None def MotorWRITE(self, id : int, opt : int, dat : (), echo = False) --> tuple: with self.__lock: if self.TxPacket(id, self.CMD_MotorWRITE, opt, W2Bs(dat), echo): if id != self.BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) -> 0 and dat[4] == 0x25 else ()]) else: return None else: return () return None def SystemREAD(self, id : int, echo = False) --> tuple: with self.__lock: if self.TxPacket(id, self.CMD_SystemREAD, 0, (), echo)[1]: d, r = self.RxPacket(echo) if r: if len(d[6:-2]) == 13 and d[4] == 0x3b: v = tuple(iter_unpack('<IIIB', d[6:-2]))[0] return tuple(v) return None def SystemWRITE(self, id : int, data : (), echo = False) --> bool: if data[0] ->= 0 and data[0] <= 239 and data[1] ->= 0 and data[1] <= 7 and data[2] ->= 0 and data[2] <= 2 and data[3] ->= 1 and data[3] <= 255: d = self.SystemREAD(id, echo) if d: with self.__lock: if self.TxPacket(id, self.CMD_SystemWRITE, 0xf, L2Bs(d[0]) + B2Bs(data), echo)[1]: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x3c and (d[5] & (4 + 8 + 0x10 + 0x20 + 0x40)) == 0 return False def ReBoot(self, id : int, echo = False) --> bool: with self.__lock: if id != self.BROADCASTING_ID: if self.TxPacket(id, self.CMD_ReBoot, 0, W2Bs(0), echo)[1]: d, r = self.RxPacket(echo) if r: return d[2] == id and d[4] == 0x3d return False def FactoryReset(self, id : int, echo = False) --> bool: if id != self.BROADCASTING_ID: d = self.SystemREAD(id, echo) if d != None: with self.__lock: if self.TxPacket(id, self.CMD_FactoryReset, 0, L2Bs(d[0]), echo)[1]: d, r = self.RxPacket(echo) if r: return d[2] == id and d[4] == 0x3e return False def FullScan(self) --> tuple: orgtimeout = self.timeout orgbaudrate = self.baudrate baud=(57600,115200,625000,1000000,1250000,1500000,2000000,3000000) self.timeout = 0.005 for b in baud: self.baudrate = b for id in range(240): r = self.MemREAD8(id, 0) print(f'baud:{b:7} id:{id:3} :', end='find\n' if r else 'none\r') self.timeout = orgtimeout self.baudrate = orgbaudrate ########################################################## # test code ########################################################## if __name__ == "__main__": import time from pyPMX import * try: pmx = PMXProtocol('\\\\.\\COM12', 3000000) except: pass else: ID = 0 # reboot print('ReBoot=', pmx.ReBoot(ID, echo=True)) time.sleep(0.5) ''' # factory reset print('FactoryReset=',pmx.FactoryReset(ID)) time.sleep(0.5) ''' # load print('LOAD=', pmx.LOAD(ID, echo=True)) # save print('SAVE=', pmx.SAVE(ID, echo=True)) ''' # sys write print('SysWrite=',pmx.SystemWRITE(ID, (ID, pmx.SYSW_BAUD_115_2k, pmx.SYSW_PARITY_NONE, 20))) time.sleep(0.5) ''' # sys read r = pmx.SystemREAD(ID) print('SysRead=', f'${r[0]:08x} ${r[1]:08x} ${r[2]:08x} {r[3]:d} ' if r else 'False') # dump memory print('dump') for addr in range(0, 800, 20): r = pmx.MemREAD(ID, addr, 20) print(f' MemREAD({addr}:{20})=', r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') # gain r = pmx.MemREAD(ID, 0, 64) print('gain\n MemREAD(0:64)=', *iter_unpack('<IIIIIIIIIIIIIIII', r) if r else '!!!!!!!!') # voltage limit r = pmx.MemREAD(ID, 76, 8) print('volt lim\n MemREAD(76:8)=', *iter_unpack('<HHHH', r) if r else '!!!!!!!!') # present value r = pmx.MemREAD(ID, 300, 24) print('present val\n MemREAD(300:20)=', *iter_unpack('<hhhhhhhhhHHH', r) if r else '!!!!!!!!') # error stat r = pmx.MemREAD(ID, 400, 6) print('err stat\n MemREAD(0:400:7)=', *iter_unpack('<BBBxH', r) if r else '!!!!!!!!') print('preparation for pos ctrl') # ctrl off print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) time.sleep(0.1) # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 1)) print(' MemWRITE8(502)=', pmx.MemWRITE8(ID, 502, 0b11111)) time.sleep(0.1) # goal angle print('start pos ctrl') print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) for ang in tuple(range(0, 32000, 500)) + tuple(range(32000, -32000, -500)) + tuple(range(-32000, 500, 500)): for n in range(10): r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (ang,)) if r != None: if len(r) -> 0: print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') pmx.MemREAD(ID, 400, 6) else: print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') break time.sleep(0.005) else: continue pass break else: print() time.sleep(0.1) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) time.sleep(0.1) # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 2)) time.sleep(0.1) # goal angle print('start speed ctrl') print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) for spd in tuple(range(0, 2000, 50)) + tuple(range(2000, -2000, -50)) + tuple(range(-2000, 50, 50)): for n in range(30): r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (spd,)) if r != None: if len(r) -> 0: print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') pmx.MemREAD(ID, 400, 6) else: print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') break time.sleep(0.005) else: continue pass break else: print() time.sleep(0.1) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) del pmx print('fin') </pre> }} こちらでは主に[[ここ>DYNAMIXEL basic tutorial#z879d754]]にあるWindows環境の未リリース版イメージで試しているが、基本的には[[MSYS2>https://www.msys2.org/]]をインストールしておけばその中で動かせる。もちろんLinuxやmacでも動くので、気楽に試してみてはどうだろう。 ****全検索 [#w7b4050b] IDやボーレートが分からないと何も始められないので、まずは検索ができると何かと便利。~ 検索はライブラリに入れておいた機能を呼び出すだけなので、次のコードで簡単に利用できる。 #html{{ <pre class="brush: python;" title="scan.py"> #!/usr/bin/python3 import os, sys from pyPMX import PMXProtocol dev = '\\\\.\\COM12' if len(sys.argv) == 1 else sys.argv[1] if not os.path.exists(dev): dev = '' if dev != '': try: pmx = PMXProtocol(dev, 57600) pmx.FullScan() except: print(' ERR:There is some problem.') pass sys.stdout.write('\033[2K\033[1G') else: print(' usage: scan [line]') </pre> }} ****ID変更 [#e74d62c1] ネットワーク上のPMXは各々個別のIDが割り振られている必要があり、同一のものが存在している事を許容していない。~ 次のコードは任意のIDへ変更するものだが、少しでも疑義が生じたらIDの変更を行わないようにしている。 #html{{ <pre class="brush: python;" title="changeid.py"> #!/usr/bin/python3 import os, sys from pyPMX import PMXProtocol baudlist = { 57600 : 0, 115200 : 1, 625000 : 2, 1000000 : 3, 1250000 : 4, 1500000 : 5, 2000000 : 6, 3000000 : 7 } if len(sys.argv) == 4: arg = sys.argv[1:] dev = '\\\\.\\COM12' elif len(sys.argv) == 5: arg = sys.argv[2:] dev = sys.argv[1] else: arg = [] dev = '' if not os.path.exists(dev): dev = '' if dev != '': try: baud = int(arg[0]) baudind = baudlist[baud] previd = int(arg[1]) newid = int(arg[2]) pmx = PMXProtocol(dev, baud) except: print(' ERR:There is some problem.') else: if newid >= 0 and newid <= 239: if pmx.MemREAD8(newid, 0) == None: s = (newid, baudind, pmx.SYSW_PARITY_NONE, 20) if pmx.MotorWRITE(previd, pmx.MOTW_OPT_FREE, ()) != None: if pmx.SystemWRITE(previd, s): print(' OK') else: print(' NG') else: print(f' ERR: Device with ID:{previd} not found with BAUDRATE:{baud}') else: print(' ERR:Competing.') else: print(' ERR:There is some problem.') else: print(' usage: changeid [line] <baudrate> <prev id> <new id>') </pre> }} ****ボーレート変更 [#r468c92e] 次のコードは任意のボーレートへ変更するものだが、少しでも疑義が生じたらボーレートの変更を行わないようにしている。 #html{{ <pre class="brush: python;" title="changebaud.py"> #!/usr/bin/python3 import os, sys from pyPMX import PMXProtocol baudlist = { 57600 : 0, 115200 : 1, 625000 : 2, 1000000 : 3, 1250000 : 4, 1500000 : 5, 2000000 : 6, 3000000 : 7 } if len(sys.argv) == 4: arg = sys.argv[1:] dev = '\\\\.\\COM12' elif len(sys.argv) == 5: arg = sys.argv[2:] dev = sys.argv[1] else: arg = [] dev = '' if not os.path.exists(dev): dev = '' if dev != '': try: prevbaud = int(arg[0]) prevind = baudlist[prevbaud] id = int(arg[1]) newbaud = int(arg[2]) newind = baudlist[newbaud] pmx = PMXProtocol(dev, newbaud) except: print(' ERR: There is some problem.') else: if pmx.MemREAD8(id, 0) == None: pmx.baudrate = prevbaud if pmx.MotorWRITE(id, pmx.MOTW_OPT_FREE, ()) != None: s = (id, newind, pmx.SYSW_PARITY_NONE, 20) if pmx.SystemWRITE(id, s): print(' OK') else: print(' NG') else: print(f' ERR: Device with ID:{id} not found with BAUDRATE:{prevbaud}') else: print(' ERR: Competing.') else: print(' usage: changebaud [line] <baudrate> <id> <new baudrate>') </pre> }} ***MicroPython [#u62caf90] [[PicoSHIELD]]+[[Raspberry Pi Pico>https://micropython.org/download/RPI_PICO/]]を対象に[[Python>#mdb668e2]]のコードを作ってみた。 #html{{ <pre class="brush: python;" title="pyPMX.py"> # pyPMX.py for MicroPython(RP2040) # SPDX-License-Identifier: MIT # SPDX-FileCopyrightText: (C) 2024 mukyokyo from machine import UART, Pin import time, array from collections import namedtuple from struct import pack, unpack ########################################################## # Functionalized the part of converting int to bytes. # If specified as a tuple, it is converted to bytes at once. ########################################################## @micropython.native def B2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return bytes(((d & 0x7f) | 0x80) if d < 0 else d for d in d) else: return bytes(pack('<B', ((d & 0x7f) | 0x80) if d < 0 else d)) @micropython.native def W2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<H',d) for d in [((d & 0x7fff) | 0x8000) if d < 0 else d for d in d]]) else: return bytes(pack('<H', ((d & 0x7fff) | 0x8000) if d < 0 else d)) @micropython.native def L2Bs(d) -> bytes: if isinstance(d, list) or isinstance(d, tuple): return b''.join([pack('<I',d) for d in [((d & 0x7fffffff) | 0x80000000) if d < 0 else d for d in d]]) else: return bytes(pack('<I', ((d & 0x7fffffff) | 0x80000000) if d < 0 else d)) ########################################################## # API for Kondo PMX ########################################################## class PMXProtocol: BROADCASTING_ID = const(0xff) CMD_MemREAD = const(0xa0) CMD_MemWRITE = const(0xa1) CMD_LOAD = const(0xa2) CMD_SAVE = const(0xa3) CMD_MotorREAD = const(0xa4) CMD_MotorWRITE = const(0xa5) CMD_SystemREAD = const(0xbb) CMD_SystemWRITE = const(0xbc) CMD_ReBoot = const(0xbd) CMD_FactoryReset = const(0xbe) CMD_SystemINIT = const(0xbf) (SYSW_BAUD_57600, SYSW_BAUD_115_2k, SYSW_BAUD_625k, SYSW_BAUD_1M, SYSW_BAUD_1_25M, SYSW_BAUD_1_5M, SYSW_BAUD_2M, SYSW_BAUD_3M) = range(8) (SYSW_PARITY_NONE, SYSW_PARITY_ODD, SYSW_PARITY_EVEN) = range(3) (MOTW_OPT_NONE, MOTW_OPT_TORQUEON, MOTW_OPT_FREE, _, MOTW_OPT_BRAKE, _, _, _, MOTW_OPT_HOLD) = range(9) __crc16_lutable = array.array('H',( 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x01ce0,0xcc1, 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 )) def __init__(self, baudrate = 115200, timeout_ms = 50): """ Initalize parameters ------------- baudrate : int Serial baudrate[bps] timeout_ms : float Read timeout[s] """ self.__Error = 0 self.__baudrate = baudrate self.__timeout = timeout_ms self.__status = 0 self.__serial = UART(1, baudrate, tx = Pin(4), rx = Pin(5), txbuf = 500, rxbuf = 500, timeout = timeout_ms) @property def baudrate(self): return self.__baudrate @baudrate.setter def baudrate(self, baudrate): self.__baudrate = baudrate self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) @property def timeout(self): return self.__timeout @timeout.setter def timeout(self, timeout_ms): self.__timeout = timeout_ms self.__serial = UART(1, baudrate = self.__baudrate, tx = Pin(4), rx = Pin(5), rxbuf = 500, timeout = self.__timeout) @property def status(self): return self.__status @micropython.viper def __crc16(self, dat : ptr8, l:int) -> int: crc:int = 0 pt = ptr16(self.__crc16_lutable) for i in range(l): crc = (crc << 8) ^ pt[((crc >> 8) ^ dat[i]) & 0xff] return crc & 0xffff @micropython.native def TxPacket(self, id : int, cmd : int, opt : int, param : bytes, echo = False) -> (bytes, bool): if ((id >= 0 and id <= 239) or id == BROADCASTING_ID) and len(param) <= (256 - 8): txp = bytes([0xfe,0xfe,id,len(param) + 8,cmd,opt]) + bytes(param) txp += W2Bs(self.__crc16(txp, len(txp))) _ = self.__serial.read(self.__serial.any()) if echo: print('TX:', txp.hex(':')) self.__serial.write(txp) return txp, True return None, False @micropython.native def __rx(self, length) -> bytes: s = self.__serial.read(length) if s: l = len(s) if l == length: return s else: r = s length -= l while self.__serial.any() > 0: s = self.__serial.read(length) r += s length -= len(s) if length == 0: break return r else: return bytes() @micropython.native def RxPacket(self, echo = False) -> (bytes, bool): self.__serial.flush() rxp = self.__rx(6) if rxp: if len(rxp) == 6: l = rxp[3] - 6 rxp += self.__rx(l) if len(rxp) == l + 6 and rxp[0] == 0xfe and rxp[1] == 0xfe and unpack('<H', rxp[-2:])[0] == self.__crc16(rxp[:-2], len(rxp[:-2])): self.__status = rxp[5] if echo: print('RX:', rxp.hex(':')) return bytes(rxp), True return None, False @micropython.native def MemWRITE(self, id : int, addr : int, data : bytes, echo = False) -> bool: if ((id >= 0 and id <= 239) or id == BROADCASTING_ID) and addr >= 0 and addr <= 0x4ff: if self.TxPacket(id, CMD_MemWRITE, 1, W2Bs(addr) + data, echo)[1]: if id != BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return (d[2] == id) and (d[4] == 0x21) else: return True return False @micropython.native def MemWRITE8(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool: return self.MemWRITE(id, addr, B2Bs(data), echo) @micropython.native def MemWRITE16(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool: return self.MemWRITE(id, addr, W2Bs(data), echo) @micropython.native def MemWRITE32(self, id : int, addr : int, data : int | list | tuple, echo = False) -> bool: return self.MemWRITE(id, addr, L2Bs(data), echo) @micropython.native def MemREAD(self, id : int, addr : int, length : int, echo = False) -> bytes: if addr >= 0 and id <= 239 and addr >= 0 and addr <= 0x4ff and length > 0 and length > 0 and length <= 247: if self.TxPacket(id, CMD_MemREAD, 0, W2Bs(addr) + B2Bs(length), echo)[1]: d, r = self.RxPacket(echo) if r: if d[4] == 0x20 and id == d[2] and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0: return bytes(d[6:-2]) return None @micropython.native def MemREAD8(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.MemREAD(id, addr, length, echo) if r != None: n = sum(iter_unpack('b' if signed else 'B', r), ()) return n if length > 1 else n[0] return None @micropython.native def MemREAD16(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.MemREAD(id, addr, 2 * length, echo) if r != None: n = sum(iter_unpack('h' if signed else 'H', r), ()) return n if length > 1 else n[0] return None @micropython.native def MemREAD32(self, id : int, addr : int, length = 1, signed = False, echo = False) -> int: r = self.MemREAD(id, addr, 4 * length, echo) if r != None: n = sum(iter_unpack('i' if signed else 'I', r), ()) return n if length > 1 else n[0] return None @micropython.native def LOAD(self, id : int, echo = False) -> bool: if self.TxPacket(id, CMD_LOAD, 0, (), echo)[1]: if id != BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x22 else: return True; return False @micropython.native def SAVE(self, id : int, echo = False) -> bool: if self.TxPacket(id, CMD_SAVE, 0, (), echo)[1]: if id != BROADCASTING_ID: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x23 else: return True; return False @micropython.native def MotorREAD(self, id : int, echo = False) -> tuple: if self.TxPacket(id, CMD_MotorREAD, 0, (), echo): if id != BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return tuple([dat[6], tuple(n[0] for n in iter_unpack('<h', dat[7:-2])) if len(dat[7:-2]) > 0 and dat[4] == 0x24 else ()]) else: return None else: return () return None @micropython.native def MotorWRITE(self, id : int, opt : int, dat : (), echo = False) -> tuple: if self.TxPacket(id, CMD_MotorWRITE, opt, W2Bs(dat), echo): if id != BROADCASTING_ID: dat, r = self.RxPacket(echo) if r: return tuple([dat[6], unpack('<'+'h'*(len(dat[7:-2])>>1), dat[7:-2]) if len(dat[7:-2]) > 0 and dat[4] == 0x25 else ()]) else: return None else: return () return None @micropython.native def SystemREAD(self, id : int, echo = False) -> tuple: if self.TxPacket(id, CMD_SystemREAD, 0, (), echo)[1]: d, r = self.RxPacket(echo) if r: if len(d[6:-2]) == 13 and d[4] == 0x3b: return unpack('<IIIB', d[6:-2]) return None @micropython.native def SystemWRITE(self, id : int, data : (), echo = False) -> bool: if data[0] >= 0 and data[0] <= 239 and data[1] >= 0 and data[1] <= 7 and data[2] >= 0 and data[2] <= 2 and data[3] >= 1 and data[3] <= 255: d = self.SystemREAD(id, echo) if d: with self.__mutex: if self.TxPacket(id, CMD_SystemWRITE, 0xf, L2Bs(d[0]) + B2Bs(data), echo)[1]: d, r = self.RxPacket(echo) if r: return id == d[2] and d[4] == 0x3c and (d[5] & (4 + 8 + 0x10 + 0x40)) == 0 return False @micropython.native def ReBoot(self, id : int, echo = False) -> bool: if id != BROADCASTING_ID: if self.TxPacket(id, CMD_ReBoot, 0, W2Bs(0), echo)[1]: d, r = self.RxPacket(echo) if r: return d[2] == id and d[4] == 0x3d return False @micropython.native def FactoryReset(self, id : int, echo = False) -> bool: if id != BROADCASTING_ID: d = self.SystemREAD(id, echo) if d != None: if self.TxPacket(id, CMD_FactoryReset, 0, L2Bs(d[0]), echo)[1]: d, r = self.RxPacket(echo) if r: return d[2] == id and d[4] == 0x3e return False @micropython.native def FullScan(self) -> tuple: orgtimeout = self.__timeout orgbaudrate = self.__baudrate #baud=(3000000,2000000,1500000,1250000,1000000,625000,115200,57600) baud=(57600,115200,625000,1000000,1250000,1500000,2000000,3000000) self.__serial.timeout = 0.001 for b in baud: for id in range(240): self.__serial.baudrate = b r = self.MemREAD8(id, 0) print(f'baud:{b:7} id:{id:3} :', end='find\n' if r else 'none\r') self.__serial.timeout = orgtimeout self.__baudrate = orgbaudrate ########################################################## # test code ########################################################## if __name__ == "__main__": import time from pyPMX import * machine.freq(250000000) # set the CPU frequency to 250 MHz try: pmx = PMXProtocol() except: pass else: ID = 0 # reboot print('ReBoot=', pmx.ReBoot(ID, echo=True)) time.sleep(0.5) ''' # factory reset print('FactoryReset=',pmx.FactoryReset(ID)) time.sleep(0.5) ''' # load print('LOAD=', pmx.LOAD(ID, echo=True)) # save print('SAVE=', pmx.SAVE(ID, echo=True)) ''' # sys write print('SysWrite=',pmx.SystemWRITE(ID, (ID, pmx.SYSW_BAUD_115_2k, pmx.SYSW_PARITY_NONE, 20))) time.sleep(0.5) ''' # sys read r = pmx.SystemREAD(ID, echo=True) print('SysRead=', f'${r[0]:08x} ${r[1]:08x} ${r[2]:08x} {r[3]:d} ' if r else 'False') # dump memory print('dump') for addr in range(0, 800, 20): r = pmx.MemREAD(ID, addr, 20) print(f' MemREAD({addr}:{20})=', r.hex(':') if r else '!!!!!!!!!!!!!!!!!!!!!!!!') # gain r = pmx.MemREAD(ID, 0, 64) print('gain\n MemREAD(0:64)=', unpack('<IIIIIIIIIIIIIIII', r) if r else '!!!!!!!!') # voltage limit r = pmx.MemREAD(ID, 76, 8) print('volt lim\n MemREAD(76:8)=', unpack('<HHHH', r) if r else '!!!!!!!!') # present value r = pmx.MemREAD(ID, 300, 24) print('present val\n MemREAD(300:20)=', unpack('<hhhhhhhhhHHH', r) if r else '!!!!!!!!') # error stat r = pmx.MemREAD(ID, 400, 6) print('err stat\n MemREAD(0:400:7)=', unpack('<BBBxH', r) if r else '!!!!!!!!') print('preparation for pos ctrl') # ctrl off print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) time.sleep(0.1) # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 1)) print(' MemWRITE8(502)=', pmx.MemWRITE8(ID, 502, 0b11111)) time.sleep(0.1) # goal angle print('start pos ctrl') print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) for ang in tuple(range(0, 32000, 500)) + tuple(range(32000, -32000, -500)) + tuple(range(-32000, 500, 500)): for n in range(10): r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (ang,)) if r != None: if len(r) > 0: print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') pmx.MemREAD(ID, 400, 6) else: pass print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') break time.sleep(0.005) else: continue pass break else: print() time.sleep(0.1) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) time.sleep(0.1) # 1:pos,2:speed,4:cur,8:torq,16:pwm,32:time print(' MemWRITE8(501)=', pmx.MemWRITE8(ID, 501, 2)) time.sleep(0.1) # goal angle print('start speed ctrl') print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_TORQUEON, ())) for spd in tuple(range(0, 2000, 50)) + tuple(range(2000, -2000, -50)) + tuple(range(-2000, 50, 50)): for n in range(10): r = pmx.MotorWRITE(ID, pmx.MOTW_OPT_NONE, (spd,)) if r != None: if len(r) > 0: print(f' MotorWRITE= {r[0]},', ''.join('{:6},'.format(k) for k in r[1]), 'stat=', pmx.status, end = '\r') pmx.MemREAD(ID, 400, 6) else: pass print(' MotorWRITE= !!!!!!!!!!!!!!!!!!!!!!!!') break time.sleep(0.005) else: continue pass break else: print() time.sleep(0.1) print(' MotorWRITE=', pmx.MotorWRITE(ID, pmx.MOTW_OPT_FREE, ())) del pmx print('fin') </pre> }}
KONDO PMX のバックアップソース(No. All)
現: 2024-04-13 (土) 22:53:38
takaboo
Copyright© 1989,2024 BestTechnology Co.,Ltd.
メニュー
トップ
ニュース
お知らせ
新商品情報
更新情報
マニュアル類
技術情報
チュートリアル
FAQ
お問合せ
技術的なお問合せ
その他お問合せ
お見積り
ブログ
会社案内
ショッピング
ご利用案内
検索
カテゴリ 一覧
ロボットキット
Dynamixelシリーズ
マイコンボード
相撲ロボット
センサ
通信機器
お客様ページ