Source code for ElementsKernel.Temporary

#
# Copyright (C) 2012-2020 Euclid Science Ground Segment
#
# This library is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3.0 of the License, or (at your option)
# any later version.
#
# This library is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#

''' create temporary structure for remote builds

:author: Hubert Degaudenzi

'''

from shutil import rmtree

import os
import tempfile
import sys

from ElementsKernel import Logging

if not 'mkdtemp' in dir(tempfile):
    # mkdtemp has been introduced in python 2.3, I simulate it
    import warnings
    warnings.filterwarnings(
        action='ignore', message='.*tmpnam.*', category=RuntimeWarning)

    def mkdtemp():
        """Replacement for the missing function in the tempfile module"""
        # pylint: disable=no-member
        name = os.tmpnam()  # @UndefinedVariable
        os.mkdir(name, 0o700)
        return name

    def mkstemp():
        """Replacement for the missing function in the tempfile module"""
        # pylint: disable=no-member
        name = os.tmpnam()  # @UndefinedVariable
        return (os.open(name, os.O_CREAT | os.O_RDWR | os.O_EXCL, 0o600),
                name)

else:
    # use the real mkdtemp
    from tempfile import mkdtemp, mkstemp

DEFAULT_TMP_KEEP_VAR = "KEEPTEMPDIR"
LOGGER = Logging.getLogger(__name__)


[docs]class TempResource(object): """ Base class for the temporary file and temporary directory""" def __init__(self, keep_var=DEFAULT_TMP_KEEP_VAR): """ Constructor """ self._name = None self._keep_var = keep_var self._getResource()
[docs] def getName(self): """Returns the name of the temporary directory""" return self._name
[docs] def path(self): """ Retuns the path to the resource """ return self._name
def __str__(self): """Convert to string.""" return self.getName() def _destruct(self): """Internal function to remove the resource""" if self._name: if self._keep_var in os.environ: LOGGER.info("%s set: I do not remove the '%s' temporary path", self._keep_var, self._name) else: self._delResource() self._name = None def __del__(self): """Destructor. Remove the temporary directory. """ self._destruct() def __enter__(self): """ To work with the context""" self._getResource() return self def __exit__(self, *_): """ function called at the end of a context """ self._destruct() def __getattr__(self, attr): return getattr(self._file, attr)
[docs]class TempDir(TempResource): """Class to create a temporary directory.""" def __init__(self, suffix="", prefix="tmp", base_dir=None, keep_var=DEFAULT_TMP_KEEP_VAR): """Constructor. 'keep_var' is used to define which environment variable will prevent the deletion of the directory. The other arguments are the same as tempfile.mkdtemp. """ self._suffix = suffix self._prefix = prefix self._base_dir = base_dir super(TempDir, self).__init__(keep_var) def _getResource(self): """ Internal function to get the resource""" if not self._name: # pylint: disable=too-many-function-args self._name = mkdtemp(self._suffix, self._prefix, self._base_dir) def _delResource(self): """Internal function to remove the resource""" rmtree(self._name)
[docs]class TempFile(TempResource): """ class to create a temporary file """ def __init__(self, keep_var=DEFAULT_TMP_KEEP_VAR): """ Constructor """ self._file = None super(TempFile, self).__init__(keep_var) def _getResource(self): """ Internal function to get the resource""" if not self._name: fd, self._name = mkstemp() self._file = os.fdopen(fd, "w+") def _delResource(self): """ Internal function to remove the resource """ self._file.close() self._file = None os.remove(self._name)
[docs]class Environment(object): """ Class to changes the environment temporarily. """ def __init__(self, orig=None, keep_same=False): """ Create a temporary environment on top of the one specified (it can be another TemporaryEnvironment instance). """ if orig is None: orig = os.environ self.old_values = {} self._orig = orig self.env = None self._keep_same = keep_same self._aqEnv() # the keys of the environment dictionary are case insensitive on # windows if sys.platform.startswith("win"): self._fix_key = lambda key: key.upper() else: self._fix_key = lambda key: key def _aqEnv(self): if not self.env: self.env = self._orig def __setitem__(self, key, value): """ Set an environment variable recording the previous value. """ key = self._fix_key(key) if key not in self.old_values: if key in self.env: if not self._keep_same or self.env[key] != value: self.old_values[key] = self.env[key] else: self.old_values[key] = None self.env[key] = value def __getitem__(self, key): """ Get an environment variable. Needed to provide the same interface as os.environ. """ key = self._fix_key(key) return self.env[key] def __delitem__(self, key): """ Unset an environment variable. Needed to provide the same interface as os.environ. """ key = self._fix_key(key) if key not in self.env: raise KeyError(key) self.old_values[key] = self.env[key] del self.env[key] LOGGER.info("removed %s from environment", key)
[docs] def keys(self): """ Return the list of defined environment variables. Needed to provide the same interface as os.environ. """ return self.env.keys()
[docs] def has_key(self, key): # pylint: disable=invalid-name """ return True if the key is present """ key = self._fix_key(key) return key in self.env.keys()
[docs] def items(self): """ Return the list of (name,value) pairs for the defined environment variables. Needed to provide the same interface as os.environ. """ return self.env.items()
def __contains__(self, key): """ Operator 'in'. Needed to provide the same interface as os.environ. """ key = self._fix_key(key) return key in self.env
[docs] def restore(self): """ Revert all the changes done to the original environment. """ for key, value in self.old_values.items(): if value is None: del self.env[key] else: self.env[key] = value self.old_values = {}
def __del__(self): """ Revert the changes on destruction. """ self.restore() def __enter__(self): """ To work with the context""" self._aqEnv() return self def __exit__(self, *_): """ Revert the changes on the end of the context. """ self.restore()
[docs] def get(self, key, default=None): """ Implementation of the standard get method of a dictionary: return the value associated to "key" if present, otherwise return the default. """ key = self._fix_key(key) return self.env.get(key, default)
[docs] def commit(self): """ Forget the old values for the changes done so far (avoids that the changes are rolled-back when the instance goes out of scope). """ self.old_values = {}
[docs] def gen_script(self, shell_type): # pylint: disable=invalid-name """ Generate a shell script to reproduce the changes in the environment. """ shells = ['csh', 'sh', 'bat'] if shell_type not in shells: raise RuntimeError( "Shell type '%s' unknown. Available: %s" % (shell_type, shells)) out = "" for key in self.old_values: if key not in self.env: # unset variable if shell_type == 'csh': out += 'unsetenv %s\n' % key elif shell_type == 'sh': out += 'unset %s\n' % key elif shell_type == 'bat': out += 'set %s=\n' % key else: # set variable if shell_type == 'csh': out += 'setenv %s "%s"\n' % (key, self.env[key]) elif shell_type == 'sh': out += 'export %s="%s"\n' % (key, self.env[key]) elif shell_type == 'bat': out += 'set %s=%s\n' % (key, self.env[key]) return out
TempEnv = Environment