Source code for jsontas.jsontas

# Copyright 2020 Axis Communications AB.
#
# For a full list of individual contributors, please see the commit history.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""JSONTas module."""
import json
import logging
from collections import OrderedDict
from copy import deepcopy
from jsontas.dataset import Dataset


[docs]class JsonTas: """JSONTas resolver.""" logger = logging.getLogger("JSONTas") def __init__(self, dataset=None): """Initialize dataset. :param dataset: In order to provide a custom dataset class. :type dataset: :obj:`jsontas.dataset.Dataset` """ if dataset is not None: self.dataset = dataset else: self.dataset = Dataset() @staticmethod def __get_key(key, dictionary): """Get a key from parent dictionary if possible. If key does not exist, return either the dictionary supplied or an empty dictionary. :param key: Key to get from dictionary. :type key: any :param dictionary: Dictionary to get key from. :type dictionary: dict :return: Value from dictionary. :rtype: dict """ try: parameters = dictionary[key] except TypeError: parameters = dictionary if dictionary is None: parameters = {} return parameters def __resolve(self, query_string, parent=None): """Resolve a JSONTas query string against dataset. :param query_string: Query string to resolve. :type query_string: string :param parent: Parent dictionary where the query_string resides. :type parent: any :return: Resolved key and value pairs. Or the key and value pair already set. :rtype: tuple """ value = None key = query_string if isinstance(query_string, str) and query_string.startswith("$"): self.logger.debug("Executing JSONTas query %r", query_string) parameters = self.__get_key(query_string, parent) key, value = self.dataset.lookup(query_string, parameters) return key, value def __resolve_dict(self, json_data, query_tree): """Resolve a dictionary in the JSONTas resolver. Create a new dictionary. Iterate through all json_data items. Resolve all items and add to new dictionary. Return new dictionary. :param query_tree: Keep track of the current query_tree. I.e. the full, unresolved, JSON structure that is currently being resolved. :type query_tree: dict :param json_data: JSON dictionary to resolve. :type json_data: dict :return: Newly created dict with resolved values. :rtype: dict """ query_tree.update(**deepcopy(json_data)) new = json_data.__class__() for key, value in json_data.items(): # It's important that the 'resolve' call is before the '__resolve' call. # This will make sure that the lowest values in dictionary are resolved # first. # Example: {"$something": {"$somethingelse": "text"}} # In this case "$somethingelse" will be resolved before "$something". self.logger.debug("Resolve sub-elements.") value = self.resolve(value, query_tree[key]) self.logger.debug("Resolved value: %r", value) json_data[key] = value self.dataset.add("query_tree", query_tree[key]) key, new_value = self.__resolve(key, json_data) if key is None: new = new_value else: if new_value is None: new[key] = value else: new[key] = new_value return new def __resolve_list(self, json_data, query_tree): """Resolve a list in the JSONTas resolver. Create a new list. Iterate through all json_data items. Resolve all items and add to new list. Return new list. :param query_tree: Keep track of the current query_tree. I.e. the full, unresolved, JSON structure that is currently being resolved. :type query_tree: list :param json_data: JSON list to resolve. :type json_data: tuple, set or list :return: Newly created list with resolved values. :rtype: list """ return json_data.__class__(self.resolve(value, query_tree[index]) for index, value in enumerate(json_data))
[docs] def resolve(self, json_data, query_tree=None): """Resolve JSONTas queries. Takes a JSON structure and resolve all values against dataset. This is a recursive method. :param json_data: JSON data to iterate through and resolve. :type json_data: any :param query_tree: Used in recursion to keep track of query_tree. :type query_tree: any :return: New JSON structure with resolved values. :rtype: any """ if query_tree is None: query_tree = {} if isinstance(json_data, dict): self.logger.debug("Resolving dictionary %r.", json_data) new = self.__resolve_dict(json_data, query_tree) elif isinstance(json_data, (list, set, tuple)): self.logger.debug("Resolving list %r.", json_data) new = self.__resolve_list(json_data, query_tree) else: self.logger.debug("Resolving primitive %r.", json_data) key, new_value = self.__resolve(json_data) if new_value is None: return key return new_value return new
[docs] def run(self, json_data=None, json_file=None, copy=True): """Run JSONTas. This should be the main entry to JSONTas. :param json_data: JSON data to run JSONTas on. :type json_data: :obj:`OrderedDict` :param json_file: JSON file to run JSONTas on. :type json_data: file :return: Resolved JSON structure. :rtype: :obj:`OrderedDict` """ assert json_data is not None or json_file is not None, \ "Must supply either 'json_data' or 'json_file'" if json_file: self.logger.debug("Loading JSON file.") with open(json_file) as _file: json_data = json.load(_file, object_pairs_hook=OrderedDict) if copy: self.logger.debug("Deepcopy JSON.") json_data = deepcopy(json_data) assert isinstance(json_data, OrderedDict), "JSON data must be an OrderedDict" self.logger.debug("Adding JSON to dataset.") self.dataset.add("this", json_data) self.logger.debug("Starting resolver.") return self.resolve(json_data)