13: 2023-09-07 (木) 18:49:45 takaboo | 14: 2024-01-08 (月) 13:54:25 takaboo | ||
---|---|---|---|
Line 992: | Line 992: | ||
***他の言語 [#d5eca280] | ***他の言語 [#d5eca280] | ||
+ | ****Pythonその1 [#t51150f3] | ||
[[DX2LIBのページ>DX2LIB#u3ff03d9]]に紹介されている通りだが、その中でも流行の[[Python>https://www.python.jp/]]で遊んでみても面白いかと思う。~ | [[DX2LIBのページ>DX2LIB#u3ff03d9]]に紹介されている通りだが、その中でも流行の[[Python>https://www.python.jp/]]で遊んでみても面白いかと思う。~ | ||
GCC Developer LiteにはPythonが含まれており、コンパイルオプションで「Python(32bit)」や「Python(64bit)」を選ぶ事でライブラリに同梱されるPythonのサンプルソースコードが利用できる。しかし追加APIを利用したものばかりで、コントロールテーブルを読み書きする基本的なAPIは使われていない。 | GCC Developer LiteにはPythonが含まれており、コンパイルオプションで「Python(32bit)」や「Python(64bit)」を選ぶ事でライブラリに同梱されるPythonのサンプルソースコードが利用できる。しかし追加APIを利用したものばかりで、コントロールテーブルを読み書きする基本的なAPIは使われていない。 | ||
Line 1190: | Line 1191: | ||
}} | }} | ||
追加APIに無い機能は基本APIを使ってコントロールテーブルへ直接アクセスするしかないため、参考にしてもらえればと思う。 | 追加APIに無い機能は基本APIを使ってコントロールテーブルへ直接アクセスするしかないため、参考にしてもらえればと思う。 | ||
+ | |||
+ | ****Pythonその2 [#k91ab321] | ||
+ | 全てPythonで書いてしまえば、Cで作られたライブラリをリンクしたり、ctypesの有象無象に苛まれる事も無くスッキリする。~ | ||
+ | 突如気が向いたので、インストラクションパケットの生成と送信、ステータスパケットの受信、インストラクションの種類によってパラメータ部分を結合・分解するクラスを作ってみた。前記の内容とは一線を画すので、互換性は一切考えていない。 | ||
+ | [[pySerial>https://github.com/pyserial/pyserial]]は必須。~ | ||
+ | 単体で一通りのAPIを試せるようにデモコードを付けておいた。ターゲットはXM/XH/XDシリーズ、IDは1~4(少なくともID=1は必須)、ボーレートは3Mbpsを想定、デバッグ用にpsutilが必要といったところだ。 | ||
+ | #html{{ | ||
+ | <pre class="brush: python;" title="dx2lib.py"> | ||
+ | #!/usr/bin/python3 | ||
+ | # -*- coding: utf-8 -*- | ||
+ | # | ||
+ | # dx2lib.py | ||
+ | # (C) 2024 BestTechnology | ||
+ | |||
+ | import serial, threading, array, struct | ||
+ | from collections import namedtuple | ||
+ | from struct import pack, unpack | ||
+ | |||
class dx2lib:
    """
    Small pure-Python master for DYNAMIXEL Protocol 2.0 style packets on top
    of pySerial.

    The class builds and sends instruction packets, receives and validates
    status packets, and offers helpers that assemble/split the parameter
    area for the individual instructions.

    The high-level methods (Read*/Write*/Ping/Reset/Reboot) serialize bus
    access with an internal ``threading.Lock``.  ``TxPacket`` and
    ``RxPacket`` do NOT take that lock themselves; when calling them
    directly, hold ``mutex`` around the Tx/Rx pair.
    """

    # ID addressed to every device on the bus.
    BROADCASTING_ID = 0xfe

    # Instruction codes placed in the instruction field of a packet.
    INST_PING = 0x01
    INST_READ = 0x02
    INST_WRITE = 0x03
    INST_REG_WRITE = 0x04
    INST_ACTION = 0x05
    INST_RESET = 0x06
    INST_REBOOT = 0x08
    INST_SYS_WRITE = 0x0d
    INST_CLEAR = 0x10
    INST_CONTROL_TABLE_BACKUP = 0x20
    INST_STATUS = 0x55
    INST_SYNC_READ = 0x82
    INST_SYNC_WRITE = 0x83
    INST_SYNC_REG_WRITE = 0x85
    # Historical misspelling, kept so existing callers keep working.
    INST_SYNG_REG_WRITE = INST_SYNC_REG_WRITE
    INST_FAST_SYNC_READ = 0x8a
    INST_BULK_READ = 0x92
    INST_BULK_WRITE = 0x93
    INST_FAST_BULK_READ = 0x9a

    # Per-device records consumed by the sync/bulk helpers.
    TSyncW = namedtuple("TSyncW", ("id", "data"))
    TBulkW = namedtuple("TBulkW", ("id", "addr", "data"))
    TBulkR = namedtuple("TBulkR", ("id", "addr", "length"))

    def _build_crc16_table():
        """Build the 256-entry lookup table for CRC-16 with polynomial 0x8005 (MSB first)."""
        poly = 0x8005
        table = array.array('H')
        for i in range(256):
            data = i << 8
            accum = 0
            for _ in range(8):
                if (data ^ accum) & 0x8000:
                    accum = ((accum << 1) ^ poly) & 0xffff
                else:
                    accum = (accum << 1) & 0xffff
                data <<= 1
            table.append(accum)
        return table

    # Built exactly once at class creation.  Previously every __init__ call
    # appended another 256 entries to this shared class-level array.
    __crc16_lutable = _build_crc16_table()
    del _build_crc16_table

    def __init__(self, port : str, baudrate : int = 57600, timeout : float = 0.05):
        """
        Initialize

        parameters
        -------------
        port : str
            Device name
        baudrate : int
            Serial baudrate[bps]
        timeout : float
            Read timeout[s]
        """
        self.__serial = serial.Serial(port, baudrate, timeout = timeout)
        self.__baudrate = baudrate
        self.__timeout = timeout
        self.__mutex = threading.Lock()

    @property
    def mutex(self):
        """Lock used by the high-level methods; hold it around raw TxPacket/RxPacket pairs."""
        return self.__mutex

    @property
    def baudrate(self):
        """Current baudrate[bps] of the underlying serial port."""
        return self.__serial.baudrate

    @baudrate.setter
    def baudrate(self, baudrate):
        self.__baudrate = baudrate
        self.__serial.baudrate = baudrate

    @property
    def timeout(self):
        """Current read timeout[s] of the underlying serial port."""
        return self.__serial.timeout

    @timeout.setter
    def timeout(self, timeout):
        self.__timeout = timeout
        self.__serial.timeout = timeout

    def __crc16(self, data : bytes) -> int:
        """Return the 16-bit CRC (polynomial 0x8005, initial value 0) of ``data``."""
        table = self.__crc16_lutable
        crc = 0
        for d in data:
            # Mask on every step so the accumulator stays a 16-bit value.
            crc = ((crc << 8) & 0xffff) ^ table[((crc >> 8) ^ d) & 0xff]
        return crc

    def TxPacket(self, id : int, inst : int, param : bytes, echo = False) -> tuple:
        """
        Sending packets

        parameters
        -------------
        id : int
            Target ID (0..252 or BROADCASTING_ID)
        inst : int
            Instruction command
        param : bytes-like or iterable of int
            Packet parameters (byte stuffing is applied here)
        echo : bool
            Print the sent packet as hex when True

        Returns
        -------
        bytes | None
            Packets sent
        bool
            Success or failure
        """
        if not ((id == self.BROADCASTING_ID) or (0 <= id <= 252)):
            return None, False
        # Byte stuffing: a header-like sequence inside the parameters gets an extra 0xfd.
        stuffed = bytearray(param).replace(b'\xff\xff\xfd', b'\xff\xff\xfd\xfd')
        # Length field counts instruction(1) + parameters + CRC(2).
        instp = bytearray((0xff, 0xff, 0xfd, 0x00, id)) + pack('<H', len(stuffed) + 3) + bytes((inst,)) + stuffed
        instp += pack('<H', self.__crc16(instp))
        # Drop stale input so the next RxPacket starts at the reply to this packet.
        self.__serial.reset_input_buffer()
        if echo: print('TX:', instp.hex(','))
        self.__serial.write(instp)
        return bytes(instp), True

    def RxPacket(self, echo = False) -> tuple:
        """
        Receiving packets

        Reads one status packet, checks header, length and CRC, and removes
        byte stuffing from the parameter area.  The returned packet still
        contains the header (bytes 0-8) and the trailing two CRC bytes.

        parameters
        -------------
        echo : bool
            Print the received packet as hex when True

        Returns
        -------
        bytes | None
            Packets received
        bool
            Success or failure
        """
        # header(4) + id(1) + length(2) + instruction(1) + error(1)
        statp = self.__serial.read(9)
        if len(statp) != 9 or statp[0:4] != b'\xff\xff\xfd\x00':
            return None, False
        # Instruction and error bytes are already read; the rest is parameters + CRC.
        remain = unpack('<H', statp[5:7])[0] - 2
        if remain < 2:
            return None, False
        statp += self.__serial.read(remain)
        if len(statp) != remain + 9:
            return None, False
        if unpack('<H', statp[-2:])[0] != self.__crc16(statp[:-2]):
            return None, False
        statp = statp[0:9] + statp[9:].replace(b'\xff\xff\xfd\xfd', b'\xff\xff\xfd')
        if echo: print('RX:', statp.hex(','))
        return bytes(statp), True

    def WriteBlockData(self, id : int, addr : int, data : bytes, echo = False) -> bool:
        """
        Write instruction

        parameters
        -------------
        id : int
            Target ID
        addr : int
            Target item address
        data : bytes
            Data to be written

        Returns
        -------
        result : bool
            Success or failure.  For BROADCASTING_ID no status packet is
            awaited and the result only reflects that the packet was sent.
        """
        with self.__mutex:
            if not ((id == self.BROADCASTING_ID) or (0 <= id <= 252)):
                return False
            _, r = self.TxPacket(id, self.INST_WRITE, pack('<H', addr) + data, echo)
            if not r:
                return False
            if id == self.BROADCASTING_ID:
                # Devices do not answer a broadcast write; waiting would only time out.
                return True
            d, r = self.RxPacket(echo)
            return r and (d[4] == id)

    def WriteByteData(self, id : int, addr : int, data : int, echo = False) -> bool:
        """Write one unsigned byte to ``addr``; see WriteBlockData."""
        return self.WriteBlockData(id, addr, bytes((data,)), echo)

    def WriteWordData(self, id : int, addr : int, data : int, echo = False) -> bool:
        """Write one unsigned 16-bit little-endian value to ``addr``; see WriteBlockData."""
        return self.WriteBlockData(id, addr, pack('<H', data), echo)

    def WriteLongData(self, id : int, addr : int, data : int, echo = False) -> bool:
        """Write one signed 32-bit little-endian value to ``addr``; see WriteBlockData."""
        return self.WriteBlockData(id, addr, pack('<l', data), echo)

    def ReadBlockData(self, id : int, addr : int, length : int, echo = False) -> tuple:
        """
        Read instruction

        parameters
        -------------
        id : int
            Target ID
        addr : int
            Target item address
        length : int
            Number of bytes to read

        Returns
        -------
        bytes | None
            Data read
        bool
            Success or failure
        """
        with self.__mutex:
            if (0 <= id <= 252) and (length > 0):
                _, r = self.TxPacket(id, self.INST_READ, pack('<HH', addr, length), echo)
                if r:
                    dat, r = self.RxPacket(echo)
                    if r:
                        return bytes(dat[9:-2]), (id == dat[4])
            return None, False

    def ReadByteData(self, id : int, addr : int, echo = False) -> int:
        """Read one unsigned byte from ``addr``; returns None on failure."""
        dat, r = self.ReadBlockData(id, addr, 1, echo)
        # A status packet without payload must not raise.
        if r and len(dat) >= 1:
            return int(dat[0])
        return None

    def ReadWordData(self, id : int, addr : int, echo = False) -> int:
        """Read one unsigned 16-bit little-endian value from ``addr``; returns None on failure."""
        dat, r = self.ReadBlockData(id, addr, 2, echo)
        if r and len(dat) >= 2:
            return unpack('<H', dat[0:2])[0]
        return None

    def ReadLongData(self, id : int, addr : int, echo = False) -> int:
        """Read one signed 32-bit little-endian value from ``addr``; returns None on failure."""
        dat, r = self.ReadBlockData(id, addr, 4, echo)
        if r and len(dat) >= 4:
            return unpack('<l', dat[0:4])[0]
        return None

    def WriteSyncData(self, addr : int, length : int, id_datas : (TSyncW), echo = False) -> bool:
        """
        Sync Write instruction

        parameters
        -------------
        addr : int
            Target item address
        length : int
            Number of bytes written per device
        id_datas : Tuple[TSyncW]
            (id, data) record for every target device

        Returns
        -------
        result : bool
            True when the packet was sent (no status packet is awaited)
        """
        with self.__mutex:
            param = pack('<HH', addr, length) + b''.join(bytes((d.id,)) + d.data for d in id_datas)
            _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_WRITE, param, echo)
            return r

    def ReadSyncData(self, addr : int, length : int, ids : (int), echo = False) -> tuple:
        """
        Sync Read instruction

        parameters
        -------------
        addr : int
            Target item address
        length : int
            Number of bytes to read
        ids : Tuple[int]
            IDs to read from, in the order the replies are awaited

        Returns
        -------
        Tuple[Tuple[int, bytes]] | None
            One (id, data) pair per requested ID; data is empty when no
            valid reply from that ID was received.  None when the
            instruction packet could not be sent.
        """
        with self.__mutex:
            _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_SYNC_READ, pack('<HH', addr, length) + bytes(ids), echo)
            if not r:
                return None
            result = []
            for id in ids:
                dat, r = self.RxPacket(echo)
                # Only accept a reply that really comes from the awaited ID, and
                # never let the trailing CRC bytes leak into the returned data.
                if r and (dat[4] == id):
                    result.append((id, bytes(dat[9:-2][:length])))
                else:
                    result.append((id, bytes()))
            return tuple(result)

    def WriteBulkData(self, data : (TBulkW), echo = False) -> bool:
        """
        Bulk Write instruction

        parameters
        -------------
        data : Tuple[TBulkW]
            (id, addr, data) record for every target device

        Returns
        -------
        result : bool
            True when the packet was sent (no status packet is awaited)
        """
        with self.__mutex:
            param = b''.join(pack('<BHH', d.id, d.addr, len(d.data)) + d.data for d in data)
            _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_WRITE, param, echo)
            return r

    def ReadBulkData(self, data:(TBulkR), echo = False) -> tuple:
        """
        Bulk Read instruction

        parameters
        -------------
        data : Tuple[TBulkR]
            (id, addr, length) record for every target device

        Returns
        -------
        Tuple[Tuple[int, bytes]]
            One (id, data) pair per request; data is empty when no valid
            reply from that ID was received.  Empty tuple when the
            instruction packet could not be sent.
        """
        with self.__mutex:
            param = b''.join(pack('<BHH', d.id, d.addr, d.length) for d in data)
            _, r = self.TxPacket(self.BROADCASTING_ID, self.INST_BULK_READ, param, echo)
            if not r:
                return ()
            result = []
            for d in data:
                rxd, r = self.RxPacket(echo)
                if r and (d.id == rxd[4]):
                    result.append((d.id, bytes(rxd[9:-2])))
                else:
                    result.append((d.id, bytes()))
            return tuple(result)

    def Ping(self, id : int, echo = False) -> bool:
        """Send Ping to ``id``; True only when a valid status packet with the same ID is received."""
        with self.__mutex:
            _, r = self.TxPacket(id, self.INST_PING, bytes(), echo)
            if r:
                rxd, r = self.RxPacket(echo)
                return r and (id == rxd[4])
            return False

    def Reset(self, id : int, echo = False) -> bool:
        """
        Factory Reset instruction (option 0xff)

        Returns
        -------
        result : bool
            True when the device answered with its ID and no error code
        """
        with self.__mutex:
            # bytes((0xff,)) is the single option byte; bytes(0xff) would be 255 zero bytes.
            _, r = self.TxPacket(id, self.INST_RESET, bytes((0xff,)), echo)
            if r:
                dat, r = self.RxPacket(echo)
                # Bit 7 of the error byte is masked off; only the error code is checked.
                return r and (id == dat[4]) and ((dat[8] & 0x7f) == 0)
            return False

    def Reboot(self, id : int, echo = False) -> bool:
        """
        Reboot instruction

        Returns
        -------
        result : bool
            True when the device answered with its ID and no error code
        """
        with self.__mutex:
            _, r = self.TxPacket(id, self.INST_REBOOT, bytes(), echo)
            if r:
                dat, r = self.RxPacket(echo)
                return r and (id == dat[4]) and ((dat[8] & 0x7f) == 0)
            return False

    def B2B(self, d):
        """Pack one int, or a list/tuple of ints, as unsigned bytes."""
        if isinstance(d, (list, tuple)):
            return pack(f'<{len(d)}B', *d)
        return pack('<B', d)

    def W2B(self, d):
        """Pack one int, or a list/tuple of ints, as unsigned 16-bit little-endian values."""
        if isinstance(d, (list, tuple)):
            return pack(f'<{len(d)}H', *d)
        return pack('<H', d)

    def L2B(self, d):
        """Pack one int, or a list/tuple of ints, as unsigned 32-bit little-endian values."""
        if isinstance(d, (list, tuple)):
            return pack(f'<{len(d)}I', *d)
        return pack('<I', d)
