Source code for heat.engine.snapshots

#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import time

from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils as oslo_timeutils
from oslo_utils import uuidutils
import tenacity

from heat.common import context as heat_context
from heat.common import exception
from heat.common.i18n import _
from heat.common import timeutils as heat_timeutils
from heat.engine import scheduler
from heat.engine import stack as parser
from heat.engine import sync_point
from heat.objects import resource_snapshot as rsrc_snapshot_objects
from heat.objects import snapshot as snapshot_object
from heat.rpc import worker_client as rpc_worker_client


LOG = logging.getLogger(__name__)


class Snapshot(object):
    """Operate snapshot actions under convergence."""

    ACTIONS = (
        CREATE, DELETE
    ) = (
        'CREATE', 'DELETE'
    )

    STATUSES = (
        IN_PROGRESS, FAILED, COMPLETE
    ) = (
        'IN_PROGRESS', 'FAILED', 'COMPLETE'
    )

    def __init__(self, context, snapshot_id, stack_id, start_time,
                 thread_group_mgr, resources=None, action=None,
                 timeout_mins=None, is_stack_delete=False,
                 current_traversal=None):
        """Describe one snapshot operation.

        :param context: request context used for DB and RPC calls.
        :param snapshot_id: id of the snapshot being operated on.
        :param stack_id: id of the stack owning the snapshot.
        :param start_time: datetime the operation started; it is subtracted
            from ``oslo_timeutils.utcnow()``, so presumably naive UTC.
        :param thread_group_mgr: manager used to run the delete in a
            background thread (its ``start(stack_id, func)`` is called).
        :param resources: resource names to process; when None they are
            loaded from the stored snapshot data by delete_snapshot().
        :param action: one of ACTIONS; defaults to DELETE.
        :param timeout_mins: action timeout in minutes; when None the
            ``stack_action_timeout`` config option is used.
        :param is_stack_delete: True when this delete is part of a stack
            delete, in which case the outcome is reported to the stack's
            sync point.
        :param current_traversal: traversal id; a fresh UUID is generated
            when not given.
        """
        self.context = context
        self.id = snapshot_id
        self.stack_id = stack_id
        self.action = self.DELETE if action is None else action
        self.is_stack_delete = is_stack_delete
        self.current_traversal = (
            current_traversal if current_traversal is not None
            else uuidutils.generate_uuid())
        self.thread_group_mgr = thread_group_mgr
        # Created lazily by the worker_client property.
        self._worker_client = None
        self.start_time = start_time
        self.timeout_mins = timeout_mins
        self.resources = resources

    def delete_snapshot(self):
        """Validate the snapshot and start deleting it in the background.

        The snapshot is loaded together with its resource snapshots. If no
        resource list was supplied, it is taken from the snapshot's stored
        data. The work itself runs in do_delete_snapshot() via the thread
        group manager.

        :raises exception.NotSupported: if the snapshot is still being
            created.
        Errors from the snapshot lookup propagate; delete_snapshots()
        treats ``exception.NotFound`` as "already deleted".
        """
        snapshot_obj = snapshot_object.Snapshot.get_snapshot(
            self.context, self.id, load_rsrc_snapshot=True)
        # NOTE(tkajinam): Check None for old services without action field
        if (snapshot_obj.action in (None, self.CREATE) and
                snapshot_obj.status == self.IN_PROGRESS):
            msg = _('Deleting in-progress snapshot')
            raise exception.NotSupported(feature=msg)
        if self.resources is None:
            self.resources = snapshot_obj.data['resources']
        # TODO(ricolin) send notification and add event
        LOG.debug("Start deleting snapshot %s.", self.id)
        self.thread_group_mgr.start(self.stack_id, self.do_delete_snapshot)

    def do_delete_snapshot(self):
        """Ask the workers to delete each resource's snapshot.

        A sync point for this snapshot and traversal is created first. Then
        one ``check_resource_delete_snapshot`` RPC is sent per resource.
        With no resources there is nothing to dispatch, so the snapshot is
        marked complete straight away.
        """
        if not self.resources:
            self.mark_complete()
            return

        sync_point.create(self.context, self.id, self.current_traversal,
                          True, self.stack_id)
        LOG.debug("Create sync_point for (snapshot, traversal, "
                  "is_update): %s.", (self.id, self.current_traversal, True))
        start_time = self.start_time.strftime(
            heat_timeutils.str_duration_format)
        for rsrc_name in self.resources:
            self.worker_client.check_resource_delete_snapshot(
                self.context, self.id, rsrc_name, start_time,
                is_stack_delete=self.is_stack_delete,
                current_traversal=self.current_traversal)
            # Space out the RPC requests by one second unless sleeping is
            # disabled via scheduler.ENABLE_SLEEP.
            if scheduler.ENABLE_SLEEP:
                time.sleep(1)

    def mark_complete(self, predecessors=None, input_data=None):
        """Finish a successful snapshot delete.

        Deletes the snapshot and resource-snapshot DB objects and clears
        this snapshot's sync points. When part of a stack delete, it also
        records this snapshot on the stack's sync point.

        :param predecessors: forwarded to ``update_sync_point``; defaults to
            an empty list.
        :param input_data: data to store on the stack sync point; defaults
            to ``{ConvergenceNode(snapshot_id, True): None}``.
        :returns: False if the stack sync point update reported no update,
            otherwise True.
        """
        if predecessors is None:
            predecessors = []
        self.delete_snapshot_objs()
        # Clear sync_points for snapshot
        sync_point.delete_all(self.context, self.id, self.current_traversal)
        LOG.debug("Clear all sync_points for (snapshot, traversal): %s.",
                  (self.id, self.current_traversal))
        if self.is_stack_delete:
            if input_data is None:
                sender_key = parser.ConvergenceNode(self.id, True)
                input_data = {sender_key: None}
            try:
                update = sync_point.update_sync_point(
                    self.context, self.stack_id, self.current_traversal,
                    True, predecessors=predecessors, new_data=input_data)
                if not update:
                    LOG.debug("Fail to update stack %(stack)s sync_point "
                              "entry after snapshot %(snapshot_id)s "
                              "completed.",
                              {'snapshot_id': self.id,
                               'stack': self.stack_id})
                    return False
            except exception.EntityNotFound:
                # The stack sync point may already be gone, e.g. removed by
                # delete_snapshots() after another snapshot delete failed.
                LOG.debug("Ignore EntityNotFound: Stack sync_point entity %s "
                          "already deleted, this is because other snapshot "
                          "delete already failed.",
                          (self.stack_id, self.current_traversal, True))
        LOG.info("Snapshot %(snapshot_id)s for stack %(stack_id)s %(action)s "
                 "complete.", {'snapshot_id': self.id,
                               'stack_id': self.stack_id,
                               'action': self.action})
        return True

    def mark_failed(self, rsrc_name, failure_reason):
        """Record that deleting a resource's snapshot failed.

        Sets the snapshot status to FAILED (best effort) and clears this
        snapshot's sync points. When part of a stack delete, it also reports
        the failure on the stack's sync point.

        :param rsrc_name: name of the resource whose snapshot op failed.
        :param failure_reason: reason stored on the snapshot and reported
            to the stack sync point.
        :returns: False if the stack sync point update reported no update,
            otherwise True.
        """
        LOG.info("Snapshot %(snapshot_id)s for Stack %(stack_id)s %(action)s "
                 "failed. Snapshot resource: %(rsrc_name)s %(action)s failed "
                 "with reason: %(reason)s.",
                 {'snapshot_id': self.id, 'stack_id': self.stack_id,
                  'action': self.action, 'rsrc_name': rsrc_name,
                  'reason': failure_reason})
        try:
            snapshot_object.Snapshot.update(
                self.context, self.id,
                {'status': self.FAILED, 'action': self.action,
                 'status_reason': failure_reason})
        except Exception:
            # Best effort: keep going so sync points are still cleaned up,
            # but keep the traceback for debugging.
            LOG.debug("Failed to update snapshot %(snapshot_id)s status.",
                      {'snapshot_id': self.id}, exc_info=True)
        # Clear sync_points for snapshot
        sync_point.delete_all(self.context, self.id, self.current_traversal)
        LOG.debug("Clear all sync_points for (snapshot, traversal): %s.",
                  (self.id, self.current_traversal))
        if self.is_stack_delete:
            try:
                update = sync_point.update_sync_point(
                    self.context, self.stack_id, self.current_traversal,
                    True, predecessors=[], new_rsrc_failure=failure_reason)
                if not update:
                    LOG.debug("Fail to update stack %(stack)s sync_point "
                              "entry after snapshot %(snapshot_id)s failed.",
                              {'snapshot_id': self.id,
                               'stack': self.stack_id})
                    return False
            except exception.EntityNotFound:
                LOG.debug("Ignore EntityNotFound: Stack sync_point entity %s "
                          "already deleted, this is because other snapshot "
                          "delete already failed.",
                          (self.stack_id, self.current_traversal, True))
        return True

    def delete_snapshot_objs(self):
        """Delete the resource-snapshot objects and then the snapshot."""
        LOG.debug("Start to clear objects for snapshot %(snapshot_id)s.",
                  {'snapshot_id': self.id})
        rsrc_snapshot_objects.ResourceSnapshot.delete_all_by_snapshot(
            self.context, self.id)
        snapshot_object.Snapshot.delete(self.context, self.id)

    @property
    def worker_client(self):
        """Return a client for making engine RPC calls."""
        if not self._worker_client:
            self._worker_client = rpc_worker_client.WorkerClient()
        return self._worker_client

    def timeout_secs(self):
        """Return the action timeout in seconds."""
        if self.timeout_mins is None:
            return cfg.CONF.stack_action_timeout
        return self.timeout_mins * 60

    def time_elapsed(self):
        """Time elapsed in seconds since the Snapshot operation started."""
        return (oslo_timeutils.utcnow() - self.start_time).total_seconds()

    def time_remaining(self):
        """Time left before Snapshot times out."""
        return self.timeout_secs() - self.time_elapsed()

    def has_timed_out(self):
        """Returns True if this Snapshot has timed-out."""
        return self.time_elapsed() > self.timeout_secs()
