• Home
  • About
    • Ping Pong Robotics photo

      Ping Pong Robotics

      Ping Pong Ping Pong

    • Learn More
  • Team
  • Design
  • Implementation
  • Result & Conclusion
  • Future Work & Plan
  • Code

Curve Fitting code

12 Dec 2023

Reading time ~5 minutes

Here’s our python implementation of curve fitting algorithm for live updates process:

from visual_servo import VisualServo
import numpy as np
from data_segmentation.src.data_structure.processed_frame import ProcessedFrame
from data_segmentation.src.utils import split_posthit_segment_on_bounce
from typing import List
from copy import deepcopy
import matplotlib.pyplot as plt


class VisualServoCurveFitting(VisualServo):
    """
    Visual servo model that computes trajectory by interpolating parabolas.
    If the ball has bounced, interpolates a parabola from the post-bounce trajectory
    and intersects it with the paddle plane.
    Otherwise, interpolates a parabola from the pre-bounce trajectory and compute a 
    post-bounce parabola through linear regression on x slope, y slope, height, and b 
    coefficient of the pre-bounce parabola.
    """

    # Coefficients for linear regression prediction model
    # These coefficients describe affine functions that predict how quantities update post-bounce
    # E.g., x_predictor_coeffs describes how dx/dt changes after the bounce
    # So mx2 (post-bounce slope) = ~0.6 * mx1 (pre-bounce slope) + 0.004
    # y_predictor_coeffs describes how dy/dt changes after the bounce
    # b_predictor_coeffs describes how the coefficient b in z = at^2 + bt + c changes after the bounce
    # and h_predictor_coeffs describes how the height of the curve changes after the bounce
    # These 4 quantities are sufficient to compute the 7-parameter curve because we know that the
    # pre-bounce and post-bounce curves intersect at the point of the bounce, which gives us 3 data points (x, y, z of bounce point)
    x_predictor_coeffs = np.array([0.64621385, 0.00466148])
    y_predictor_coeffs = np.array([0.61873083, 0.05157728])
    b_predictor_coeffs = np.array([1.68778697, 6.63926297])
    h_predictor_coeffs = np.array([0.58883477, 6.97740610])

    def __init__(
            self, 
            paddle_y, 
            paddle_start_position, 
            ball_z_offset, 
            min_window_size_of_pre_bounce_curve, 
            min_window_size_of_post_bounce_curve
        ) -> None:
        self.paddle_y = paddle_y
        self.ball_z_offset = ball_z_offset
        self.min_window_size_pre = min_window_size_of_pre_bounce_curve
        self.min_window_size_post = min_window_size_of_post_bounce_curve
        self.prev_target_position = paddle_start_position

    def get_coefficients(self, segment_frames, return_error=False):
        """
        Interpolate a curve based on the positions of the ball.
        The curve is linear in x and y and quadratic in z.
        
        """
        ball_traj = np.array([np.array([f.ball_position[0], f.ball_position[1], f.ball_position[2] + self.ball_z_offset, t]) for t, f in enumerate(segment_frames)])

        xs, ys, zs, ts = ball_traj[:, 0], ball_traj[:, 1], ball_traj[:, 2], ball_traj[:, 3]
        lin_A = np.vstack([ts, np.ones(len(ts))]).T
        x_coeffs, x_error, _, _ = np.linalg.lstsq(lin_A, xs, rcond=None)
        m_x, c_x = x_coeffs
        y_coeffs, y_error, _, _ = np.linalg.lstsq(lin_A, ys, rcond=None)
        m_y, c_y = y_coeffs

        quad_A = np.vstack([np.square(ts), ts, np.ones(len(ts))]).T
        z_coeffs, z_error, _, _ = np.linalg.lstsq(quad_A, zs, rcond=None)
        a_z, b_z, c_z = z_coeffs

        coeffs = m_x, c_x, m_y, c_y, a_z, b_z, c_z
        if return_error:
            error = (x_error + y_error + z_error)[0]
            return coeffs , error
        else:
            return coeffs

    def affine_predictor(self, old, coeffs):
        """
        Affine function described by an array of two real numbers.
        The first entry of coeffs is the linear coefficient and the second entry is the constant term.
        E.g., if coeffs = [3 5], then this returns 3 * old + 5 as the predicted value.
        """
        return coeffs[0] * old + coeffs[1]

    def get_z_coeffs_from_b_h_t0(self, b, h, t0):
        """
        Computes the coefficients of the curve z = at^2 + bt + c where
        b, the height of the curve h, and the smallest root of the parabola t0, are all given.
        Note that a satisfies the quadratic equation (t0 a)^2 + (h + b t0) a + b^2 / 4 = 0.
        If this equation has multiple solutions, we choose the a that has t0 as the smaller root of the parabola.
        And c can be computed from the equaiton c = -t0 (b + a t0).
        """
        x, y, z = t0 ** 2, h + t0 * b, (b ** 2) / 4
        a_values = np.roots([x, y, z])
        if len(a_values) == 0:
            return None
        elif len(a_values) == 1:
            a = a_values[0]
            return a, b, c
        else:
            a1, a2 = a_values
            if -b / (2 * a1) > t0:
                a = a1
            else:
                a = a2
        c = -t0 * (b + a * t0)
        return a, b, c

    def get_x_coeffs_from_m_c_t0(self, m, c, t0):
        """
        Computes the coefficeints of the linear function x = mt + c.
        """
        mx = self.affine_predictor(m, self.x_predictor_coeffs)
        return mx, -mx * t0 + m * t0 + c

    def get_y_coeffs_from_m_c_t0(self, m, c, t0):
        """
        Computes the coefficients of the linear function y = mt + c
        """
        my = self.affine_predictor(m, self.y_predictor_coeffs)
        return my, -my * t0 + m * t0 + c

    def get_new_coeffs_from_b_h(self, m_x, c_x, m_y, c_y, a_z, b_z, c_z):
        """
        Computes the coefficeints of a post-bounce curve linear in x and y, quadratic in z,
        given the coefficients of the pre-bounce curve.
        Uses affine predictors in mx, my, height of z parabola, and bz.
        Note that the height of a parabola is defined by h = (-b^2 / 4a) + c.
        """
        t0 = np.max(np.roots([a_z, b_z, c_z]))
        mx1, cx1 = self.get_x_coeffs_from_m_c_t0(m_x, c_x, t0)
        my1, cy1 = self.get_y_coeffs_from_m_c_t0(m_y, c_y, t0)
        b = self.affine_predictor(b_z, self.b_predictor_coeffs)
        h = self.affine_predictor((-b_z ** 2) / (4 * a_z) + c_z, self.h_predictor_coeffs)
        a, b, c = self.get_z_coeffs_from_b_h_t0(b, h, t0)
        return mx1, cx1, my1, cy1, a, b, c

    def get_target_from_post_bounce_curve(self, mx, cx, my, cy, az, bz, cz):
        """
        Interesects the given curve with the plane y = self.paddle_y.
        """
        t0 = (self.paddle_y - cy) / my
        x = mx * t0 + cx
        z = az * t0 * t0 + bz * t0 + cz
        return np.array([x, self.paddle_y, z])

    def predict_goal_state(self, posthit_segment_frames: List[ProcessedFrame]) -> np.ndarray:
        """
        Predicts the goal state of the ball by interpolating a parabola based on the given points,
        and using the linear regressor defined above if the ball has not yet bounced.
        """
        pre_bounce_frames, post_bounce_frames = split_posthit_segment_on_bounce(posthit_segment_frames)
        if post_bounce_frames:
            if len(post_bounce_frames) < self.min_window_size_post:
                return self.prev_target_position
            post_bounce_coeffs = self.get_coefficients(post_bounce_frames)
        else:
            if len(pre_bounce_frames) < self.min_window_size_pre:
                return self.prev_target_position
            pre_bounce_coeffs = self.get_coefficients(pre_bounce_frames)
            post_bounce_coeffs = self.get_new_coeffs_from_b_h(*pre_bounce_coeffs)
        target = self.get_target_from_post_bounce_curve(*post_bounce_coeffs)
        self.prev_target_position = target
        return target


Share Tweet +1