+ | |||
if __name__ == "__main__":
    # Stand-alone demo exercising the API against real hardware.
    # Expects devices with ID 1..4 (at least ID 1) on the given port and
    # needs psutil for the memory figure printed by stopwatch().
    from contextlib import contextmanager
    from threading import Thread
    import time, gc, psutil, os

    #dx = dx2lib('/dev/ttySC2', 1000000)
    dx = dx2lib('\\\\.\\COM12', 3000000)
    echo = False
    ID = 1
    fin = False

    @contextmanager
    def stopwatch():
        """Print the elapsed time and the current RSS of this process when the block ends."""
        start_time = time.time()
        yield
        print('..proc time={:.1f}ms {}'.format((time.time() - start_time)*1000, psutil.Process(os.getpid()).memory_info().rss))

    def hexstr(b):
        """Hex dump of a packet/data block; tolerates None from a failed transfer."""
        return b.hex(',') if b is not None else 'None'

    def func1():
        """Run through every API once, then signal func2 to stop."""
        global dx, ID, fin, echo

        try:
            # reset dxl (disabled: restores the factory settings of the device)
            #with stopwatch():
            #    r = dx.Reset(ID, echo)
            #print(f'Reset({ID}): {r}')
            #time.sleep(0.5)

            # ping dxl
            print(f'Ping({ID}):', dx.Ping(ID, echo))

            # reboot dxl
            with stopwatch():
                r = dx.Reboot(ID, echo)
            print(f'Reboot({ID}): {r}')
            time.sleep(0.5)

            # basic packet proc (raw Tx/Rx must be guarded by the mutex manually)
            with stopwatch():
                with dx.mutex:
                    r0 = dx.TxPacket(ID, dx.INST_WRITE, (65, 0, 1))
                    r1 = dx.RxPacket()
            print(f'TxPacket({ID}): {r0[1]}', hexstr(r0[0]))
            print(f'RxPacket({ID}): {r1[1]}', hexstr(r1[0]))

            # read byte item
            with stopwatch():
                r = dx.ReadByteData(ID, 65, echo)
            print(f'ReadByteData({ID}): {r}')

            # sync read inst.
            print('ReadSyncData:')
            res = dx.ReadSyncData(0, 4, (1, 2, 3), True)
            # ReadSyncData returns None when the request could not be sent.
            for item in (res or ()):
                print(f' ({item[0]})', item[1].hex(','))

            # sync write inst.
            time.sleep(2)
            for i in range(20):
                with stopwatch():
                    dx.WriteSyncData(65, 1, (dx.TSyncW(1,dx.B2B(1)),dx.TSyncW(2,dx.B2B(1)),dx.TSyncW(3,dx.B2B(1))), echo)
                time.sleep(0.05)
                with stopwatch():
                    dx.WriteSyncData(65, 1, (dx.TSyncW(1,dx.B2B(0)),dx.TSyncW(2,dx.B2B(0)),dx.TSyncW(3,dx.B2B(0))), echo)
                time.sleep(0.05)

            # set goal position
            #torque off
            if dx.WriteByteData(ID, 64, 0, echo):
                #multi turn
                if dx.WriteByteData(ID, 11, 4, echo):
                    #torque on
                    if dx.WriteByteData(ID, 64, 1, echo):
                        #set goal position
                        for gp in tuple(range(2047, 2047 + 4096, 64)) + tuple(range(2047 + 4096, 2047 - 4096, -64)) + tuple(range(2047 - 4096, 2047, 64)):
                            if dx.WriteLongData(ID, 116, gp, echo):
                                #get present position (ReadLongData already returns a signed value)
                                with stopwatch():
                                    pp = dx.ReadLongData(ID, 132, echo)
                                print(f'{gp:6} {pp:6}')
                                time.sleep(0.05)
                            else:
                                break
            #torque off
            dx.WriteByteData(ID, 64, 0, echo)
            #normal turn
            dx.WriteByteData(ID, 11, 3, echo)
            print('')

            # test add/remove suffixes (payload full of header-like sequences)
            fffffd = array.array('B',(
                0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
                0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
                0xff,0xff,0xfd, 0xff,0xff,0xfd, 0xff,0xff,0xfd,
            ))
            with stopwatch():
                r = dx.WriteBlockData(ID, 634, fffffd, echo)
            print(f'WriteBlockData({ID}): {r}')
            with stopwatch():
                r = dx.ReadBlockData(ID, 634, len(fffffd), echo)
            print(f'ReadBlockData({ID}): {r[1]}', hexstr(r[0]))

            # bulk read inst.
            with stopwatch():
                res = dx.ReadBulkData((dx.TBulkR(1,0,10), dx.TBulkR(2,0,20), dx.TBulkR(3,0,30), dx.TBulkR(4,0,40)), echo)
            for item in res:
                print(f' ({item[0]},0)', item[1].hex(','))

            # bulk write inst.
            dx.WriteByteData(ID, 64, 1, echo)
            with stopwatch():
                dx.WriteBulkData((dx.TBulkW(1,104, dx.L2B((0,0,0,1024))), dx.TBulkW(2,65,dx.B2B(0)), dx.TBulkW(3,65,dx.B2B(0))), echo)
            time.sleep(0.5)
            with stopwatch():
                dx.WriteBulkData((dx.TBulkW(1,104, dx.L2B(0)+dx.L2B(0)+dx.L2B(0)+dx.L2B(2048)), dx.TBulkW(2,65,dx.B2B(1)), dx.TBulkW(3,65,dx.B2B(1))), echo)
            time.sleep(0.5)
            with stopwatch():
                dx.WriteBulkData((dx.TBulkW(1,104, dx.L2B(0)+dx.L2B(0)+dx.L2B(0)+dx.L2B(1024)), dx.TBulkW(2,65,dx.B2B(0)), dx.TBulkW(3,65,dx.B2B(0))), echo)
            time.sleep(0.5)
            with stopwatch():
                dx.WriteBulkData((dx.TBulkW(1,116, dx.L2B(2048)), dx.TBulkW(2,65,dx.B2B(1)), dx.TBulkW(3,65,dx.B2B(1))), echo)
            time.sleep(0.5)
            with stopwatch():
                # B2B(0) is one zero byte; bytes(0) would be an EMPTY payload and write nothing.
                dx.WriteBulkData((dx.TBulkW(1,65,dx.B2B(0)),dx.TBulkW(2,65,dx.B2B(0)),dx.TBulkW(3,65,dx.B2B(0))), echo)
            dx.WriteByteData(ID, 64, 0, echo)

        except Exception:
            import traceback
            print('--- Caught Exception ---')
            traceback.print_exc()
            print('------------------------')

        # Tell func2 to leave its loop, whether the demo succeeded or not.
        fin = True

    def func2():
        """Toggle item 65 of the target from a second thread while func1 runs."""
        global dx, ID, fin, echo

        time.sleep(1)
        while not fin:
            try:
                dx.WriteByteData(ID, 65, dx.ReadByteData(ID, 65, echo) ^ 1, echo)
                time.sleep(0.05)
            except Exception:
                import traceback
                print('--- Caught Exception ---')
                traceback.print_exc()
                print('------------------------')
                return

    th1 = Thread(target=func1)
    th2 = Thread(target=func2)
    th1.start()
    th2.start()
    th1.join()
    th2.join()
    print('fin')
+ | </pre> | ||
+ | }} |