def delete_snapshots(context, snapshot_ids, stack_id, current_traversal,
                     start_time, thread_group_mgr, run_till_success=True):
    """Delete a stack's snapshots as part of a stack delete.

    A sync point for the stack is created first. Each snapshot delete
    (started in the background by Snapshot.delete_snapshot()) reports its
    completion or failure on that sync point.

    :param context: request context; each Snapshot gets its own copy so a
        DB session is not shared between threads.
    :param snapshot_ids: list of snapshot ids to delete. NOTE: ids of
        snapshots that turn out to be already deleted are removed from this
        list in place.
    :param stack_id: id of the stack being deleted.
    :param current_traversal: traversal id shared by all sync points.
    :param start_time: datetime the stack action started; it is subtracted
        from ``oslo_timeutils.utcnow()``, so presumably naive UTC.
    :param thread_group_mgr: passed on to each Snapshot.
    :param run_till_success: when True, block until every snapshot delete
        has reported back, one has failed, or ``stack_action_timeout``
        (measured from start_time) elapses. The stack sync point is then
        removed.
    :returns: None when there is nothing to delete, when run_till_success
        is False, or when every delete completed. Otherwise, the string form
        of the resource failures recorded on the stack sync point.
    :raises tenacity.RetryError: if the wait times out (the stack sync
        point is still cleaned up).
    """
    if not snapshot_ids:
        return

    # create sync_point entry for stack. When snapshot delete completed/failed,
    # it will update to sync_point entry for stack.
    sync_point.create(
        context, stack_id, current_traversal, True, stack_id)
    LOG.debug("Create sync_point for (stack, traversal, is_update): %s",
              (stack_id, current_traversal, True))

    # avoid reuse current db session to multiple threads.
    cnxt = context.to_dict()
    # Iterate over a copy to safely remove items
    for snapshot_id in list(snapshot_ids):
        snapshot = Snapshot(
            context=heat_context.RequestContext.from_dict(cnxt),
            snapshot_id=snapshot_id, stack_id=stack_id,
            start_time=start_time, thread_group_mgr=thread_group_mgr,
            action=Snapshot.DELETE, is_stack_delete=True,
            current_traversal=current_traversal)
        try:
            snapshot.delete_snapshot()
        except exception.NotFound:
            LOG.info("Snapshot %(snapshot)s for stack %(stack)s is "
                     "already deleted.",
                     {'snapshot': snapshot_id, 'stack': stack_id})
            # Since it's not found in object, we don't need to update it to
            # stack sync_point entry.
            snapshot_ids.remove(snapshot_id)

    if not run_till_success:
        return None

    allow_running_time = cfg.CONF.stack_action_timeout - (
        oslo_timeutils.utcnow() - start_time).total_seconds()
    wait_strategy = tenacity.wait_random_exponential(max=60)
    count_num_snapshots = len(snapshot_ids)
    # Nodes every remaining snapshot delete reports on the stack sync point.
    expected_nodes = {parser.ConvergenceNode(s, True) for s in snapshot_ids}
    # init_jitter() returns 0 once at most one snapshot is outstanding. A 0
    # multiplier makes wait_random_exponential() wait 0s, so the sync point
    # would be polled in a tight loop. Keeping at least 10ms (the cost of
    # one potential conflict) lets the back-off grow exponentially instead.
    min_multiplier = 0.01

    def init_jitter(existing_input_data):
        nconflicts = max(
            0, count_num_snapshots - len(existing_input_data) - 1)
        # 10ms per potential conflict, up to a max of 10s in total
        return min(nconflicts, 1000) * 0.01

    @tenacity.retry(
        retry=tenacity.retry_if_result(lambda r: r is False),
        wait=wait_strategy,
        stop=tenacity.stop_after_delay(allow_running_time)
    )
    def _wait_for_complete():
        # False -> keep polling; None -> all done; str -> failure reason.
        s_p = sync_point.get(context, stack_id, current_traversal, True)
        input_data = sync_point.deserialize_input_data(s_p.input_data)
        extra_data = sync_point.deserialize_extra_data(
            s_p.extra_data) if s_p.extra_data is not None else {}
        rsrc_failures = extra_data.get("resource_failures", {})
        if rsrc_failures:
            return str(rsrc_failures)
        waiting = expected_nodes - set(input_data)
        wait_strategy.multiplier = max(init_jitter(input_data),
                                       min_multiplier)
        if not waiting:
            return None
        return False

    try:
        failure_reason = _wait_for_complete()
    finally:
        # Clear sync_points for stack, also when the wait timed out or
        # raised, so the entry does not leak.
        sync_point.delete_all(context, stack_id, current_traversal)
        LOG.debug("Clear all sync_points for (stack, traversal): %s.",
                  (stack_id, current_traversal))
    return failure_reason