Source code for copt.stochastic

import concurrent.futures
from datetime import datetime
import numpy as np
from scipy import sparse, optimize
from numba import njit
from copt import utils


def minimize_SAGA(
        f, g=None, x0=None, step_size=None, n_jobs: int=1, max_iter=100,
        tol=1e-6, verbose=False, trace=False) -> optimize.OptimizeResult:
    """Stochastic Average Gradient Augmented (SAGA) algorithm.

    The SAGA algorithm can solve optimization problems of the form

        argmin_x f(x) + g(x)

    Parameters
    ----------
    f, g
        Loss functions. g can be None.

    x0: np.ndarray or None, optional
        Starting point for the optimization.

    step_size: float or None, optional
        Step size for the optimization. If None is given, this will be
        estimated from the function f.

    n_jobs: int
        Number of threads to use in the optimization. A number higher than 1
        will use the asynchronous SAGA optimization method described in
        [Leblond et al., 2017].

    max_iter: int
        Maximum number of passes through the data in the optimization.

    tol: float
        Tolerance criterion. The algorithm will stop whenever the norm of the
        gradient mapping (generalization of the gradient for nonsmooth
        optimization) is below tol.

    verbose: bool
        Verbosity level. True might print some messages.

    trace: bool
        Whether to trace convergence of the function, useful for plotting
        and/or debugging. If True, the result will have the extra members
        trace_func and trace_time.

    Returns
    -------
    opt: OptimizeResult
        The optimization result represented as a
        ``scipy.optimize.OptimizeResult`` object. Important attributes are:
        ``x`` the solution array, ``success`` a Boolean flag indicating if
        the optimizer exited successfully and ``message`` which describes
        the cause of the termination. See `scipy.optimize.OptimizeResult`
        for a description of other attributes.

    References
    ----------
    Aaron Defazio, Francis Bach, and Simon Lacoste-Julien. `SAGA: A fast
    incremental gradient method with support for non-strongly convex
    composite objectives <https://arxiv.org/abs/1407.0202>`_. Advances in
    Neural Information Processing Systems. 2014.

    Rémi Leblond, Fabian Pedregosa, and Simon Lacoste-Julien. `ASAGA:
    Asynchronous parallel SAGA <https://arxiv.org/abs/1606.04809>`_.
    Proceedings of the 20th International Conference on Artificial
    Intelligence and Statistics (AISTATS). 2017.
    """
    if x0 is None:
        x = np.zeros(f.n_features)
    else:
        x = np.ascontiguousarray(x0).copy()
    if step_size is None:
        step_size = 1. / (3 * f.lipschitz_constant())
    if g is None:
        g = utils.DummyLoss()
    success = False
    epoch_iteration = _factory_sparse_SAGA(f, g)
    n_samples, n_features = f.A.shape

    # .. memory terms ..
    memory_gradient = np.zeros(n_samples)
    gradient_average = np.zeros(n_features)
    if trace:
        trace_x = np.zeros((max_iter, n_features))
    else:
        trace_x = np.zeros((0, 0))
    stop_flag = np.zeros(1, dtype=bool)
    trace_func = []
    trace_time = []

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for job_id in range(n_jobs):
            futures.append(executor.submit(
                epoch_iteration, x, memory_gradient, gradient_average,
                step_size, max_iter, job_id, tol, stop_flag, trace, trace_x,
                np.random.permutation(n_samples), True))
        start_time = datetime.now()
        n_iter, certificate = futures[0].result()
        delta = (datetime.now() - start_time).total_seconds()
        concurrent.futures.wait(futures)

    if trace:
        trace_time = np.linspace(0, delta, n_iter)
        if verbose:
            print('Computing trace')
        # .. compute function values ..
        trace_func = []
        for i in range(n_iter):
            # TODO: could be parallelized
            trace_func.append(f(trace_x[i]) + g(trace_x[i]))

    if certificate < tol:
        success = True

    return optimize.OptimizeResult(
        x=x, success=success, nit=n_iter, trace_func=trace_func,
        trace_time=trace_time, certificate=certificate)
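minimize_SAGA only assumes a small duck-typed interface on f and g: the attributes A, b, alpha, n_features, the methods lipschitz_constant() and partial_gradient_factory() (returning a numba-compatible callable), and for g the attribute is_separable plus prox_factory(). The sketch below illustrates that interface with a hypothetical SquaredLoss / L1Prox pair. It is not part of copt, only a minimal stand-in under those assumptions, and it reuses the imports at the top of this module.

class SquaredLoss:
    """Hypothetical stand-in: f(x) = 1/(2 n) ||A x - b||^2 + alpha/2 ||x||^2."""

    def __init__(self, A, b, alpha=0.):
        self.A, self.b, self.alpha = A, b, alpha
        self.n_features = A.shape[1]

    def __call__(self, x):
        z = self.A.dot(x) - self.b
        return 0.5 * z.dot(z) / self.A.shape[0] + 0.5 * self.alpha * x.dot(x)

    def lipschitz_constant(self):
        # max over samples of ||a_i||^2, plus the l2 regularization term
        A = self.A.toarray() if sparse.issparse(self.A) else self.A
        return np.max((A ** 2).sum(axis=1)) + self.alpha

    def partial_gradient_factory(self):
        @njit
        def partial_gradient(p, b_i):
            # derivative of the per-sample squared loss w.r.t. p = a_i^T x
            return p - b_i
        return partial_gradient


class L1Prox:
    """Hypothetical stand-in: g(x) = beta * ||x||_1 (separable)."""

    is_separable = True

    def __init__(self, beta=1.):
        self.beta = beta

    def __call__(self, x):
        return self.beta * np.abs(x).sum()

    def prox_factory(self):
        beta = self.beta

        @njit
        def prox(z, step_size):
            # scalar soft-thresholding
            return np.sign(z) * max(abs(z) - beta * step_size, 0.)
        return prox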
@njit(nogil=True)
def _support_matrix(A_indices, A_indptr, g_blocks, n_blocks):
    """Compute the (sample, block) support matrix in CSR format,
    i.e. which blocks of g are touched by each row of A."""
    if n_blocks == 1:
        # XXX FIXME do something smart
        pass
    BS_indices = np.zeros(A_indices.size, dtype=np.int64)
    BS_indptr = np.zeros(A_indptr.size, dtype=np.int64)
    seen_blocks = np.zeros(n_blocks, dtype=np.int64)
    BS_indptr[0] = 0
    counter_indptr = 0
    for i in range(A_indptr.size - 1):
        low = A_indptr[i]
        high = A_indptr[i + 1]
        for j in range(low, high):
            g_idx = g_blocks[A_indices[j]]
            if seen_blocks[g_idx] == 0:
                # if it is the first time we encounter this block,
                # add it to the index and mark it as seen
                BS_indices[counter_indptr] = g_idx
                seen_blocks[g_idx] = 1
                counter_indptr += 1
        BS_indptr[i + 1] = counter_indptr
        # cleanup
        for j in range(BS_indptr[i], counter_indptr):
            seen_blocks[BS_indices[j]] = 0
    BS_data = np.ones(counter_indptr)
    return BS_data, BS_indices[:counter_indptr], BS_indptr


def _factory_sparse_SAGA(f, g):

    A = sparse.csr_matrix(f.A)
    b = f.b
    f_alpha = f.alpha
    A_data = A.data
    A_indices = A.indices
    A_indptr = A.indptr
    n_samples, n_features = A.shape

    partial_gradient = f.partial_gradient_factory()
    prox = g.prox_factory()

    # .. compute the block support ..
    if g.is_separable:
        g_blocks = np.arange(n_features)
    else:
        raise NotImplementedError
    # g_blocks is a map from n_features -> n_features
    unique_blocks = np.unique(g_blocks)
    n_blocks = unique_blocks.size
    assert np.all(unique_blocks == np.arange(n_blocks))
    BS_data, BS_indices, BS_indptr = _support_matrix(
        A_indices, A_indptr, g_blocks, n_blocks)
    BS = sparse.csr_matrix(
        (BS_data, BS_indices, BS_indptr), (n_samples, n_blocks))

    d = np.array(BS.sum(0), dtype=np.float64).ravel()
    idx = (d != 0)
    d[idx] = n_samples / d[idx]
    d[~idx] = 0.

    @njit(nogil=True)
    def _saga_algorithm(
            x, memory_gradient, gradient_average, step_size, max_iter,
            job_id, tol, stop_flag, trace, trace_x, sample_indices,
            asynchronous):

        # .. SAGA estimate of the gradient ..
        x_old = x.copy()
        cert = np.inf
        it = 0

        if job_id == 0 and trace:
            trace_x[0, :] = x

        # .. inner iteration ..
        for it in range(1, max_iter):
            np.random.shuffle(sample_indices)
            for i in sample_indices:
                p = 0.
                for j in range(A_indptr[i], A_indptr[i + 1]):
                    j_idx = A_indices[j]
                    p += x[j_idx] * A_data[j]

                grad_i = partial_gradient(p, b[i])

                # .. update coefficients ..
                for j in range(A_indptr[i], A_indptr[i + 1]):
                    j_idx = A_indices[j]
                    incr = (grad_i - memory_gradient[i]) * A_data[j]
                    incr += d[j_idx] * (
                        gradient_average[j_idx] + f_alpha * x[j_idx])
                    x[j_idx] = prox(
                        x[j_idx] - step_size * incr, step_size * d[j_idx])

                # .. update memory terms ..
                for j in range(A_indptr[i], A_indptr[i + 1]):
                    j_idx = A_indices[j]
                    gradient_average[j_idx] += (
                        grad_i - memory_gradient[i]) * A_data[j] / n_samples
                memory_gradient[i] = grad_i

            if job_id == 0:
                if trace:
                    trace_x[it, :] = x

                # .. convergence check ..
                cert = np.linalg.norm(x - x_old) / step_size
                x_old[:] = x

                if cert < tol:
                    stop_flag[0] = True
                    break

            if asynchronous:
                # .. recompute alpha bar ..
                grad_tmp = np.zeros(n_features)
                for i in sample_indices:
                    for j in range(A_indptr[i], A_indptr[i + 1]):
                        j_idx = A_indices[j]
                        grad_tmp[j_idx] += memory_gradient[i] * A_data[j] / n_samples
                # .. copy back to shared memory ..
                gradient_average[:] = grad_tmp

            if stop_flag[0]:
                break

        # .. if any job has finished, stop the whole algorithm ..
        stop_flag[0] = True
        return it, cert

    return _saga_algorithm


def minimize_BCD(
        f, g=None, x0=None, step_size=None, max_iter=500, trace=False,
        verbose=False, n_jobs=1):
    """Block Coordinate Descent algorithm.

    Parameters
    ----------
    f, g
        Loss functions. g can be None.

    x0: np.ndarray or None, optional
        Starting point for the optimization.

    Returns
    -------
    opt: OptimizeResult
        The optimization result represented as a
        ``scipy.optimize.OptimizeResult`` object.
    """
    if x0 is None:
        xk = np.zeros(f.n_features)
    else:
        xk = np.array(x0, copy=True)
    if g is None:
        g = utils.DummyLoss()
    if step_size is None:
        step_size = 2. / f.lipschitz_constant()

    Ax = f.A.dot(xk)
    f_alpha = f.alpha
    n_samples, n_features = f.A.shape
    success = False

    partial_gradient = f.partial_gradient_factory()
    prox = g.prox_factory()

    start_time = datetime.now()
    if trace:
        trace_x = np.zeros((max_iter, n_features))
    else:
        trace_x = np.zeros((0, 0))
    stop_flag = np.zeros(1, dtype=bool)

    @njit(nogil=True)
    def _bcd_algorithm(
            x, Ax, A_csr_data, A_csr_indices, A_csr_indptr, A_csc_data,
            A_csc_indices, A_csc_indptr, b, trace_x, job_id):
        feature_indices = np.arange(n_features)
        if job_id == 0:
            # .. recompute Ax (TODO: do only in async) ..
            for i in range(n_samples):
                p = 0.
                for j in range(A_csr_indptr[i], A_csr_indptr[i + 1]):
                    j_idx = A_csr_indices[j]
                    p += x[j_idx] * A_csr_data[j]
                # .. copy back to shared memory ..
                Ax[i] = p
        for it in range(1, max_iter):
            np.random.shuffle(feature_indices)
            for j in feature_indices:
                grad_j = 0.
                for i_indptr in range(A_csc_indptr[j], A_csc_indptr[j + 1]):
                    # get the current sample
                    i_idx = A_csc_indices[i_indptr]
                    grad_j += partial_gradient(
                        Ax[i_idx], b[i_idx]) * A_csc_data[i_indptr] / n_samples
                x_new = prox(
                    x[j] - step_size * (grad_j + f_alpha * x[j]), step_size)

                # .. update Ax ..
                for i_indptr in range(A_csc_indptr[j], A_csc_indptr[j + 1]):
                    i_idx = A_csc_indices[i_indptr]
                    Ax[i_idx] += A_csc_data[i_indptr] * (x_new - x[j])
                x[j] = x_new

            if job_id == 0:
                if trace:
                    trace_x[it, :] = x
                # .. recompute Ax ..
                for i in range(n_samples):
                    p = 0.
                    for j in range(A_csr_indptr[i], A_csr_indptr[i + 1]):
                        j_idx = A_csr_indices[j]
                        p += x[j_idx] * A_csr_data[j]
                    # .. copy back to shared memory ..
                    Ax[i] = p
        return it, None

    X_csc = sparse.csc_matrix(f.A)
    X_csr = sparse.csr_matrix(f.A)

    trace_func = []
    start = datetime.now()
    trace_time = [(datetime.now() - start).total_seconds()]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for job_id in range(n_jobs):
            futures.append(executor.submit(
                _bcd_algorithm, xk, Ax, X_csr.data, X_csr.indices,
                X_csr.indptr, X_csc.data, X_csc.indices, X_csc.indptr,
                f.b, trace_x, job_id))
        concurrent.futures.wait(futures)
        n_iter, certificate = futures[0].result()

    if trace:
        delta = (datetime.now() - start_time).total_seconds()
        trace_time = np.linspace(0, delta, n_iter)
        if verbose:
            print('Computing trace')
        # .. compute function values ..
        trace_func = []
        for i in range(n_iter):
            # TODO: could be parallelized
            trace_func.append(f(trace_x[i]) + g(trace_x[i]))

    return optimize.OptimizeResult(
        x=xk, success=success, nit=n_iter, trace_func=trace_func,
        trace_time=trace_time, certificate=certificate)
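A hedged end-to-end sketch follows, reusing the hypothetical SquaredLoss / L1Prox stand-ins defined after minimize_SAGA above. The data and parameter values are illustrative only; nothing here beyond the two solver signatures is part of copt.

if __name__ == '__main__':
    # Illustration only: SquaredLoss and L1Prox are the hypothetical
    # stand-ins sketched after minimize_SAGA above, not part of copt.
    rng = np.random.RandomState(0)
    A = rng.randn(200, 20)
    b = A.dot(rng.randn(20)) + 0.1 * rng.randn(200)
    f_loss = SquaredLoss(A, b, alpha=0.1)
    g_pen = L1Prox(beta=0.01)

    out_saga = minimize_SAGA(f_loss, g_pen, max_iter=50)
    out_bcd = minimize_BCD(f_loss, g_pen, max_iter=50)
    print('SAGA solution (first 5 coefficients):', out_saga.x[:5])
    print('BCD  solution (first 5 coefficients):', out_bcd.x[:5])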