# Piecewise Interpolation in TensorFlow

January 29, 2022

NumPy's `interp`

is a handy function for generating an array from a piecewise linear
mapping defined by a set of control points. For example, here is a linear plot
of today's U.S. Treasury yield curve:

```
import typing
import hypothesis
import numpy as np
import scipy.signal
import tensorflow as tf
from hypothesis.extra import numpy as hypnum
from IPython.display import Audio, display
from matplotlib import pyplot as plt
from tabulate import tabulate
π = np.pi
```

```
xs = [1, 2, 3, 6, 12, 24, 36, 60, 84, 120, 240, 360]
ys = [0.05, 0.09, 0.19, 0.39, 0.58, 0.99, 1.25, 1.53, 1.69, 1.75, 2.15, 2.10]
x = np.linspace(min(xs), max(xs), 100)
y = np.interp(x, xs, ys)
```

This function can be useful in ML applications for thresholding ROC curves,
decaying learning rates, and performing other linear calculations.
Unfortunately, there is currently no vectorized implementation of
`np.interp`

in TensorFlow or PyTorch, but it is straightforward to implement
a piecewise interpolator using existing primitives.

### Vectorized Interpolation

For eager-mode TensorFlow/PyTorch applications, it is easy enough to call
`np.interp`

directly, and NumPy will do the right thing with tensor inputs.
However, these calls cannot be made in graph contexts, such as `tf.function`

or TorchScript. It is also possible to use `tf.numpy_function`

to wrap
`np.interp`

, but doing so limits parallelism, since the TensorFlow runtime
must call back into Python.

Instead, the following function can be used as a fully-vectorized analog of
`np.interp`

for TensorFlow that is compatible with graph mode.

```
def tf_interp(x: typing.Any, xs: typing.Any, ys: typing.Any) -> tf.Tensor:
# determine the output data type
ys = tf.convert_to_tensor(ys)
dtype = ys.dtype
# normalize data types
ys = tf.cast(ys, tf.float64)
xs = tf.cast(xs, tf.float64)
x = tf.cast(x, tf.float64)
# pad control points for extrapolation
xs = tf.concat([[xs.dtype.min], xs, [xs.dtype.max]], axis=0)
ys = tf.concat([ys[:1], ys, ys[-1:]], axis=0)
# compute slopes, pad at the edges to flatten
ms = (ys[1:] - ys[:-1]) / (xs[1:] - xs[:-1])
ms = tf.pad(ms[:-1], [(1, 1)])
# solve for intercepts
bs = ys - ms*xs
# search for the line parameters at each input data point
# create a grid of the inputs and piece breakpoints for thresholding
# rely on argmax stopping on the first true when there are duplicates,
# which gives us an index into the parameter vectors
i = tf.math.argmax(xs[..., tf.newaxis, :] > x[..., tf.newaxis], axis=-1)
m = tf.gather(ms, i, axis=-1)
b = tf.gather(bs, i, axis=-1)
# apply the linear mapping at each input data point
y = m*x + b
return tf.cast(tf.reshape(y, tf.shape(x)), dtype)
```

This function first converts all inputs to `float64`

(like NumPy), and then
pads the control points to cover the whole domain of the mapping. The
slope/intercept values of the linear parts are then solved directly, using the
familiar `y = mx + b`

equation.

At this point, the implementation diverges from the NumPy version. For each
input `x`

, NumPy locates the corresponding line equation using a binary
search. In the vectorized implementation, we construct the full grid of
`[xs, x]`

values and locate the index of the first value in `xs`

that is
greater than the input x using the `argmax`

function. After that, we simply
apply the linear equation at that index to compute the result.

To see how this works, consider a lookup of the `x`

vector `[5, 65]`

in the
treasury yield example from above. The `xs[..., tf.newaxis, :] > x[..., tf.newaxis]`

comparison broadcasts the vectors into the following Boolean matrix:

