# Source code for datascience.util

"""Utility functions"""

# Public API of this module: the names exported by
# ``from datascience.util import *``. Every entry is defined further down
# in this file (``plot_cdf_area`` is an alias of ``plot_normal_cdf``).
__all__ = ['make_array', 'percentile', 'plot_cdf_area', 'plot_normal_cdf',
'table_apply', 'proportions_from_distribution',
'sample_proportions', 'minimize', 'is_non_string_iterable']

import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
from scipy import stats
from scipy import optimize
import functools
import math
import collections

# Change matplotlib formatting. This runs at import time and modifies the
# global rcParams, so it affects every plot drawn afterwards in the process.
# NOTE(review): presumably this makes patch outlines (e.g. histogram bar
# edges) visible -- confirm against the matplotlib rcParams documentation.
# TODO incorporate into a style?
plt.rcParams['patch.force_edgecolor'] = True

[docs]
def make_array(*elements):
    """Returns an array containing all the arguments passed to this function.
    A simple way to make an array with a few elements.

    As with any array, all arguments should have the same type.

    Args:
        ``elements`` (variadic): elements

    Returns:
        A NumPy array of same length as the provided variadic argument
        ``elements``. Integer arguments always produce an ``int64`` array,
        arguments that are all Python booleans produce a boolean array, and
        arguments containing any non-string iterable produce an ``object``
        array.

    >>> make_array(0)
    array([0])
    >>> make_array(2, 3, 4)
    array([2, 3, 4])
    >>> make_array("foo", "bar")
    array(['foo', 'bar'],
          dtype='<U3')
    >>> make_array(True, False)
    array([ True, False])
    >>> make_array()
    array([], dtype=float64)
    """
    if elements and all(isinstance(item, (int, np.integer)) for item in elements):
        # bool is a subclass of int, so an all-boolean call would otherwise
        # be coerced to int64 and come back as 1s and 0s. Keep it boolean.
        if all(isinstance(item, bool) for item in elements):
            return np.array(elements, dtype=bool)
        # Specifically added for Windows machines where the default
        # integer is int32 - see GH issue #339.
        return np.array(elements, dtype="int64")

    # Manually cast elements as an object due to this: https://github.com/data-8/datascience/issues/458
    if any(is_non_string_iterable(el) for el in elements):
        return np.array(elements, dtype=object)

    return np.array(elements)

[docs]
def percentile(p, arr=None):
    """Returns the pth percentile of the input array (the value that is at
    least as great as p% of the values in the array).

    If arr is not provided, percentile returns itself curried with p

    Args:
        ``p``: A percent between 0 and 100 inclusive, or an iterable of such
            percents.

        ``arr``: The values to take the percentile of. If omitted, a
            one-argument function waiting for ``arr`` is returned instead.

    Returns:
        The element of ``arr`` at the requested percentile; a NumPy array of
        such elements when ``p`` is iterable; or a function of ``arr`` when
        ``arr`` is None.

    Raises:
        AssertionError: If ``p`` is not in the range [0, 100].

    >>> percentile(74.9, [1, 3, 5, 9])
    5
    >>> percentile(75, [1, 3, 5, 9])
    5
    >>> percentile(75.1, [1, 3, 5, 9])
    9
    >>> f = percentile(75)
    >>> f([1, 3, 5, 9])
    5
    """
    if arr is None:
        return lambda arr: percentile(p, arr)
    if hasattr(p, '__iter__'):
        return np.array([percentile(x, arr) for x in p])
    if p == 0:
        return min(arr)
    assert 0 < p <= 100, 'Percentile requires a percent'
    # Multiply before dividing. (p / 100) * len(arr) can land just above an
    # integer through floating-point rounding (e.g. (7 / 100) * 100 ==
    # 7.000000000000001), which made ceil() select the next element. With
    # the product taken first, whole-number percents are computed exactly.
    rank = math.ceil(float(p) * len(arr) / 100)
    return sorted(arr)[rank - 1]

