""" Functions for signal processing and statistics. Translated and vectorized from NOAA bulk model and flux processing MATLAB scripts. Uncomment code at the end of this file and execute '%run flux.py' from the iPython command line to test functions. Byron Blomquist, CU/CIRES, NOAA/ESRL/PSD3 v1: Oct 2014 """ def nanmean(x, dim=0): """ usage: m = nanmean(x) - returns means(s) along axis 0 m = nanmean(x, dim=n) - along axis n... Utilizes masked array for exclusion of NaN and Inf values. Returns ndarray float for any numeric object input. """ from numpy.ma import asarray, masked_invalid, mean, shape from numpy import copyto, zeros xm = asarray(x, dtype=float) # convert to masked array xm = masked_invalid(xm) # create mask for NaN and Inf mm = mean(xm, axis=dim) # compute medians along specified dim m = zeros(shape(mm)) # copy from masked back to ndarray copyto(m,mm) return m def nanmedian(x, dim=0): """ usage: m = nanmedian(x) - returns median(s) along axis 0 m = nanmedian(x, dim=n) - along axis n... Utilizes masked array for exclusion of NaN and Inf values. Returns ndarray float for any numeric object input. """ from numpy.ma import asarray, masked_invalid, median, shape from numpy import copyto, zeros xm = asarray(x, dtype=float) # convert to masked array xm = masked_invalid(xm) # create mask for NaN and Inf mm = median(xm, axis=dim) # compute medians along specified dim m = zeros(shape(mm)) # copy from masked back to ndarray copyto(m,mm) return m def nanvar(x, dim=0, df=1): """ usage: v = nanvar(x) - returns variance(s) along axis 0 v = nanvar(x, dim=n) - along axis n... Utilizes masked array for exclusion of NaN and Inf values. Default is for unbiased divisor: N-1 For division by N set kwarg df=0 Returns ndarray float for any numeric object input. """ from numpy.ma import asarray, masked_invalid, var, shape from numpy import copyto, zeros xm = asarray(x, dtype=float) # convert to masked array xm = masked_invalid(xm) # create mask for NaN and Inf vm = var(xm, axis=dim, ddof=df) # compute medians along specified dim v = zeros(shape(vm)) # copy from masked back to ndarray copyto(v,vm) return v def nanstd(x, dim=0, df=1): """ usage: v = nanvar(x) - returns std dev(s) along axis 0 v = nanvar(x, dim=n) - along axis n... Utilizes masked array for exclusion of NaN and Inf values. Default is for unbiased divisor: N-1 For division by N set kwarg df=0 Returns ndarray float for any numeric object input. """ from numpy import sqrt std = sqrt(nanvar(x, dim=dim, df=df)) return std def nancov(x, y=None): """ usage: C = nancov(x) - x is 2-D array with obs in rows and columns as variables (as in MATLAB) C_{ij} is the covariance of `x_i` and `x_j`. The element `C_{ii}` is the variance of `x_i`. If x is 1-D result is just variance of x. If x has only 2 columns, result is the same as nancov(x, y) for variables in separate arrays. C = nancov(x,y) - y is an additional set of variables and observations. `y` has the same form as `x`. Normally x and y will both be 1-D and C is the covariance matrix for x and y. Utilizes masked arrays for exclusion of NaN and Inf values. NaN and Inf are treated as missing values. The mask is common and a missing value in one vector is also masked in the other. Normalization is by (N-1). Returns ndarray float for any numeric object input. """ from numpy.ma import cov, asarray, masked_invalid, shape from numpy import copyto, zeros xm = asarray(x, dtype=float) # convert to masked float array xm = masked_invalid(xm) # create mask for NaN and Inf if y is None: # if no y do this C = cov(xm, rowvar=False) Cnd = zeros(shape(C)) # copy from masked back to ndarray copyto(Cnd,C) return Cnd else: # if y is given... ym = asarray(y, dtype=float) ym = masked_invalid(ym) C = cov(xm, ym, rowvar=False) Cnd = zeros(shape(C)) # copy from masked back to ndarray copyto(Cnd,C) return Cnd def despike(x, lim=4, alt=0.5): """ usage: xx, spks, nans = despike(x) - Returns despiked copy of x, number of spikes and number of nans. Detects spikes > 4 sigma and replaces with nearest lower neighbor. xx, spks, nans = despike(x, lim=s, alt=z) - Sets rejection limit to lim*sigma or alt, whichever is greater. Default is 4*sigma or 0.5. This function also replaces NaNs with nearest lower neighbor, except for NaNs at the start, which are replaced with the first finite value. Runs of NaN or multi-point spikes will thus be replaced with a constant value. Evaluate the number of spikes and NaNs to determine if the despiked array is usable. Returns tuple of despiked ndarray (xx), # of spikes and total # nans. """ import numpy as np xx = np.copy(x) # copy of x as ndarray if xx.ndim == 1: # add second row of indices to xx yy = np.arange(0, xx.size) xx = np.vstack((xx, yy)) # sort array with row 0 as key - NaNs go to the top xx = rowsort(xx) # count finite values and nans fins = np.sum(np.isfinite(xx[0,:])) nans = np.sum(np.isnan(xx[0,:])) # find median xxm = nanmedian(xx[0,:]) # compute 1-sigma estimate hi = int(0.84*fins) lo = int(0.16*fins) sig = (xx[0,hi] - xx[0,lo]) / 2 # set despike criterion dsig = max(lim*sig, alt) # count negative spikes neg = np.sum((xxm - xx[0,0:fins]) > dsig) # count positive spikes pos = np.sum((xx[0,0:fins] - xxm) > dsig) # number of spikes spks = neg + pos # NaN points at ends of sorted array that exceed limits xx[0,0:neg] = np.nan xx[0,fins-pos:fins] = np.nan # then resort to original order using the index row # and then eliminate the index row xx = rowsort(xx, row=1) xx = xx[0,:] nx = xx.size # identify all nans ii = np.isnan(xx) # boolean array same size as xx if ii.sum() > 0: # if there are NaNs... # first eliminate NaNs at the beginning for idx, val in np.ndenumerate(xx): if np.isfinite(val): xx[0:idx[0]] = xx[idx[0]] # idx is tuple (n,) break # then replace remaining NaNs with nearest lower neighbor for idx, val in np.ndenumerate(ii): if val: xx[idx[0]] = xx[idx[0]-1] return (xx, spks, nans) else: raise ValueError, 'despike(x): Input array should be 1-D.' def rowsort(x, row=0): """ usage: s= rowsort(x, row=n) - Sorts entire array using one row as key. Columns are swapped to yield ascending order in selected row. Array is 2-D but may have N rows. Default key is row 0. Sort order is low to high. """ from numpy import argsort, copy xx = copy(x) if xx.ndim == 2: ii = argsort(xx, axis=1) # sort indices array, same shape as x xx = xx[:,ii[row,:]] # swap rows according to ii return xx else: raise ValueError, 'rowsort(x): Input array should be 2-D' def interval_avg(t1,x1,t2): """ usage: x2 = interval_avg(t1,x1,t2) - averages values x1 obtained at timebase t1 over new (longer) time intervals t2. t1 and t2 are assumed to be 1-D. x2 may be 2-D, in which case averages are computed along columns, which must have the same length as t1. Times are assumed to mark the start point of each interval. HOWEVER, THE LAST VALUE OF T2 IS ASSUMED TO BE THE STOP POINT AND NO AVERAGE WILL BE COMPUTED FOR THIS VALUE. THE RESULT ARRAY WILL THEREFORE HAVE t2-1 ROWS. Uniform delta-t in either t1 or t2 is not required, although in most applications delta-t2 will be uniform. t1 and t2 must overlap, but the start and end points for each time series may be different. Intervals in t2 not covered by t1 are set to NaN. The average conserves integrals in the case where delta-t1 is variable with no missing data in x1. For intervals in t2 containing missing values in x1, the mean reflects only the valid data from x1 and the integral of x2 over t2 will be different from the integral of finite values of x1 over the same interval. For points in t1,x1 straddling an interval in t2, the value fall to the t2 interval covering the majority of the t1 interval. The average over intervals in t2 becomes more exact when delta-t2 is large compared to delta-t1. In this version time format is assumed to be a serial number such as decimal day-of-year, Julian date or elapsed seconds from some epoch. We assume only that t1 and t2 are using the same time format. Support for the python 'datetime' data type will be considered for a later version. Returns ndarray float for any numeric type input BWB Vectorized version after SPdeS 2007 MATLAB intavg.m """ import numpy as np from util import find tt1 = np.asarray(t1, dtype=float) # Make ndarray float if necessary tt2 = np.asarray(t2, dtype=float) # Make ndarray float if necessary xx1 = np.ma.asarray(x1, dtype=float) # Create masked array of x1 xx1 = np.ma.masked_invalid(xx1) # Mask NaNs and Infs # Check input for consistency if tt1.ndim is not 1 or tt2.ndim is not 1: raise ValueError, 'interval_avg: t1 & t2 should be 1-D' if xx1.ndim > 2: raise ValueError, 'interval_avg: x1 should be <= 2-D' if xx1.shape[0] != tt1.size: raise ValueError, 'interval_avg: x1 and t1 not equal length' if np.mean(np.diff(tt2)) < np.mean(np.diff(tt1)): raise ValueError, 'interval_avg: t2 interval should be longer than t1' if tt2[0] > tt1[-1] or tt2[-1] < tt1[0]: # Could also simply return nan result array for this case raise ValueError, 'interval_avg: time arrays do not overlap' # Create masked copy of xx1 and fill with delta t dt = np.ma.copy(xx1)*0 # Mask is identical to xx1. Fill with zeros. dt += tt1.reshape(-1,1) # Broadcast tt1 to all columns of dt dt = np.ma.diff(dt, axis=0) # compute masked delta t array # Note: dt now has one less row that tt1 or xx1, so duplicate and append # last row of dt, assuming the last pt in x1 has same dt as prior pt last = dt[-1,:].reshape(1,-1) dt = np.ma.vstack((dt,last)) # multiply xx1 by dt to yield matrix of integral values of x1 xx1 *= dt # Create output array with tt2 rows-1 and xx1 cols x2 = np.ndarray((tt2.size-1,xx1.shape[1])) * np.nan # Create accumulators for sums - same shape as x2 sum_all = np.ma.zeros(x2.shape) dtsum_all = np.ma.zeros(x2.shape) # We will average over intervals in t2, but begin and end times # of t2 may differ from begin and end times of t1 # Therefore, find the range of t2 over which we will average # Start point in t2: ii = find(tt2 <= tt1[0]) # find last value in tt2 that is <= tt1[0] if ii.size == 0: startPt = 0 # tt2 starts later than tt1, so start at tt2[0] else: startPt = ii[-1] # beginning of first interval is tt2[ii[-1]] # End point in t2 will be the end of the last interval ii = find(tt2 <= tt1[-1]) endPt = ii[-1] # end of last interval is ii[-1] # Compute sums from xx1 and dt over intervals in tt2 for intvl in range(startPt,endPt): # range excludes endPt in tt2 # First, decide which points fall mostly within the tt2 interval ii = find(tt1 < tt2[intvl])[-1] # tt1 just before start of intvl if np.abs(tt1[ii+1] - tt2[intvl]) < 0.5*np.abs(tt1[ii+1] - tt1[ii]): ii += 1 # intervals ~coincide, so start from next point kk = find(tt1 < tt2[intvl+1])[-1] # tt1 just before [intvl+1] if np.abs(tt1[kk+1] - tt2[intvl+1]) < 0.5*np.abs(tt1[kk] - tt1[kk-1]): kk += 1 # intervals ~coincide, so stop at next point # Now accumulate sums of xx1 and dt over intervals # Note: the ma.sum function sets masked values to zero, so NaNs # will not be counted in either accumulator. If all values of # xx1 in a particular interval are NaN, the accumulated value # in dtsum_all will be zero, indicating no data for that interval # Also, the data point straddling the end of the interval will be # included in the following interval. sum_all[intvl,:] = np.ma.sum(xx1[ii:kk,:], axis=0) dtsum_all[intvl,:] = np.ma.sum(dt[ii:kk,:], axis=0) # Now convert the accumulator arrays from masked back to ndarray. # This is necessary because the following division with # masked arrays returns zero rather than NaN for missing data sum = np.zeros(sum_all.shape) dtsum = np.zeros(dtsum_all.shape) np.copyto(sum, sum_all) np.copyto(dtsum, dtsum_all) # Now convert zeros in dtsum to NaN to avoid div by zero warning dtsum[dtsum==0] = np.nan # Finally, divide sum by dtsum to yield averages x2 = sum/dtsum return x2 # This code executes if 'run flux.py' is executed from iPython cmd line # Uncomment lines below to test particular functions if __name__ == '__main__': import numpy as np import matplotlib.pyplot as plt # TEST NAN FUNCTIONS # print 'testing nanmean(x, dim=n) and nanmedian(x, dim=n)' # x = np.array([[1,np.nan,3,4,5],[6,np.nan,np.nan,9,10],[21,22,23,24,25]]) # print x # print 'y = nanmean(x) - column means' # y = nanmean(x) # print 'column mean = ', y # print 'y = nanmean(x, dim=1) - row means' # y = nanmean(x, dim=1) # print 'row mean = ', y # print 'y = nanmedian(x) - column medians' # y = nanmedian(x) # print 'column median = ', y # print 'y = nanmedian(x, dim=1) - row medians' # y = nanmedian(x, dim=1) # print ' row median = ', y # print 'var = nanvar(x) & nanstd(x) - column var & std' # var = nanvar(x) # std = nanstd(x) # print 'column variance = ', var # print 'column std dev = ', std # print 'var = nanvar(x, dim=1) & nanstd(x,dim=1) - rows' # var = nanvar(x, dim=1) # std = nanstd(x, dim=1) # print 'row vairance = ', var # print 'row std dev = ', std # TEST OF NANCOV # x = np.random.randn(30,2) # 2 cols random data # x[:,1] = np.sum(x,1) # introduce correlation # x[3,0] = np.nan # throw in a few NaNs # x[10:12,0] = np.nan # x[17:19,1] = np.nan # print 'testing nancov(x) - cov matrix for 2-D array' # print x # C = nancov(x) # print '\n' # print C # x = np.arange(0,20, dtype=float) # y = np.arange(20,0,-1, dtype=float) # x[3:5] = np.nan # print 'testing nancov(x, y) - cov matrix for separate 1-D arrays' # print x # print y # C = nancov(x, y) # print '\n' # print C # TEST OF DESPIKE # x = np.random.randn(100) # s = x.std() # x[22] = np.nan # x[45:49] = np.nan # x[34] = 5*s # x[90] = -5*s # x[0] = 6*s # x[1:4] = np.nan # x[99] = -7*s # print 'testing despike(x)' # y, N, p = despike(x) # print 'number of spikes', N # print 'number of nans', p # TEST OF INTERVAL_AVG # t1 = np.arange(0,10,0.2) # high freq timebase # x1 = np.random.randn(50,4) # 4 variables at timebase t1 # x1[3:6,1]=np.nan # throw in a few NaNs # x1[25:40,3]=np.nan # t2 = np.arange(1,10) # low freq timebase to average over # x2 = interval_avg(t1,x1,t2) # print x2 # plt.figure(1) # dt2 = np.mean(np.diff(t2)) # plt.plot(t2[0:-1]+dt2/2, x2[:,0], 'd-') # plt.hold(True) # dt1 = np.mean(np.diff(t1)) # plt.plot(t1+dt1/2,x1[:,0], 'o-') # plt.xticks(np.arange(10)) # plt.grid(axis = 'x') # plt.hold(False) # plt.show() # # check shows integrals for columns with no nans are identical # # for both x1 and x2, but missing data biases the integrals # x1 = np.ma.asarray(x1) # x1 = np.ma.masked_invalid(x1) # x1 *= 0.2 # to check integrals, first multiply by delta-t # intx1 = np.ma.cumsum(x1[5:45,:],axis=0) # then sum # print intx1[-1,:] # x2 = np.ma.asarray(x2) # x2 = np.ma.masked_invalid(x2) # intx2 = np.ma.cumsum(x2,axis=0) # same for x2, delta-t = 1 # print intx2[-1,:]