1 | 2 | 3 | 6 | 12 | 24 | 36 | 60 | 84 | 120 | 240 | 360 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|

5 | 0 | 0 | 0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |

65 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | 1 | 1 |

The `argmax`

function has a convention to return the index of the first match
if the maximum value is duplicated in the array, which results in an index of
3 for an input of 5 and 8 for 65. These indexes are then used to select the
appropriate linear parameters for interpolation.

The tradeoff in this approach is that it creates a tensor containing `size(x) * size(xs)`

elements to perform the lookup, instead of using NumPy's CPU-parallel binary
search. This intermediate tensor can become large if there are a lot of
points to interpolate or control points. However, the lookup is performed in
effectively constant time on a GPU, which can be much faster than NumPy when
there are a lot of points to interpolate.

#### Testing

How do we know it works? Since we have a baseline implementation in NumPy, we can simply compute our outputs for the same inputs and compare. The Hypothesis property testing framework lets us do this without hard-coding any example arrays.

```
@hypothesis.given(
x=hypnum.arrays(
dtype=hypnum.floating_dtypes(),
shape=hypnum.array_shapes(),
elements=dict(min_value=-1, max_value=1)
),
xs=hypnum.arrays(
dtype=hypnum.floating_dtypes(),
shape=hypnum.array_shapes(max_dims=1),
unique=True,
elements=dict(min_value=-1, max_value=1)
),
ys=hypnum.arrays(
dtype=hypnum.floating_dtypes(),
shape=hypnum.array_shapes(max_dims=1),
elements=dict(min_value=-1, max_value=1)
),
)
def test_interp(x, xs, ys):
xs, ys = xs[:len(ys)], ys[:len(xs)]
xs = np.sort(xs)
expect = np.interp(x, xs, ys).astype(ys.dtype)
actual = tf_interp(x, xs, ys).numpy()
assert np.allclose(expect, actual), f"expect={expect} actual={actual}"
test_interp()
```

The test first sorts the interpolation domain (`xs`

) and ensures it contains
unique values, which is an assumption made by all of these `interp`

functions.
We also restrict the ranges of all values to `[-1, 1]`

, in order to avoid
rounding/overflow inconsistencies between the frameworks.

### Application: Piecewise Learning Rate Schedule

As an example, we'll implement a piecewise linear learning rate decay
for TensorFlow. This learning rate scheduler can be used with any `tf.keras`

optimizer. It is configured with a sorted list of training steps and
corresponding learning rates. The learning rate returned to the optimizer will
be interpolated based on this schedule.

```
class PiecewiseLinearSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, steps: typing.List[int], rates: typing.List[int]):
if steps != sorted(steps):
raise ValueError("steps should be listed in ascending order")
self._steps = steps
self._rates = rates
def __call__(self, step: tf.Tensor) -> tf.Tensor:
return tf_interp(step, self._steps, self._rates)
learning_rate = PiecewiseLinearSchedule([100, 500, 1000], [3e-4, 5e-5, 1e-5])
```

For an example usage, we'll train a simple DNN logistic regression model on
the XOR function, a classic "Hello World" problem for neural networks. Note
that the training step function `train`

uses the `tf.function`

decorator for
graph mode training, which allows us to take advantage of the `tf_interp`

function we implemented above.

```
model = tf.keras.Sequential(
[
tf.keras.layers.Dense(units=256, activation="relu"),
tf.keras.layers.Dense(units=1)
]
)
criterion = tf.keras.losses.BinaryCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
@tf.function
def train(inputs: tf.Tensor, targets: tf.Tensor) -> None:
with tf.GradientTape() as tape:
outputs = model(inputs, training=True)
loss = criterion(targets, outputs)
optimizer.minimize(loss, model.trainable_variables, tape=tape)
# fit on random batches of binary inputs and XOR targets
for _ in range(1000):
x = tf.random.uniform([32, 2]) < 0.5
y = x[:, :1] ^ x[:, 1:]
train(x, y)
```