[docs]
def plot_normal_cdf(rbound=None, lbound=None, mean=0, sd=1):
    """Plots a normal curve with specified parameters and area below curve shaded
    between ``lbound`` and ``rbound``.

    The curve is drawn over ``mean`` plus or minus 3.5 standard deviations.
    When at least one bound is given, the region between the bounds is shaded
    gold (a missing bound is replaced by the corresponding edge of the plot).
    When both bounds are given, the region to the left of ``lbound`` is
    additionally shaded dark blue. With no bounds, only the curve is drawn.

    Args:
        ``rbound`` (numeric): right boundary of shaded region

        ``lbound`` (numeric): left boundary of shaded region; by default is negative infinity

        ``mean`` (numeric): mean/expectation of normal distribution

        ``sd`` (numeric): standard deviation of normal distribution
    """
    shade = rbound is not None or lbound is not None
    shade_left = rbound is not None and lbound is not None
    # Half-width of the plotted range, standing in for "infinity".
    inf = 3.5 * sd
    step = 0.1
    rlabel = rbound
    llabel = lbound
    if rbound is None:
        rbound = inf + mean
        rlabel = r"$\infty$"
    if lbound is None:
        lbound = -inf + mean
        llabel = r"-$\infty$"
    pdf_range = np.arange(-inf + mean, inf + mean, step)
    plt.plot(pdf_range, stats.norm.pdf(pdf_range, loc=mean, scale=sd), color='k', lw=1)
    if shade:
        # Region between the (possibly defaulted) bounds.
        cdf_range = np.arange(lbound, rbound + step, step)
        plt.fill_between(cdf_range, stats.norm.pdf(cdf_range, loc=mean, scale=sd), color='gold')
    if shade_left:
        # Region to the left of an explicitly given lower bound.
        cdf_range = np.arange(-inf+mean, lbound + step, step)
        plt.fill_between(cdf_range, stats.norm.pdf(cdf_range, loc=mean, scale=sd), color='darkblue')
    # Leave 25% headroom above the peak of the density.
    plt.ylim(0, stats.norm.pdf(0, loc=0, scale=sd) * 1.25)
    plt.xlabel('z')
    plt.ylabel(r'$\phi$(z)', rotation=90)
    plt.title(r"Normal Curve ~ ($\mu$ = {0}, $\sigma$ = {1}) "
              "{2} < z < {3}".format(mean, sd, llabel, rlabel), fontsize=16)
    plt.show()

# Old name: plot_cdf_area is bound to the same function object as
# plot_normal_cdf and is still listed in __all__.
plot_cdf_area = plot_normal_cdf

[docs]
def sample_proportions(sample_size: int, probabilities):
    """Draw a random sample from a distribution and report outcome proportions.

    Works like np.random.Generator.multinomial, except that the per-outcome
    counts are divided by the sample size so the result holds proportions.

    Args:
        ``sample_size``: How many draws to make from the distribution.

        ``probabilities``: An array of probabilities that forms a distribution.

    Returns:
        An array with one entry per entry of ``probabilities``, holding the
        fraction of the draws that fell on that outcome.
    """
    # A fresh, unseeded generator is created on every call.
    counts = np.random.default_rng().multinomial(sample_size, probabilities)
    return counts / sample_size

