New SpecifyEvents interface #1555
Closed
10 commits (all by tyarkoni):
- b44ef0d: first pass at working event transformer interface
- 772d280: adds Transformation class to represent one or more chained operations…
- e68ad8c: drop sklearn dependency in orthogonalization
- 78f7b70: replace sklearn preproc dependencies
- 8b9cd56: renamed interface; minor tweaks
- 095154a: working data transformations
- bb4f6f4: moved import exception raising into _run_interface
- 2878f47: added column renaming
- c17af77: removed Transformation class
- db642fa: better approach to transformation mapping
tyarkoni File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,397 @@
from __future__ import division

import json
import re
from glob import glob
from os.path import basename

import numpy as np
from nipype.external.six import string_types
from nipype.interfaces.base import (BaseInterface, TraitedSpec, InputMultiPath,
                                    traits, File, Bunch, BaseInterfaceInputSpec,
                                    isdefined, OutputMultiPath)
from nipype import logging

iflogger = logging.getLogger('interface')

# pandas is an optional dependency; flag its absence here so _run_interface
# can raise a helpful error instead of failing at import time.
have_pandas = True
try:
    import pandas as pd
except ImportError:
    have_pandas = False

class Transformations(object):
    """ Static collection of transformations that can be applied to one or
    more columns of the dense design matrix. """

    @staticmethod
    def standardize(data, demean=True, rescale=True):
        if demean:
            data -= data.mean(0)
        if rescale:
            data /= data.std(0)
        return data

    @staticmethod
    def orthogonalize(data, other):
        ''' Orthogonalize each of the variables in data with respect to all
        of the variables in other.
        '''
        y = data.values
        X = other.values
        # Fit with an intercept, but subtract only the contribution of X so
        # that the mean of y is left intact.
        _aX = np.c_[np.ones(len(y)), X]
        coefs, resids, rank, s = np.linalg.lstsq(_aX, y)
        return y - X.dot(coefs[1:])

    @staticmethod
    def binarize(data, threshold=0.0):
        above = data > threshold
        data[above] = 1
        data[~above] = 0
        return data

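# Example (illustrative, not part of the original diff): the static helpers
# operate directly on pandas objects. The column names below are invented.
#
#   >>> df = pd.DataFrame({'rt': [1.0, 2.0, 3.0], 'acc': [0.2, 0.8, 0.9]})
#   >>> Transformations.standardize(df[['rt']].copy())
#   >>> Transformations.binarize(df[['acc']].copy(), threshold=0.5)
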
def alias(target, append=False):
    """ Expose a standalone function (target) as an in-place column
    transformation on EventTransformer instances. """
    def decorator(func):
        def wrapper(self, cols, groupby=None, output=None, *args, **kwargs):

            cols = self._select_cols(cols)
            data = self.data[cols]

            # 'other' names columns in self.data rather than raw values.
            if 'other' in kwargs:
                kwargs['other'] = self.data[kwargs['other']]

            # groupby can be either a single column, in which case it's
            # interpreted as a categorical variable to group by directly, or a
            # list of column names, in which case it's interpreted as a set of
            # dummy columns to reconstruct a categorical from.
            if groupby is not None:
                groupby = self._select_cols(groupby)
                if len(groupby) > 1:
                    group_results = []
                    output = ['%s_%s' % (cn, gn) for cn in cols for gn in groupby]
                    for i, col in enumerate(groupby):
                        _result = target(data, *args, **kwargs)
                        group_results.extend(_result.T)
                    result = np.c_[group_results].squeeze().T
                    result = pd.DataFrame(result, columns=output)
                else:
                    result = self.data.groupby(groupby).apply(target, data,
                                                              *args, **kwargs)
            else:
                result = target(data, *args, **kwargs)

            if append:
                output = result.columns

            # With explicit (or appended) output names, add new columns;
            # otherwise transform the selected columns in place.
            if output is not None:
                result = pd.DataFrame(result, columns=output)
                self.data = self.data.join(result)
            else:
                self.data[cols] = result
        return wrapper
    return decorator

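# Example (illustrative): alias wires plain functions up as transformer
# methods. With a transformer ``et``, the call
#
#   >>> et.log('rt')
#
# is roughly equivalent to ``et.data['rt'] = np.log(et.data['rt'])``, and
# ``et.standardize('rt*')`` applies Transformations.standardize to every
# column matching the wildcard. ('rt' is an invented column name.)
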
class EventTransformer(object):

    def __init__(self, events, orig_hz=1, target_hz=1000):
        self.events = events
        self.orig_hz = orig_hz
        self.target_hz = target_hz
        self._to_dense()

    ### Aliased functions ###
    @alias(np.log)
    def log(): pass

    @alias(np.logical_or)
    def or_(): pass

    @alias(np.logical_and)
    def and_(): pass

    @alias(np.logical_not)
    def not_(): pass

    @alias(pd.get_dummies, append=True)
    def factor(): pass

    @alias(Transformations.standardize)
    def standardize(): pass

    @alias(Transformations.binarize)
    def binarize(): pass

    @alias(Transformations.orthogonalize)
    def orthogonalize(): pass

    ### Standard instance methods ###
    def select(self, cols):
        # Always retain onsets
        if 'onset' not in cols:
            cols.insert(0, 'onset')
        self.data = self.data[self._select_cols(cols)]

    def formula(self, f, target=None, replace=False, *args, **kwargs):
        from patsy import dmatrix
        result = dmatrix(f, self.data, return_type='dataframe', *args, **kwargs)
        if target is not None:
            self.data[target] = result
        elif replace:
            self.data[result.columns] = result
        else:
            raise ValueError("Either a target column must be passed or replace"
                             " must be True.")

    def multiply(self, cols, x_cols):
        x_cols = self._select_cols(x_cols)
        result = self.data[x_cols].apply(lambda x: np.multiply(x, self.data[cols]))
        output = ['%s_%s' % (cols, x) for x in x_cols]
        self.data[output] = result

    def rename(self, cols, output):
        rename = dict(zip(cols, output))
        self.data = self.data.rename(columns=rename)

    def query(self, q, *args, **kwargs):
        self.data = self.data.query(q, *args, **kwargs)

    def apply(self, func, *args, **kwargs):
        if isinstance(func, string_types):
            func = getattr(self, func)
        func(*args, **kwargs)

    def _select_cols(self, cols):
        # Expand wildcard patterns (e.g., 'cond*') against column names.
        if isinstance(cols, string_types) and '*' in cols:
            patt = re.compile(cols.replace('*', '.*'))
            cols = [c for c in self.data.columns.tolist() if patt.search(c)]
        return cols

    def _to_dense(self):
        """ Convert the sparse [onset, duration, amplitude] representation
        typical of event files to a dense matrix where each row represents
        a fixed unit of time. """
        end = int((self.events['onset'] + self.events['duration']).max())

        targ_hz, orig_hz = self.target_hz, self.orig_hz
        len_ts = end * targ_hz
        conditions = self.events['condition'].unique().tolist()
        n_conditions = len(conditions)
        ts = np.zeros((len_ts, n_conditions))

        _events = self.events.copy().reset_index()
        _events[['onset', 'duration']] = _events[['onset', 'duration']] * targ_hz / orig_hz

        cond_index = [conditions.index(c) for c in _events['condition']]
        ev_end = np.round(_events['onset'] + _events['duration']).astype(int)
        onsets = np.round(_events['onset']).astype(int)

        for i, row in _events.iterrows():
            ts[onsets[i]:ev_end[i], cond_index[i]] = row['amplitude']

        self.data = pd.DataFrame(ts, columns=conditions)
        onsets = np.arange(len(ts)) / self.target_hz
        self.data.insert(0, 'onset', onsets)

    def resample(self, sampling_rate):
        """
        Resample the design matrix to the specified sampling rate (in
        seconds). Primarily useful for downsampling to match the TR, so as to
        export the design as an n(TR) x n(conds) matrix.
        """
        sampling_rate = np.round(sampling_rate * 1000)
        self.data.index = pd.to_datetime(self.data['onset'], unit='s')
        self.data = self.data.resample('%dL' % sampling_rate).mean()
        self.data['onset'] = self.data.index.astype(np.int64) / int(1e9)
        self.data = self.data.reset_index(drop=True)

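# Example (illustrative sketch): densify a small event table and downsample
# to one row per TR. All values below are invented.
#
#   >>> events = pd.DataFrame({'onset': [0, 10], 'duration': [2, 2],
#   ...                        'amplitude': [1, 1],
#   ...                        'condition': ['faces', 'houses']})
#   >>> et = EventTransformer(events)  # dense 1000 Hz representation
#   >>> et.standardize('faces')
#   >>> et.resample(2.0)               # assuming TR = 2 s
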
class EventReader(object):

    def __init__(self, columns=None, header='infer', sep=None,
                 default_duration=0., default_amplitude=1.,
                 condition_pattern=None, subject_pattern=None,
                 run_pattern=None):
        '''
        Args:
            columns (list): Optional list of column names to use. If passed,
                number of elements must match number of columns in the text
                files to be read. If omitted, column names are inferred by
                pandas (depending on value of header).
            header (str): passed to pandas; see pd.read_table docs for details.
            sep (str): column separator; see pd.read_table docs for details.
            default_duration (float): Optional default duration to set for all
                events. Will be ignored if a column named 'duration' is found.
            default_amplitude (float): Optional default amplitude to set for
                all events. Will be ignored if an amplitude column is found.
            condition_pattern (str): regex with which to capture condition
                names from input filenames. Only the first captured group
                will be used.
            subject_pattern (str): regex with which to capture subject
                names from input filenames. Only the first captured group
                will be used.
            run_pattern (str): regex with which to capture run names from
                input filenames. Only the first captured group will be used.
        '''

        self.columns = columns
        self.header = header
        self.sep = sep
        self.default_duration = default_duration
        self.default_amplitude = default_amplitude
        self.condition_pattern = condition_pattern
        self.subject_pattern = subject_pattern
        self.run_pattern = run_pattern

    def read(self, path, condition=None, subject=None, run=None, rename=None):

        dfs = []

        # A string path is treated as a glob pattern.
        if isinstance(path, string_types):
            path = glob(path)

        for f in path:
            _data = pd.read_table(f, names=self.columns, header=self.header,
                                  sep=self.sep)

            if rename is not None:
                _data = _data.rename(columns=rename)

            # Validate and set the mandatory columns (condition, onset,
            # duration, amplitude).
            cols = _data.columns

            if 'onset' not in cols:
                raise ValueError(
                    "DataFrame is missing mandatory 'onset' column.")

            if 'duration' not in cols:
                if self.default_duration is None:
                    raise ValueError(
                        'Event file "%s" is missing \'duration\''
                        ' column, and no default_duration was provided.' % f)
                else:
                    _data['duration'] = self.default_duration

            if 'amplitude' not in cols:
                _data['amplitude'] = self.default_amplitude

            if condition is not None:
                _data['condition'] = condition
            elif 'condition' not in cols:
                cp = self.condition_pattern
                if cp is None:
                    # Default: strip the file extension from the basename.
                    cp = r'(.*)\.[a-zA-Z0-9]{3,4}'
                m = re.search(cp, basename(f))
                if m is None:
                    raise ValueError(
                        "No condition column found in event file, no "
                        "condition argument passed, and attempt to "
                        "automatically extract condition from filename failed."
                        " Please make sure a condition is specified.")
                _data['condition'] = m.group(1)

            if subject is not None:
                _data['subject'] = subject
            elif self.subject_pattern is not None:
                m = re.search(self.subject_pattern, f)
                if m is None:
                    raise ValueError(
                        "Subject pattern '%s' failed to match any part of "
                        "filename '%s'." % (self.subject_pattern, f))
                _data['subject'] = m.group(1)

            if run is not None:
                _data['run'] = run
            elif self.run_pattern is not None:
                m = re.search(self.run_pattern, f)
                if m is None:
                    raise ValueError(
                        "Run pattern '%s' failed to match any part of "
                        "filename '%s'." % (self.run_pattern, f))
                _data['run'] = m.group(1)

            dfs.append(_data)

        return pd.concat(dfs, axis=0)

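# Example (illustrative): read every run's event file for one subject,
# inferring the condition from each filename via the default pattern
# (e.g. 'faces.txt' -> 'faces'). The path below is invented.
#
#   >>> reader = EventReader(columns=['onset', 'duration', 'amplitude'])
#   >>> events = reader.read('/data/sub01/func/*.txt')
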
class SpecifyEventsInputSpec(BaseInterfaceInputSpec):
    subject_info = InputMultiPath(Bunch, mandatory=True,
                                  xor=['subject_info', 'event_files'],
        desc=("Bunch or List(Bunch) subject-specific condition information. "
              "See :ref:`SpecifyModel` or SpecifyModel.__doc__ for details."))
    event_files = InputMultiPath(traits.List(File(exists=True)), mandatory=True,
                                 xor=['subject_info', 'event_files'],
        desc=("list of event description files in 1, 2, 3, or 4 column format "
              "corresponding to onsets, durations, amplitudes, and names"))
    input_units = traits.Enum('secs', 'scans', mandatory=True,
        desc=("Units of event onsets and durations (secs or scans). Output "
              "units are always in secs."))
    time_repetition = traits.Float(mandatory=True,
        desc=("Time between the start of one volume and the start of "
              "the next image volume."))
    transformations = traits.File(exists=True, mandatory=False,
        desc=("JSON specification of the transformations to perform."))


class SpecifyEventsOutputSpec(TraitedSpec):
    subject_info = OutputMultiPath(Bunch, mandatory=True,
        desc=("Bunch or List(Bunch) subject-specific condition information. "
              "See :ref:`SpecifyModel` or SpecifyModel.__doc__ for details."))

class SpecifyEvents(BaseInterface):

    input_spec = SpecifyEventsInputSpec
    output_spec = SpecifyEventsOutputSpec

    def _get_event_data(self):
        if isdefined(self.inputs.subject_info):
            info = self.inputs.subject_info
            return pd.DataFrame.from_records(info)
        else:
            info = self.inputs.event_files
            reader = EventReader(columns=['onset', 'duration', 'amplitude'])
            return reader.read(info[0])

    def _transform_events(self):
        events = self._get_event_data()
        self.transformer = EventTransformer(events)
        if isdefined(self.inputs.transformations):
            with open(self.inputs.transformations) as fp:
                tf = json.load(fp)
            # Each step names a transformer method, the columns it applies
            # to, and any additional keyword arguments.
            for t in tf['steps']:
                name = t.pop('name')
                cols = t.pop('input', None)
                self.transformer.apply(name, cols, **t)

        self.transformer.resample(self.inputs.time_repetition)

    def _run_interface(self, runtime):
        if not have_pandas:
            raise ImportError("The SpecifyEvents interface requires pandas. "
                              "Please make sure that pandas is installed.")
        self._transform_events()
        return runtime

    def _df_to_bunch(self):

        if not hasattr(self, 'transformer'):
            self._transform_events()

        _data = self.transformer.data

        info = Bunch(conditions=[], onsets=[], durations=[], amplitudes=[])
        cols = [c for c in _data.columns if c not in {'onset'}]
        onsets = _data['onset'].values.tolist()
        info.conditions = cols

        # In the dense representation, every condition shares the same onset
        # grid, and each row lasts one TR.
        for col in cols:
            info.onsets.append(onsets)
            info.durations.append(self.inputs.time_repetition)
            info.amplitudes.append(_data[col].values.tolist())

        return info

    def _list_outputs(self):
        outputs = self._outputs().get()
        outputs['subject_info'] = self._df_to_bunch()
        return outputs
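
# Example (illustrative sketch, not part of the diff): a minimal
# transformations JSON of the form consumed by _transform_events. Each
# step's "name" selects an EventTransformer method, "input" lists the
# columns to operate on, and any remaining keys are passed through as
# keyword arguments. The column names ('rt', 'accuracy') are invented.
#
#   {
#     "steps": [
#       {"name": "standardize", "input": ["rt"]},
#       {"name": "orthogonalize", "input": ["rt"], "other": ["accuracy"]}
#     ]
#   }
#
# And a hypothetical use of the interface itself (filenames invented):
#
#   >>> spec = SpecifyEvents()
#   >>> spec.inputs.event_files = [['run1_events.txt']]
#   >>> spec.inputs.input_units = 'secs'
#   >>> spec.inputs.time_repetition = 2.0
#   >>> spec.inputs.transformations = 'transforms.json'
#   >>> res = spec.run()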
Review comment: Should this be have_pandas?