We can then evaluate the trained model on the truth table to verify it has learned XOR.

```
x = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
y = tf.nn.sigmoid(model(x)).numpy()[..., 0]
for (a, b), c in zip(x, y):
print(f"{a:^} ⊕ {b:^} = {c > 0.5:^}")
```

```
0 ⊕ 0 = 0
0 ⊕ 1 = 1
1 ⊕ 0 = 1
1 ⊕ 1 = 0
```

### Application: ADSR Envelope Generator

Piecewise linear functions are also commonly used to model ADSR envelopes for music synthesis. For example, an envelope can be used to control the timbre of an instrument as keys are pressed, held, and released, by shaping the amplitude of the audio signal.

The **attack**, **decay**, and **release** parameters specify the amount of
time for the envelope to transition between stages, corresponding to the
`xs`

parameter in the interpolator. The **sustain** parameter indicates
the gain that the envelope should decay to and hold while the instrument's
key remains pressed.

The envelope generator below demonstrates how the ADSR parameters correspond to piecewise interpolator control points. The envelope begins at 0 gain, rises through the attack stage to full gain, decays to the sustain level, and then decays back to 0 in the release stage. All time values are represented in seconds.

```
def adsr(attack, decay, sustain, release):
def gen(time):
hold = max(time[-1] - (attack + decay + release), 0)
xs = np.cumsum([0, attack, decay, hold, release])
ys = [0, 1, sustain, sustain, 0]
return tf_interp(time, xs, ys)
return gen
envelope = adsr(attack=0.1, decay=0.05, sustain=0.6, release=0.15)
```

We can now use this envelope to shape some audio samples. To synthesize a
note, we first generate a triangle wave (using `scipy.signal.sawtooth`

with
a `width`

of 0.5) of the desired frequency and duration. Then we just multiply
the signal by the envelope to modulate its amplitude.

```
# audio sample rate
FS = 44100
def note(frequency, duration, envelope):
t = np.linspace(0, duration, int(duration*FS))
e = envelope(t)
y = scipy.signal.sawtooth(2*π*frequency*t, width=0.5)
return e * y
def rest(duration):
return np.zeros(int(duration*FS))
Audio(note(261.63, 0.5, envelope), rate=44100)
```

Now we can play a simple melody by concatenating notes/rests into a vector
of audio samples. In the non-enveloped clip below, you can hear the clicks
between notes where the signal boundaries are discontinuous. The **attack**
and **release** stages fade smoothly between notes in the enveloped clip.

```
# quarter note duration
QUARTER = 0.3
# note frequencies
REST = 0
G3 = 196.0
A3 = 220.0
B3 = 246.94
C4 = 261.63
D4 = 293.66
E4 = 329.63
F4 = 349.23
G4 = 392.00
A4 = 440.00
B4 = 493.88
C5 = 523.25
# a simple tune
NOTES = [
(A3, 1), (C4, 1), (E4, 1), (REST, 1),
(D4, 1), (C4, 1), (D4, 1), (E4, 1), (REST, 1),
(E4, 1), (D4, 1), (C4, 1), (B3, 1), (C4, 2), (A3, 1), (REST, 1),
(E4, 1), (D4, 1), (C4, 1), (B3, 1), (C4, 2), (A3, 1), (REST, 1),
(G3, 1), (A3, 1), (C4, 1)
]
def melody(notes, envelope):
ys = [note(f, d*QUARTER, envelope) if f != REST
else rest(d*QUARTER)
for f, d in NOTES]
return np.hstack(ys)
# a no-op rectangular envelope
y = melody(NOTES, lambda _: 1)
display(Audio(y, rate=FS))
# ADSR envelope from above
y = melody(NOTES, envelope)
display(Audio(y, rate=FS))
```

If you have any feedback or comments, please reply to this post on Twitter.