[docs]
def proportions_from_distribution(table, label, sample_size,
                                  column_name='Random Sample'):
    """
    Adds a column named ``column_name`` containing the proportions of a random
    draw using the distribution in ``label``.

    This method uses ``sample_proportions`` (a multinomial draw of
    ``sample_size`` samples from the distribution in ``table.column(label)``,
    divided by ``sample_size``) to create the resulting column of proportions.

    Args:
        ``table``: An instance of ``Table``.

        ``label``: Label of column in ``table``. This column must contain a
            distribution (the values must sum to 1).

        ``sample_size``: The size of the sample to draw from the distribution.

        ``column_name``: The name of the new column that contains the sampled
            proportions. Defaults to ``'Random Sample'``.

    Returns:
        The result of ``table.with_column(column_name, proportions)``: the
        table with a column ``column_name`` containing the sampled
        proportions.

    Raises:
        Whatever ``table.column(label)`` or the underlying multinomial draw
        raise for a bad label or an invalid distribution.
        NOTE(review): previously documented as ValueError -- confirm against
        Table.column and numpy's Generator.multinomial.
    """
    proportions = sample_proportions(sample_size, table.column(label))
    # Honor the caller-supplied name; it used to be hard-coded to
    # 'Random Sample', so the column_name argument was silently ignored.
    return table.with_column(column_name, proportions)

[docs]
def table_apply(table, func, subset=None):
    """Run ``func`` over the columns of ``table`` and return a new Table.

    The table is converted to a pandas DataFrame, ``func`` is applied, and
    the result is converted back with ``Table.from_df``.

    Args:
        ``table``: The table to apply your function to.

        ``func``: The function to apply to each column.

        ``subset``: A column label or list of column labels to apply the
            function to; if None, the function is applied to all columns
            in ``table``.

    Returns:
        A table with the given function applied. It will either be the
        shape == shape(table), or shape (1, table.shape[1])

    Raises:
        ValueError: If any label in ``subset`` is not a column of ``table``.
    """
    from . import Table
    frame = table.to_df()

    if subset is None:
        # Whole-frame application; a reducing func yields a Series here.
        frame = frame.apply(func)
    else:
        # Accept a single label as well as a sequence of labels.
        subset = np.atleast_1d(subset)
        unknown = [label for label in subset if label not in frame.columns]
        if unknown:
            raise ValueError("Column mismatch: {0}".format(unknown))
        for label in subset:
            frame[label] = frame[label].apply(func)

    if isinstance(frame, pd.Series):
        # Reshape it so that we can easily convert back
        frame = pd.DataFrame(frame).T
    return Table.from_df(frame)

[docs]
def minimize(f, start=None, smooth=False, log=None, array=False, **vargs):
    """Find arguments that minimize a function ``f`` of one or more arguments.

    Args:
        ``f``: A function that takes numbers and returns a number

        ``start``: A starting value or list of starting values. If omitted,
            one zero per positional argument of ``f`` is used.

        ``smooth``: Whether to assume that f is smooth and use first-order
            info. When False and no ``method`` is given, the 'Powell' method
            is requested from scipy.

        ``log``: Logging function called on the result of optimization (e.g. print)

        ``array``: If True, ``f`` is handed the whole argument array as a
            single parameter instead of having it unpacked into separate
            positional arguments. ``start`` must then be given explicitly.

        ``vargs``: Other named arguments passed to scipy.optimize.minimize

    Returns either:
        (a) the minimizing argument of a one-argument function
        (b) an array of minimizing arguments of a multi-argument function
    """
    if start is None:
        assert not array, "Please pass starting values explicitly when array=True"
        n_params = f.__code__.co_argcount
        assert n_params > 0, "Please pass starting values explicitly for variadic functions"
        start = [0 for _ in range(n_params)]
    elif not hasattr(start, '__len__'):
        # A bare scalar becomes a one-element list of starting values.
        start = [start]

    def _unpacked(args):
        return f(*args)

    # scipy passes a single array; unpack it unless f wants the array itself.
    objective = f if array else functools.wraps(f)(_unpacked)

    if not (smooth or 'method' in vargs):
        vargs['method'] = 'Powell'

    result = optimize.minimize(objective, start, **vargs)
    if log is not None:
        log(result)

    return result.x.item(0) if len(start) == 1 else result.x

[docs]
def is_non_string_iterable(value):
    """Returns True if ``value`` has an ``__iter__`` attribute and is not a ``str``."""
    return (not isinstance(value, str)) and hasattr(value, '__iter__')