Package baxter_interface :: Module analog_io
[hide private]
[frames] | no frames]

Source Code for Module baxter_interface.analog_io

  1  # Copyright (c) 2013-2014, Rethink Robotics 
  2  # All rights reserved. 
  3  # 
  4  # Redistribution and use in source and binary forms, with or without 
  5  # modification, are permitted provided that the following conditions are met: 
  6  # 
  7  # 1. Redistributions of source code must retain the above copyright notice, 
  8  #    this list of conditions and the following disclaimer. 
  9  # 2. Redistributions in binary form must reproduce the above copyright 
 10  #    notice, this list of conditions and the following disclaimer in the 
 11  #    documentation and/or other materials provided with the distribution. 
 12  # 3. Neither the name of the Rethink Robotics nor the names of its 
 13  #    contributors may be used to endorse or promote products derived from 
 14  #    this software without specific prior written permission. 
 15  # 
 16  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 17  # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 18  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 19  # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 20  # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 21  # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 22  # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 23  # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 24  # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 25  # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 26  # POSSIBILITY OF SUCH DAMAGE. 
 27   
 28  import errno 
 29   
 30  import rospy 
 31   
 32  import baxter_dataflow 
 33   
 34  from baxter_core_msgs.msg import ( 
 35      AnalogIOState, 
 36      AnalogOutputCommand, 
 37  ) 
 38   
 39   
40 -class AnalogIO(object):
41 """ 42 Interface class for a simple Analog Input and/or Output on the 43 Baxter robot. 44 45 Input 46 - read input state 47 Output 48 - set new output state 49 - read current output state 50 """
51 - def __init__(self, component_id):
52 """ 53 Constructor. 54 55 @param component_id: unique id of analog component 56 """ 57 self._id = component_id 58 self._component_type = 'analog_io' 59 self._is_output = False 60 61 self._state = dict() 62 63 type_ns = '/robot/' + self._component_type 64 topic_base = type_ns + '/' + self._id 65 66 self._sub_state = rospy.Subscriber( 67 topic_base + '/state', 68 AnalogIOState, 69 self._on_io_state) 70 71 baxter_dataflow.wait_for( 72 lambda: len(self._state.keys()) != 0, 73 timeout=2.0, 74 timeout_msg="Failed to get current analog_io state from %s" \ 75 % (topic_base,), 76 ) 77 78 # check if output-capable before creating publisher 79 if self._is_output: 80 self._pub_output = rospy.Publisher( 81 type_ns + '/command', 82 AnalogOutputCommand)
83
84 - def _on_io_state(self, msg):
85 """ 86 Updates the internally stored state of the Analog Input/Output. 87 """ 88 self._is_output = not msg.isInputOnly 89 self._state['value'] = msg.value
90
91 - def state(self):
92 """ 93 Return the latest value of the Analog Input/Output. 94 """ 95 return self._state['value']
96
97 - def is_output(self):
98 """ 99 Accessor to check if IO is capable of output. 100 """ 101 return self._is_output
102
103 - def set_output(self, value, timeout=2.0):
104 """ 105 Control the state of the Analog Output. 106 107 @type value: uint16 108 @param value: new state of the Output. 109 @type timeout: float 110 @param timeout: Seconds to wait for the io to reflect command. 111 If 0, just command once and return. [0] 112 """ 113 if not self._is_output: 114 raise IOError(errno.EACCES, "Component is not an output [%s: %s]" % 115 (self._component_type, self._id)) 116 cmd = AnalogOutputCommand() 117 cmd.name = self._id 118 cmd.value = value 119 self._pub_output.publish(cmd) 120 121 if not timeout == 0: 122 baxter_dataflow.wait_for( 123 test=lambda: self.state() == value, 124 timeout=timeout, 125 rate=100, 126 timeout_msg=("Failed to command analog io to: %d" % (value,)), 127 body=lambda: self._pub_output.publish(cmd) 128 )
129