Week 6 Object Detection - tom-howard/com2009 GitHub Wiki
Week 6 OpenCV Basics
Object Detection Node
The Code
Copy all of the code below into your object_detection.py file. Then, review the explainer below to understand how this all works.
#!/usr/bin/env python3
import rospy
from pathlib import Path
import cv2
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import Image
node_name = "object_detection_node"
rospy.init_node(node_name)
print(f"Launched the '{node_name}' node. Currently waiting for an image...")
rate = rospy.Rate(5)
base_image_path = Path("/home/student/myrosdata/week6_images")
base_image_path.mkdir(parents=True, exist_ok=True)
cvbridge_interface = CvBridge()
waiting_for_image = True
def show_and_save_image(img, img_name):
full_image_path = base_image_path.joinpath(f"{img_name}.jpg")
cv2.imshow(img_name, img)
cv2.waitKey(0)
cv2.imwrite(str(full_image_path), img)
print(f"Saved an image to '{full_image_path}'\n"
f"image dims = {img.shape[0]}x{img.shape[1]}px\n"
f"file size = {full_image_path.stat().st_size} bytes")
def camera_cb(img_data):
global waiting_for_image
try:
cv_img = cvbridge_interface.imgmsg_to_cv2(img_data, desired_encoding="bgr8")
except CvBridgeError as e:
print(e)
if waiting_for_image == True:
height, width, channels = cv_img.shape
print(f"Obtained an image of height {height}px and width {width}px.")
show_and_save_image(cv_img, img_name = "step1_original")
waiting_for_image = False
rospy.Subscriber("/camera/rgb/image_raw", Image, camera_cb)
while waiting_for_image:
rate.sleep()
cv2.destroyAllWindows()
The Code Explained:
Do I really need to say it by now?... DFTS!
First, let's have a look at the Python libraries that we are importing for this node:
import rospy
from pathlib import Path
import cv2
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import Image
Of course, we always need to import rospy so that Python can work with ROS. What we are also importing here is the Python Path class from the pathlib module, which will be used to do a few file operations. Then, we are importing the OpenCV library for Python (remember the Python API that we talked about earlier), which is called cv2, and also that ROS-to-OpenCV bridge interface that we talked about earlier too: cv_bridge. From this, we are importing the CvBridge and CvBridgeError classes from the cv_bridge library specifically.
Finally, we need to subscribe to an image topic in order to obtain the messages being published to it. We've already identified the type of message that is published to the /camera/rgb/image_raw topic that we are interested in, so we import that message type here, from the sensor_msgs package, so that we can build a subscriber to the topic (using this message type) later.
To start with we are doing a number of initialisations that should be very familiar to you by now:
- Give our node a name.
- Initialise the node (i.e. register it on the ROS network using
rospy.init_node()). - Specify a rate at which we want the node to run:
node_name = "object_detection_node"
rospy.init_node(node_name)
print(f"Launched the '{node_name}' node. Currently waiting for an image...")
rate = rospy.Rate(5)
Then, we define a filesystem location that we will use to save images to. We use the Pathlib Path.mkdir() method to create this directory, if it doesn't exist already.
base_image_path = Path("/home/student/myrosdata/week6_images")
base_image_path.mkdir(parents=True, exist_ok=True)
Next, we are creating an instance of the CvBridge class that we imported earlier, and which we will use later on to convert ROS image data into a format that OpenCV can use. We're then also creating a flag to indicate whether the node has obtained an image yet or not. For this exercise, we only want to obtain a single image, so we will set the waiting_for_image flag to False in our camera callback function once an image has been obtained, to avoid capturing any more:
cvbridge_interface = CvBridge()
waiting_for_image = True
Next, we're creating a convenience function called show_and_save_image(), within which we contain some image operations that we will need to repeat multiple times (this will become apparent later in this exercise when you start adding further functionality to the camera callback function):
def show_and_save_image(img, img_name):
...
We'll come back to this in a minute, but for now let's skip past this function and on to the next one:
def camera_cb(img_data):
...
Here, we are defining a callback function for a rospy.Subscriber(). Let's skip past this function for now as well, and first keep moving down the code to where this subscriber is actually defined:
rospy.Subscriber("/camera/rgb/image_raw", Image, camera_cb)
while waiting_for_image:
rate.sleep()
cv2.destroyAllWindows()
This is in fact the end of the code, but establishes the main behaviour that the object_detection.py node will enact:
- As discussed above, we create subscriber to the
/camera/rgb/image_rawtopic first, telling therospy.Subscriber()function the message type that is used by this topic (sensor_msgs/Image- as imported above), and we point it to a callback function (camera_cb, in this case), to define the processes that should be performed every time a message is obtained on this topic (in this case, the messages will be our camera images) - Then we go into a
whileloop, and use therate.sleep()method to maintain this loop at a speed of 5 Hz (as defined earlier) whilst checking thewaiting_for_imageflag to see if an image has been obtained by our subscriber yet. We only really want to obtain a single image here, so once thewaiting_for_imageflag changes toFalse, thewhileloop will stop. - Finally, before terminating the node, we're running the
cv2.destroyAllWindows()method to make sure that any OpenCV image pop-up windows that may still be active or in memory are destroyed before the node shuts down.
Let's then go back to the camera callback function, and see what we are telling the node to do once an image message is received:
def camera_cb(img_data):
global waiting_for_image
try:
cv_img = cvbridge_interface.imgmsg_to_cv2(img_data, desired_encoding="bgr8")
except CvBridgeError as e:
print(e)
if waiting_for_image == True:
height, width, channels = cv_img.shape
print(f"Obtained an image of height {height}px and width {width}px.")
show_and_save_image(cv_img, img_name = "step1_original")
waiting_for_image = False
Let's talk through this step-by-step:
-
First we want to make changes to the
waiting_for_imageflag inside this function, but make sure that these changes are also observed outside the function too (i.e. by thewhileloop that we talked about above). So, we change the scope of the variable to global here using theglobalstatement. -
Next we're using the CvBridge interface to take our ROS image data and convert it to a format that OpenCV will be able to understand. In this case we are specifying conversion (or "encoding") to an 8-bit BGR (Blue-Green-Red) image format:
"bgr8".We contain this within a
try-exceptblock though, which is the recommended procedure when doing this. Here we try to convert an image using the desired encoding, and if aCvBridgeErroris raised then we print this error to the terminal. Should this happen, this particular execution of the camera callback function will stop. -
Then we check the
waiting_for_imageflag to see if this is the first image that has been received by the node. If so, then we:- Obtain the height and width of the image (in pixels), as well as the number of colour channels,
- Print the image dimensions to the terminal,
- Pass the image data to the
show_and_save_image()function, which we mentioned earlier and which is explained in more detail below. Here we also pass a descriptive name for the image to this function too (img_name). - Finally, we set the
waiting_for_imageflag toFalseso that we only ever perform these processing steps once (we only want to capture one image remember!). This will then trigger the mainwhileloop to stop, thus causing the overall execution of the node to stop too.
Finally, let's talk about the show_and_save_image() function, and see what is happening inside this:
def show_and_save_image(img, img_name):
full_image_path = base_image_path.joinpath(f"{img_name}.jpg")
cv2.imshow(img_name, img)
cv2.waitKey(0)
cv2.imwrite(str(full_image_path), img)
print(f"Saved an image to '{full_image_path}'\n"
f"image dims = {img.shape[0]}x{img.shape[1]}px\n"
f"file size = {full_image_path.stat().st_size} bytes")
Essentially, what we are doing here is:
-
Constructing a full file path for an image (using the
Path.joinpath()method) from:- The
base_image_paththat we defined earlier and - An image name that is passed into this function via the
img_nameargument.
We'll use this to save the file to our filesystem later on.
- The
-
We then use the
cv2.imshow()function to display the actual image in a pop-up window:- The image data is passed into the function via the
imgargument, - We need to give the pop-up window a name, so in this case we are using the
img_nameargument that has also been passed into the function.
- The image data is passed into the function via the
-
Next, we're using the
cv2.waitKey()function. We're supplying a value of0here, which tells this function to wait indefinitely before allowing the rest of our code to execute. If we had supplied a value here (say:1) then the function would simply wait 1 millisecond and then close the pop-up window down. In our case however, we want some time to actually look at the image and then close the window down ourselves, manually. Once we do this, thecv2.waitKey()function allows the execution of our code to continue... -
Then we are using the
cv2.imwrite()function to save the image to a.jpgfile. We're supplying thefull_image_paththat was created above, and also the actual image data (img) so that the function knows what image we want to save. -
And to finish off, we're printing a message to the terminal to inform us of (a) where the image has been saved to, (b) how big the image was (in terms of its pixel dimensions) and (c) how big the image file is (in bytes).