Source code for ffnet

########################################################################
## Copyright (C) 2006 by Marek Wojciechowski
## <mwojc@p.lodz.pl>
##
## Distributed under the terms of LGPL-3.0 license
## https://opensource.org/licenses/LGPL-3.0
########################################################################

"""
--------------------------------------
Main ffnet class and utility functions
--------------------------------------
"""

from _version import version
from scipy import zeros, ones, random, optimize, sqrt, ndarray, array
import networkx as NX
from fortran import _ffnet as netprop
from pikaia import pikaia
import sys

def mlgraph(arch, biases = True):
    """
    Creates standard multilayer network architecture.

    :Parameters:
        arch : tuple
            Tuple of integers - numbers of nodes in subsequent layers.
        biases : bool, optional
            Indicates if bias (node numbered 0) should be added to hidden
            and output neurons. Default is *True*.

    :Returns:
        conec : list
            List of tuples -- network connections.

    :Examples:
        Basic calls:

        >>> from ffnet import mlgraph
        >>> mlgraph((2,2,1))
        [(1, 3), (2, 3), (0, 3), (1, 4), (2, 4), (0, 4), (3, 5), (4, 5), (0, 5)]
        >>> mlgraph((2,2,1), biases = False)
        [(1, 3), (2, 3), (1, 4), (2, 4), (3, 5), (4, 5)]

        Example plot:

        .. plot::
            :include-source:

            from ffnet import mlgraph, ffnet
            import networkx as NX
            import pylab
            conec = mlgraph((3,6,3,2), biases=False)
            net = ffnet(conec)
            NX.draw_graphviz(net.graph, prog='dot')
            pylab.show()
    """
    nofl = len(arch)
    conec = []
    if nofl:
        trg = arch[0]
        for l in xrange(1, nofl):
            layer = arch[l]
            player = arch[l-1]
            srclist = range(trg-player+1, trg+1)
            if biases: srclist += [0]
            for i in xrange(layer):
                trg += 1
                for src in srclist:
                    conec.append((src, trg))
    return conec

def imlgraph(arch, biases = True):
    """
    Creates multilayer architecture with independent outputs.

    This function uses `mlgraph` to build independent multilayer
    architectures for each output neuron. Then it merges them into one
    graph with common input nodes.

    :Parameters:
        arch : tuple
            Tuple of length 3. The first element is the number of network
            inputs, the last one is the number of outputs, and the middle
            one is interpreted as the hidden layers definition (it can
            be an *integer* or a *list* -- see examples)
        biases : bool, optional
            Indicates if bias (node numbered 0) should be added to hidden
            and output neurons. Default is *True*.

    :Returns:
        conec : list
            List of tuples -- network connections.

    :Raises:
        TypeError
            If *arch* cannot be properly interpreted.

    :Examples:
        The following *arch* definitions are possible:

        >>> from ffnet import imlgraph
        >>> arch = (2, 2, 2)
        >>> imlgraph(arch, biases=False)
        [(1, 3), (2, 3), (1, 4), (2, 4), (3, 5), (4, 5), (1, 6), (2, 6), (1, 7), (2, 7), (6, 8), (7, 8)]

        In this case two multilayer networks (for two outputs) of the
        architectures (2,2,1), (2,2,1) are merged into one graph.

        >>> arch = (2, [(2,), (2,2)], 2)
        >>> imlgraph(arch, biases=False)
        [(1, 3), (2, 3), (1, 4), (2, 4), (3, 5), (4, 5), (1, 6), (2, 6), (1, 7), (2, 7), (6, 8), (7, 8), (6, 9), (7, 9), (8, 10), (9, 10)]

        In this case networks of the architectures (2,2,1) and (2,2,2,1)
        are merged.

        Example plot:

        .. plot::
            :include-source:

            from ffnet import imlgraph, ffnet
            import networkx as NX
            import pylab
            conec = imlgraph((3, [(3,), (6, 3), (3,)], 3), biases=False)
            net = ffnet(conec)
            NX.draw_graphviz(net.graph, prog='dot')
            pylab.show()
    """
    #Checks of the architecture
    if len(arch) < 3:
        raise TypeError("Wrong architecture definition (at least 3 layers needed).")
    if isinstance(arch[1], int):
        arch = tuple(arch)
        arch = arch[:1] + tuple([[ arch[1:-1] ] * arch[-1]]) + arch[-1:]
    elif isinstance(arch[1], (list, tuple)):
        if len(arch[1]) != arch[2]:
            raise TypeError("Length of arch[1] should be equal to arch[2].")
    else: raise TypeError("Wrong architecture definition.")

    #Merging function
    def merge(conec, conec_tmp, nofi):
        from scipy import array, where
        try:
            trans = array(conec).max() - nofi
            tmp = array(conec_tmp)
            tmp = where(tmp > nofi, tmp + trans, tmp)
            conec_tmp = [ tuple(c) for c in tmp ]
            return conec + conec_tmp
        except ValueError:
            return conec_tmp

    nofi = arch[0]
    inps = arch[:1]
    outs = (1,)
    conec = []
    for hids in arch[1]:
        arch_tmp = tuple(inps) + tuple(hids) + tuple(outs)
        conec_tmp = mlgraph(arch_tmp, biases=biases)
        conec = merge(conec, conec_tmp, nofi)
    return conec

def tmlgraph(arch, biases = True):
    """
    Creates multilayer network full connectivity list.

    Similar to `mlgraph`, but now layers are fully connected with all
    preceding layers.

    :Parameters:
        arch : tuple
            Tuple of integers - numbers of nodes in subsequent layers.
        biases : bool, optional
            Indicates if bias (node numbered 0) should be added to hidden
            and output neurons. Default is *True*.

    :Returns:
        conec : list
            List of tuples -- network connections.

    :Examples:
        Basic calls:

        >>> from ffnet import tmlgraph
        >>> tmlgraph((2,2,1))
        [(0, 3), (1, 3), (2, 3), (0, 4), (1, 4), (2, 4), (0, 5), (1, 5), (2, 5), (3, 5), (4, 5)]
        >>> tmlgraph((2,2,1), biases = False)
        [(1, 3), (2, 3), (1, 4), (2, 4), (1, 5), (2, 5), (3, 5), (4, 5)]

        Example plot:

        .. plot::
            :include-source:

            from ffnet import tmlgraph, ffnet
            import networkx as NX
            import pylab
            conec = tmlgraph((3, 8, 3), biases=False)
            net = ffnet(conec)
            NX.draw_graphviz(net.graph, prog='dot')
            pylab.show()
    """
    nofl = len(arch)
    conec = []; srclist = []
    if biases: srclist = [0]
    if nofl:
        trg = arch[0]
        for l in xrange(1, nofl):
            layer = arch[l]
            player = arch[l-1]
            srclist += range(trg-player+1, trg+1)
            for i in xrange(layer):
                trg += 1
                for src in srclist:
                    conec.append((src, trg))
    return conec

def _dependency(G, source):
    """
    Returns subgraph of G connecting source with all sinks.
    """
    H = G.copy()
    node_removal = 1
    while node_removal:
        node_removal = 0
        for node in H.nodes():
            if not H.in_degree(node) and node != source:
                H.remove_node(node)
                node_removal = 1
    return H

def _linear(a, b, c, d):
    '''
    Returns coefficients of linear map from range (a,b) to (c,d)
    '''
    #if b == a: raise ValueError("Mapping not possible due to equal limits")
    if b == a:
        c1 = 0.0
        c2 = ( c + d ) / 2.
    else:
        c1 = ( d - c ) / ( b - a )
        c2 = c - a * c1
    return c1, c2

def _norms(inarray, lower = 0., upper = 1.):
    '''
    Gets normalization information from an array, for use in ffnet class.

    (lower, upper) is a range of normalization.
    inarray is 2-dimensional; normalization parameters are computed
    for each column...
    '''
    limits = []; en = []; de = []
    inarray = array(inarray).transpose()
    for col in inarray:
        maxarr = max(col)
        minarr = min(col)
        limits += [(minarr, maxarr)]
        en += [_linear(minarr, maxarr, lower, upper)]
        de += [_linear(lower, upper, minarr, maxarr)]
    return array(limits), array(en), array(de)

def _normarray(inarray, coeff):
    '''
    Normalize 2-dimensional array linearly column by column.

    coeff -- linear map coefficients.
    '''
    #if coeff is not None:
    inarray = array(inarray).transpose()
    coeff = array(coeff)
    i = inarray.shape[0]
    for ii in xrange(i):
        inarray[ii] = inarray[ii] * coeff[ii,0] + coeff[ii,1]
    return inarray.transpose()
    #else: print "Lack of normalization parameters. Nothing done."

def _ffconec(conec):
    """
    Generates forward propagation information from conec.

    Checks if conec is acyclic, sorts it if necessary and returns tuple:
    (graph, conec, inno, hidno, outno) where:
        graph -- NX.DiGraph()
        conec -- topologically sorted conec
        inno/hidno/outno -- lists of input/hidden/output units
    """
    if len(conec) == 0: raise ValueError("Empty connectivity list")
    graph = NX.DiGraph()
    graph.add_edges_from(conec)
    snodes = NX.topological_sort(graph)  # raises NetworkXUnfeasible if cycles are found
    conec = []; inno = []; hidno = []; outno = []
    for node in snodes:
        ins = graph.in_edges(node)
        outs = graph.out_edges(node)
        if not ins and node != 0 :  # biases handling
            inno += [node]
        else:
            conec += ins   #Maybe + [(0,node)] i.e. bias
            if not outs: outno += [node]
            else:
                if node != 0: hidno += [node]  #bias handling again
    return graph, conec, inno, hidno, outno

def _bconec(conec, inno):
    """
    Generates back propagation information from conec.

    Returns positions of edges of reversed graph in conec (for backprop).
    Conec is assumed to be acyclic.
    """
    bgraph = NX.DiGraph()
    bgraph.add_edges_from(conec)
    bgraph = bgraph.reverse()
    bgraph.remove_nodes_from(inno)
    try: bgraph.remove_node(0)  #handling biases
    except: pass
    bsnodes = NX.topological_sort(bgraph)
    bconecno = []
    for bnode in bsnodes:
        for bedge in bgraph.in_edges(bnode):
            edge = (bedge[1], bedge[0])
            idx = conec.index(edge) + 1
            bconecno.append(idx)
    return bgraph, bconecno

def _dconec(conec, inno):
    """
    Generates derivative propagation information from conec.

    Returns positions of edges (in conec) of graphs for derivative
    calculation, all packed in one list (dconecno). Additionally,
    the beginning of each graph in this list is returned (dconecmk).
    """
    dgraphs = []; dconecno = []; dconecmk = [0]
    for idx, i in enumerate(inno):
        dgraph = NX.DiGraph()
        dgraph.add_edges_from(conec)
        dgraph = _dependency(dgraph, i)
        dsnodes = NX.topological_sort(dgraph)
        for dnode in dsnodes:
            for dedge in dgraph.in_edges(dnode):
                idx = conec.index(dedge) + 1
                dconecno.append(idx)
        dgraphs.append(dgraph)
        dconecmk.append(len(dconecno))
    return dgraphs, dconecno, dconecmk

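# ---------------------------------------------------------------------------
# Illustrative sketch only -- this helper is NOT part of the original ffnet
# source. It shows how the private helpers above cooperate: _linear maps a
# data range onto the (0.15, 0.85) normalization window later used by
# ffnet._setnorm, and _ffconec splits a connectivity list into input, hidden
# and output units. The name _example_propagation_info is hypothetical.
def _example_propagation_info():
    # map the range (0., 10.) onto (0.15, 0.85): y = c1*x + c2
    c1, c2 = _linear(0., 10., 0.15, 0.85)   # expected: c1 = 0.07, c2 = 0.15
    # a constant column is mapped to the middle of the window
    d1, d2 = _linear(5., 5., 0.15, 0.85)    # expected: d1 = 0.0, d2 = 0.5
    # forward propagation info for a small 2-2-1 network with biases
    graph, conec, inno, hidno, outno = _ffconec(mlgraph((2, 2, 1)))
    return (c1, c2), (d1, d2), (inno, hidno, outno)
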
class ffnet:
    """
    Feed-forward neural network main class.

    :Parameters:
        conec : list of tuples
            List of network connections
        lazy_derivative : bool
            If *True*, all data necessary for derivatives calculation
            (see `ffnet.derivative` method) are generated only on demand.

    :Returns:
        net
            Feed forward network object

    :Raises:
        TypeError
            If *conec* is not a directed acyclic graph

    :Instance attributes:
        conec : array
            Topologically sorted network connections
        weights : array
            Weights in order of topologically sorted connections
        renormalize : bool
            If *True*, normalization ranges will be recreated from
            training data at next training call. Default is *True*

    :Examples:
        >>> from ffnet import mlgraph, ffnet
        >>> conec = mlgraph((2,2,1))
        >>> net = ffnet(conec)

    :See also:
        `mlgraph`, `tmlgraph`, `imlgraph`
    """
    def __init__(self, conec, lazy_derivative = True):
        graph, conec, inno, hidno, outno = _ffconec(conec)
        self.graph = graph
        self.conec = array(conec)
        self.inno = array(inno)
        self.hidno = array(hidno)
        self.outno = array(outno)

        bgraph, bconecno = _bconec(conec, self.inno)
        self.bgraph = bgraph
        self.bconecno = array(bconecno)

        # Omit creating data for derivatives here (which is expensive for large nets)
        if lazy_derivative:
            self.dgraphs = None
            self.dconecno = None
            self.dconecmk = None
        else:
            self._set_dconec()

        #max(graph) below needed if graph lacks some numbers
        self.units = zeros(max(graph), 'd')
        # initialize weights
        self.randomweights()
        # set initial normalization parameters
        self._setnorm()

    def __repr__(self):
        info = "Feed-forward neural network: \n" + \
               "inputs: %4i \n" %(len(self.inno)) + \
               "hiddens: %4i \n" %(len(self.hidno)) + \
               "outputs: %4i \n" %(len(self.outno)) + \
               "connections and biases: %4i" %(len(self.conec))
        return info

    def __call__(self, inp):
        ## Something more sophisticated is needed here?
        return self.call(inp)

    def _set_dconec(self):
        conec = [tuple(c) for c in self.conec]  # maybe rather some changes in _dconec?
        dgraphs, dconecno, dconecmk = _dconec(conec, self.inno)
        self.dgraphs = dgraphs
        self.dconecno = array(dconecno)
        self.dconecmk = array(dconecmk)

    def call(self, inp):
        """
        Calculates network answer to given input.

        :Parameters:
            inp : array
                2D array of input patterns (or 1D for single pattern)

        :Returns:
            ans : array
                1D or 2D array of calculated network outputs

        :Raises:
            TypeError
                If *inp* is invalid
        """
        if not isinstance(inp, ndarray): inp = array(inp, 'd')
        if inp.ndim == 1:
            output = netprop.normcall(self.weights, self.conec, self.units, \
                                      self.inno, self.outno, self.eni, self.deo, inp)
            return output
        if inp.ndim == 2:
            output = netprop.normcall2(self.weights, self.conec, self.units, \
                                       self.inno, self.outno, self.eni, self.deo, inp)
            return output
        raise TypeError("Input is not valid")

    def derivative(self, inp):
        """
        Returns partial derivatives of the network's output vs its input.

        For each input pattern an array of the form::

            | o1/i1, o1/i2, ..., o1/in |
            | o2/i1, o2/i2, ..., o2/in |
            | ...                      |
            | om/i1, om/i2, ..., om/in |

        is returned.

        :Parameters:
            inp : array
                2D array of input patterns (or 1D for single pattern)

        :Returns:
            ans : array
                2D array of partial derivatives (or 3D array for 2D input)

        :Examples:
            >>> from ffnet import mlgraph, ffnet
            >>> conec = mlgraph((3,3,2))
            >>> net = ffnet(conec); net.weights[:] = 1.
            >>> net.derivative([0., 0., 0.])
            array([[ 0.02233658,  0.02233658,  0.02233658],
                   [ 0.02233658,  0.02233658,  0.02233658]])
        """
        if self.dconecno is None:  #create dconecno (only on demand)
            self._set_dconec()
        if not isinstance(inp, ndarray): inp = array(inp, 'd')
        if inp.ndim == 1:
            deriv = netprop.normdiff(self.weights, self.conec, self.dconecno, self.dconecmk, \
                                     self.units, self.inno, self.outno, self.eni, self.ded, inp)
            return deriv
        if inp.ndim == 2:
            deriv = netprop.normdiff2(self.weights, self.conec, self.dconecno, self.dconecmk, \
                                      self.units, self.inno, self.outno, self.eni, self.ded, inp)
            return deriv
        raise TypeError("Input is not valid")

    def sqerror(self, input, target):
        """
        Calculates sum of squared errors at network output.

        Error is calculated for **normalized** input and target arrays.

        :Parameters:
            input : 2-D array
                Array of input patterns
            target : 2-D array
                Array of network targets

        :Returns:
            err : float
                0.5*(sum of squared errors at network outputs)

        .. note::
            This function might be slow in frequent use, because data
            normalization is performed at each call. Usually there's no need
            to use this function, unless you need to adopt your own training
            strategy.
        """
        input, target = self._setnorm(input, target)
        err = netprop.sqerror(self.weights, self.conec, self.units, \
                              self.inno, self.outno, input, target)
        return err

    def sqgrad(self, input, target):
        """
        Returns gradient of network error vs. network weights.

        Error is calculated for **normalized** input and target arrays.

        :Parameters:
            input : 2-D array
                Array of input patterns
            target : 2-D array
                Array of network targets

        :Returns:
            grad : 1-D array
                Array of the same length as *net.weights* containing
                gradient values.

        .. note::
            This function might be slow in frequent use, because data
            normalization is performed at each call. Usually there's no need
            to use this function, unless you need to adopt your own training
            strategy.
        """
        input, target = self._setnorm(input, target)
        g = netprop.grad(self.weights, self.conec, self.bconecno, self.units, \
                         self.inno, self.outno, input, target)
        return g

    def randomweights(self):
        """
        Randomize network weights according to Bottou's proposition.

        If *n* is the number of incoming connections to a node, weights of
        these connections are chosen randomly from the range
        *(-2.38/sqrt(n), 2.38/sqrt(n))*
        """
        nofw = len(self.conec)
        weights = zeros(nofw, 'd')
        for w in xrange(nofw):
            trg = self.conec[w,1]
            n = len(self.graph.predecessors(trg))
            bound = 2.38 / sqrt(n)
            weights[w] = random.uniform(-bound, bound)
        self.weights = weights
        self.trained = False

    def _testdata(self, input, target):
        """
        Tests input and target data.
        """
        # Test conversion
        try:
            if not isinstance(input, ndarray): input = array(input, 'd')
            #input = array(input, 'd')
        except:
            raise ValueError("Input cannot be converted to numpy array")
        try:
            if not isinstance(target, ndarray): target = array(target, 'd')
            #target = array(target, 'd')
        except:
            raise ValueError("Target cannot be converted to numpy array")
        #if input.dtype.char != 'd': input = array(input, 'd')

        #Convert 1-d arrays to 2-d (this allows passing 1-d arrays
        #for training if we have one input and/or one output)
        if len(self.inno) == 1 and len(input.shape) == 1:
            input = input.reshape( (input.shape[0], 1) )
        if len(self.outno) == 1 and len(target.shape) == 1:
            target = target.reshape( (target.shape[0], 1) )

        #Test some sizes
        numip = input.shape[0]; numop = target.shape[0]
        if numip != numop:
            raise ValueError \
            ("Data not aligned: input patterns %i, target patterns: %i" %(numip, numop))
        numi = len(self.inno); numiv = input.shape[1]
        if numiv != numi:
            raise ValueError \
            ("Inconsistent input data, input units: %i, input values: %i" %(numi, numiv))
        numo = len(self.outno); numov = target.shape[1]
        if numov != numo:
            raise ValueError \
            ("Inconsistent target data, target units: %i, target values: %i" %(numo, numov))

        return input, target

    def _setnorm(self, input = None, target = None):
        """
        Retrieves normalization info from training data and normalizes data.

        This method sets the self.renormalize attribute to control
        normalization.
        """
        numi = len(self.inno); numo = len(self.outno)
        if input is None and target is None:
            self.inlimits = array( [[0.15, 0.85]]*numi )  #informative only
            self.outlimits = array( [[0.15, 0.85]]*numo )  #informative only
            self.eni = self.dei = array( [[1., 0.]] * numi )
            self.eno = self.deo = array( [[1., 0.]] * numo )
            self.ded = ones((numo, numi), 'd')
            self.renormalize = True  # this is set by __init__
        else:
            input, target = self._testdata(input, target)

            # Warn if any input or target node takes a single value only
            # I'm still not sure where to put this check....
            for i, col in enumerate(input.transpose()):
                if max(col) == min(col):
                    print "Warning: %ith input node always takes a single value of %f." %(i+1, max(col))
            for i, col in enumerate(target.transpose()):
                if max(col) == min(col):
                    print "Warning: %ith target node always takes a single value of %f." %(i+1, max(col))

            #limits are informative only, eni,dei/eno,deo are input/output coding-decoding
            if self.renormalize:
                self.inlimits, self.eni, self.dei = _norms(input, lower=0.15, upper=0.85)
                self.outlimits, self.eno, self.deo = _norms(target, lower=0.15, upper=0.85)
                self.ded = zeros((numo,numi), 'd')
                for o in xrange(numo):
                    for i in xrange(numi):
                        self.ded[o,i] = self.eni[i,0] * self.deo[o,0]
                self.renormalize = False

            return _normarray(input, self.eni), _normarray(target, self.eno)

    def train_momentum(self, input, target, eta = 0.2, momentum = 0.8, \
                       maxiter = 10000, disp = 0):
        """
        Simple backpropagation training with momentum.

        :Parameters:
            input : 2-D array
                Array of input patterns
            target : 2-D array
                Array of network targets
            eta : float, optional
                Learning rate
            momentum : float, optional
                Momentum coefficient
            maxiter : integer, optional
                Maximum number of iterations
            disp : bool
                If *True*, convergence information is displayed
        """
        input, target = self._setnorm(input, target)
        if disp:
            err = netprop.sqerror(self.weights, self.conec, self.units, \
                                  self.inno, self.outno, input, target)
            print "Initial error --> 0.5*(sum of squared errors at output): %.15f" %err
        self.weights = netprop.momentum(self.weights, self.conec, self.bconecno, \
                                        self.units, self.inno, self.outno, input, \
                                        target, eta, momentum, maxiter)
        if disp:
            err = netprop.sqerror(self.weights, self.conec, self.units, \
                                  self.inno, self.outno, input, target)
            print "Final error --> 0.5*(sum of squared errors at output): %.15f" %err

    def train_rprop(self, input, target, \
                    a = 1.2, b = 0.5, mimin = 0.000001, mimax = 50., \
                    xmi = 0.1, maxiter = 10000, disp = 0):
        """
        Rprop training algorithm.

        :Parameters:
            input : 2-D array
                Array of input patterns
            target : 2-D array
                Array of network targets
            a : float, optional
                Training step increasing parameter
            b : float, optional
                Training step decreasing parameter
            mimin : float, optional
                Minimum training step
            mimax : float, optional
                Maximum training step
            xmi : array (or float), optional
                Array containing initial training steps for weights.
                If *xmi* is a scalar then its value is set for all weights
            maxiter : integer, optional
                Maximum number of iterations
            disp : bool
                If *True*, convergence information is displayed.
                Default is *False*

        :Returns:
            xmi : array
                Computed array of training steps to be used in eventual
                further training calls.
        """
        input, target = self._setnorm(input, target)
        if type(xmi).__name__ in ['float', 'int']:
            xmi = [ xmi ]*len(self.conec)

        if disp:
            err = netprop.sqerror(self.weights, self.conec, self.units, \
                                  self.inno, self.outno, input, target)
            print "Initial error --> 0.5*(sum of squared errors at output): %.15f" %err
        self.weights, xmi = netprop.rprop(self.weights, self.conec, self.bconecno, \
                                          self.units, self.inno, self.outno, input, \
                                          target, a, b, mimin, mimax, xmi, maxiter)
        if disp:
            err = netprop.sqerror(self.weights, self.conec, self.units, \
                                  self.inno, self.outno, input, target)
            print "Final error --> 0.5*(sum of squared errors at output): %.15f" %err
        return xmi

    def train_genetic(self, input, target, **kwargs):
        """
        Global weights optimization with genetic algorithm.

        :Parameters:
            input : 2-D array
                Array of input patterns
            target : 2-D array
                Array of network targets
            lower : float, optional
                Lower bound of weights values (default is -25.)
            upper : float, optional
                Upper bound of weights values (default is 25.)
            individuals : integer, optional
                Number of individuals in a population (default is 20)
            generations : integer, optional
                Number of generations over which solution is to evolve
                (default is 500)
            verbosity : {0, 1, 2}, optional
                Printed output 0/1/2=None/Minimal/Verbose (default is 0)

        .. seealso::
            See description of `pikaia.pikaia` optimization function for
            other parameters.
        """
        input, target = self._setnorm(input, target)
        lower = -25.
        upper = 25.
        if 'lower' in kwargs: lower = kwargs['lower']; del kwargs['lower']
        if 'upper' in kwargs: upper = kwargs['upper']; del kwargs['upper']
        if lower >= upper: raise ValueError("Wrong weights range: (%f, %f)" %(lower, upper))
        if 'individuals' not in kwargs: kwargs['individuals'] = 20
        func = netprop.pikaiaff
        n = len(self.weights)
        extra_args = (self.conec, self.units, self.inno, self.outno, \
                      input, target, lower, upper)
        self.weights = pikaia(func, n, extra_args, **kwargs)
        self.weights = netprop.vmapa(self.weights, 0., 1., lower, upper)
        self.trained = 'genetic'

    def train_cg(self, input, target, **kwargs):
        """
        Train network with conjugate gradient algorithm.

        :Parameters:
            input : 2-D array
                Array of input patterns
            target : 2-D array
                Array of network targets
            maxiter : integer, optional
                Maximum number of iterations (default is 10000)
            disp : bool
                If *True*, convergence messages are displayed (default)

        .. seealso::
            `scipy.optimize.fmin_cg` optimizer is used in this method. Look
            at its documentation for possible other useful parameters.
        """
        if 'maxiter' not in kwargs: kwargs['maxiter'] = 10000
        input, target = self._setnorm(input, target)
        func = netprop.func
        fprime = netprop.grad
        extra_args = (self.conec, self.bconecno, self.units, \
                      self.inno, self.outno, input, target)
        self.weights = optimize.fmin_cg(func, self.weights, fprime=fprime, \
                                        args=extra_args, **kwargs)
        self.trained = 'cg'

    def train_bfgs(self, input, target, **kwargs):
        """
        Train network with constrained version of BFGS algorithm.

        :Parameters:
            input : 2-D array
                Array of input patterns
            target : 2-D array
                Array of network targets
            maxfun : int
                Maximum number of function evaluations (default is 15000)
            bounds : list, optional
                *(min, max)* pairs for each connection weight, defining
                the bounds on that weight. Use None for one of *min* or
                *max* when there is no bound in that direction.
                By default all bounds are set to (-100, 100)
            disp : int, optional
                If 0, then no output (default). If a positive number, then
                convergence messages are displayed.

        .. seealso::
            `scipy.optimize.fmin_l_bfgs_b` optimizer is used in this method.
            Look at its documentation for possible other useful parameters.
        """
        if sys.platform.startswith('aix'): return  # skip training on AIX platforms
        input, target = self._setnorm(input, target)
        if 'bounds' not in kwargs: kwargs['bounds'] = ((-100., 100.),)*len(self.conec)
        func = netprop.func
        fprime = netprop.grad
        extra_args = (self.conec, self.bconecno, self.units, \
                      self.inno, self.outno, input, target)
        self.weights = optimize.fmin_l_bfgs_b(func, self.weights, fprime=fprime, \
                                              args=extra_args, **kwargs)[0]
        self.trained = 'bfgs'

    def train_tnc(self, input, target, nproc = 1, **kwargs):
        """
        Train network with truncated Newton (TNC) algorithm.

        :Parameters:
            input : 2-D array
                Array of input patterns
            target : 2-D array
                Array of network targets
            nproc : int or 'ncpu', optional
                Number of processes spawned for training. If *nproc*='ncpu',
                nproc will be set to the number of available processors
            maxfun : int
                Maximum number of function evaluations. If None, maxfun is
                set to max(100, 10*len(weights)). Defaults to None.
            bounds : list, optional
                *(min, max)* pairs for each connection weight, defining
                the bounds on that weight. Use None for one of *min* or
                *max* when there is no bound in that direction.
                By default all bounds are set to (-100, 100)
            messages : int, optional
                If 0, then no output (default). If a positive number, then
                convergence messages are displayed.

        .. note::
            On Windows using *ncpu > 1* might be memory hungry, because each
            process has to load its own instance of the network and training
            data. This is not the case on Linux platforms.

        .. seealso::
            `scipy.optimize.fmin_tnc` optimizer is used in this method. Look
            at its documentation for possible other useful parameters.
        """
        input, target = self._setnorm(input, target)
        if 'messages' not in kwargs: kwargs['messages'] = 0
        if 'bounds' not in kwargs: kwargs['bounds'] = ((-100., 100.),)*len(self.conec)

        # multiprocessing version if nproc > 1
        if (isinstance(nproc, int) and nproc > 1) or nproc in (None, 'ncpu'):
            if nproc == 'ncpu': nproc = None
            self._train_tnc_mp(input, target, nproc = nproc, **kwargs)
            return

        # single process version
        func = netprop.func
        fprime = netprop.grad
        extra_args = (self.conec, self.bconecno, self.units, \
                      self.inno, self.outno, input, target)
        res = optimize.fmin_tnc(func, self.weights, fprime=fprime, \
                                args=extra_args, **kwargs)
        self.weights = array( res[0] )
        self.trained = 'tnc'

    def _train_tnc_mp(self, input, target, nproc = None, **kwargs):
        """
        Parallel training with TNC algorithm.

        The standard multiprocessing package is used here.
        """
        #register training data at mpprop module level
        # this has to be done *BEFORE* creating the pool
        import _mpprop as mpprop
        try: key = max(mpprop.nets) + 1
        except ValueError: key = 0  # unique identifier for this training
        mpprop.nets[key] = self
        mpprop.inputs[key] = input
        mpprop.targets[key] = target

        # create processing pool
        from multiprocessing import Pool, cpu_count
        if nproc is None: nproc = cpu_count()
        if sys.platform.startswith('win'):
            # we have to initialize processes in pool on Windows, because
            # each process reimports mpprop thus the registering
            # made above is not enough
            # WARNING: this might be slow and memory hungry
            # (no shared memory, all is serialized and copied)
            initargs = [key, self, input, target]
            pool = Pool(nproc, initializer = mpprop.initializer, initargs=initargs)
        else:
            pool = Pool(nproc)
        # save references for later cleaning
        self._mppool = pool
        self._mpprop = mpprop
        self._mpkey = key

        # generate splitters for training data
        splitters = mpprop.splitdata(len(input), nproc)

        # train
        func = mpprop.mpfunc
        fprime = mpprop.mpgrad
        #if 'messages' not in kwargs: kwargs['messages'] = 0
        #if 'bounds' not in kwargs: kwargs['bounds'] = ((-100., 100.),)*len(self.conec)
        res = optimize.fmin_tnc(func, self.weights, fprime = fprime, \
                                args = (pool, splitters, key), **kwargs)
        self.weights = res[0]

        # clean mpprop and pool
        self._clean_mp()

    def _clean_mp(self):
        pool = self._mppool
        mpprop = self._mpprop
        key = self._mpkey
        # clean mpprop
        del mpprop.nets[key]
        del mpprop.inputs[key]
        del mpprop.targets[key]
        del self._mpprop  # we do not want to keep this
        del self._mpkey
        # terminate and remove pool
        pool.terminate()
        del pool
        del self._mppool  # if not removed this class couldn't be pickled!

    def test(self, input, target, iprint = 1, filename = None):
        """
        Calculates output and parameters of regression.

        :Parameters:
            input : 2-D array
                Array of input patterns
            target : 2-D array
                Array of network targets
            iprint : {0, 1, 2}, optional
                Verbosity level: 0 -- print nothing, 1 -- print regression
                parameters for each output node (default), 2 -- print
                additionally general network info and all targets vs. outputs
            filename : str
                Path to the file where printed messages are redirected.
                Default is None

        :Returns:
            out : tuple
                *(output, regress)* tuple where: *output* is an array of
                network answers on input patterns and *regress* contains
                regression parameters for each output node. These parameters
                are: *slope, intercept, r-value, p-value, stderr-of-slope,
                stderr-of-estimate*.

        :Examples:
            >>> from ffnet import mlgraph, ffnet
            >>> from numpy.random import rand
            >>> conec = mlgraph((3,3,2))
            >>> net = ffnet(conec)
            >>> input = rand(50,3); target = rand(50,2)
            >>> output, regress = net.test(input, target)
            Testing results for 50 testing cases:
            OUTPUT 1 (node nr 8):
            Regression line parameters:
            slope = -0.000649
            intercept = 0.741282
            r-value = -0.021853
            p-value = 0.880267
            slope stderr = 0.004287
            estim. stderr = 0.009146
            .
            OUTPUT 2 (node nr 7):
            Regression line parameters:
            slope = 0.005536
            intercept = 0.198818
            r-value = 0.285037
            p-value = 0.044816
            slope stderr = 0.002687
            estim. stderr = 0.005853

            Example plot:

            .. plot::
                :include-source:

                from ffnet import mlgraph, ffnet
                from numpy.random import rand
                from numpy import linspace
                import pylab

                # Create and train net on random data
                conec = mlgraph((3,10,2))
                net = ffnet(conec)
                input = rand(50,3); target = rand(50,2)
                net.train_tnc(input, target, maxfun = 400)
                output, regress = net.test(input, target, iprint = 0)

                # Plot results for first output
                pylab.plot(target.T[0], output.T[0], 'o', label='targets vs. outputs')
                slope = regress[0][0]; intercept = regress[0][1]
                x = linspace(0,1)
                y = slope * x + intercept
                pylab.plot(x, y, linewidth = 2, label = 'regression line')
                pylab.legend()
                pylab.show()
        """
        # Check if we dump stdout to the file
        if filename:
            import sys
            file = open(filename, 'w')
            saveout = sys.stdout
            sys.stdout = file
        # Print network info
        if iprint == 2:
            print self
            print
        # Test data and get output
        input, target = self._testdata(input, target)
        nump = len(input)
        output = self(input)  #array([self(inp) for inp in input])
        # Calculate regression info
        from scipy.stats import linregress
        numo = len(self.outno)
        target = target.transpose()
        output = output.transpose()
        regress = []
        if iprint: print "Testing results for %i testing cases:" % nump
        for o in xrange(numo):
            if iprint:
                print "OUTPUT %i (node nr %i):" %(o+1, self.outno[o])
            if iprint == 2:
                print "Targets vs. outputs:"
                for p in xrange(nump):
                    print "%4i % 13.6f % 13.6f" %(p+1, target[o,p], output[o,p])
            x = target[o]; y = output[o]
            r = linregress(x, y)
            # linregress calculates stderr of the slope instead of the estimate,
            # even though the docs say something else. We calculate it here manually.
            sstd = r[-1]
            estd = sstd * sqrt( ( ( x-x.mean() )**2 ).sum() )
            r += (estd,)
            if iprint:
                print "Regression line parameters:"
                print "slope = % f" % r[0]
                print "intercept = % f" % r[1]
                print "r-value = % f" % r[2]
                print "p-value = % f" % r[3]
                print "slope stderr = % f" % r[4]
                print "estim. stderr = % f" % r[5]
            regress.append(r)
            if iprint: print
        # Close file and restore stdout
        if filename:
            file.close()
            sys.stdout = saveout

        return output.transpose(), regress

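# ---------------------------------------------------------------------------
# Minimal end-to-end usage sketch (illustrative only, NOT part of the original
# ffnet source). It assumes the compiled Fortran extension (fortran._ffnet) is
# importable; the helper name and the synthetic training data are hypothetical.
def _example_train_and_test():
    from numpy.random import rand
    conec = mlgraph((2, 4, 1))                    # 2 inputs, 4 hidden nodes, 1 output
    net = ffnet(conec)
    input = rand(100, 2)                          # random training patterns
    target = (input.sum(axis=1) / 2.).reshape(100, 1)  # simple synthetic targets
    net.train_tnc(input, target, maxfun = 1000)   # gradient-based training
    output, regress = net.test(input, target, iprint = 0)
    return net, output, regress
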
def savenet(net, filename):
    """
    Dumps network to a file using cPickle.

    :Parameters:
        net : ffnet
            Instance of the network
        filename : str
            Path to the file where network is dumped
    """
    import cPickle
    if cPickle.format_version >= '3.0':
        #Py3k. Cannot pickle numpy array in 3 and have it readable by 2,
        #so don't even try for compatibility. Force binary mode.
        import warnings
        warnings.warn(
            'Network files written with Python 3 not readable on Python 2')
        file = open(filename, 'wb')
    else:
        file = open(filename, 'w')
    cPickle.dump(net, file)
    file.close()
    return

def loadnet(filename):
    """
    Loads network pickled previously with `savenet`.

    :Parameters:
        filename : str
            Path to the file with saved network
    """
    import cPickle
    if cPickle.format_version >= '3.0':
        #Py3k, need to read in binary format to unpickle
        file = open(filename, 'rb')
        net = cPickle.load(file, encoding='latin-1')
    else:
        try:
            file = open(filename, 'rU')
            net = cPickle.load(file)
        except ImportError:  # when reading Windows \r\n endlines on Linux
            import pickle    # cPickle seems to not work with universal endlines
            file = open(filename, 'rU')
            net = pickle.load(file)
    return net

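# ---------------------------------------------------------------------------
# Illustrative save/load round trip (NOT part of the original ffnet source).
# The helper name and the default path are hypothetical examples only.
def _example_save_load_roundtrip(path = '/tmp/xor_net.net'):
    net = ffnet(mlgraph((2, 2, 1)))
    savenet(net, path)           # pickle the (trained or untrained) network
    net2 = loadnet(path)         # restore it later, e.g. in another session
    return net2([[0., 0.], [1., 1.]])
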
def _exportfortran(net, filename, name, derivative = True):
    """
    Exports network to Fortran source
    """
    import _py2f as py2f
    f = open( filename, 'w' )
    f.write( py2f.fheader( net, version = version ) )
    f.write( py2f.fcomment() )
    f.write( py2f.ffnetrecall(net, name) )
    if derivative:
        if net.dconecno is None: net._set_dconec()  #set on demand
        f.write( py2f.fcomment() )
        f.write( py2f.ffnetdiff(net, 'd' + name) )
    f.close()

def exportnet(net, filename, name = 'ffnet', lang = None, derivative = True):
    """
    Exports network to a compiled language source code.

    :Parameters:
        filename : str
            Path to the file where network is exported
        name : str
            Name of the exported function
        lang : str
            Language to which network is to be exported.
            Currently only Fortran is supported
        derivative : bool
            If *True*, a function for derivative calculation is also
            exported. It is named as *name* with the prefix 'd'.

    .. note::
        You need the 'ffnet.f' file distributed with ffnet sources to get
        the exported Fortran routines to work.
    """
    # Determine language if not specified
    if not lang:
        import os.path
        fname, ext = os.path.splitext(filename)
        if ext in ['.f', '.for', '.f90']: lang = 'fortran'

    if lang == 'fortran':
        _exportfortran(net, filename, name, derivative = derivative)
    else:
        if lang:
            raise TypeError("Unsupported language " + lang)
        else:
            raise TypeError("Unspecified language")
    return

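# ---------------------------------------------------------------------------
# Illustrative export sketch (NOT part of the original ffnet source). The file
# and function names are hypothetical; compiling the result additionally
# requires the 'ffnet.f' file mentioned in the note above.
def _example_export(path = 'mynet.f'):
    net = ffnet(mlgraph((2, 2, 1)))
    # language is inferred from the '.f' extension, so lang can be omitted
    exportnet(net, path, name = 'mynet', derivative = True)
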
def readdata(filename, **kwargs):
    """
    Reads arrays from ASCII files.

    .. note::
        This function just calls `numpy.loadtxt` passing to it all keyword
        arguments. Refer to this function for possible options.
    """
    from numpy import loadtxt
    data = loadtxt(filename, **kwargs)
    return data
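
# ---------------------------------------------------------------------------
# Illustrative use of readdata (NOT part of the original ffnet source). The
# file name and column layout are hypothetical: two input columns followed by
# one target column, whitespace separated.
def _example_readdata(path = 'train.dat'):
    data = readdata(path)                      # keyword arguments go to numpy.loadtxt
    input, target = data[:, :2], data[:, 2:]   # split columns into inputs and targets
    net = ffnet(mlgraph((2, 5, 1)))
    net.train_momentum(input, target, maxiter = 5000)
    return net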