json_schema.py

  1. """Functions that help us generate and use info.json files.
  2. """
  3. import json
  4. from collections.abc import Mapping
  5. from functools import lru_cache
  6. from pathlib import Path
  7. import hjson
  8. import jsonschema
  9. from milc import cli


def _dict_raise_on_duplicates(ordered_pairs):
    """Reject duplicate keys."""
    d = {}

    for k, v in ordered_pairs:
        if k in d:
            raise ValueError("duplicate key: %r" % (k,))
        else:
            d[k] = v

    return d
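
# Illustrative sketch: this hook is wired into hjson.load below as
# `object_pairs_hook`, so duplicate keys raise instead of silently
# keeping the last value:
#
#     hjson.loads('{"a": 1, "a": 2}', object_pairs_hook=_dict_raise_on_duplicates)
#     # ValueError: duplicate key: 'a'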


def json_load(json_file, strict=True):
    """Load a json file from disk.

    `json_file` may be a Path object or an already-open stream (such as stdin).
    """
    try:
        # Get the IO Stream for Path objects
        # Not necessary if the data is provided via stdin
        if isinstance(json_file, Path):
            json_file = json_file.open(encoding='utf-8')

        return hjson.load(json_file, object_pairs_hook=_dict_raise_on_duplicates if strict else None)

    except (json.decoder.JSONDecodeError, hjson.HjsonDecodeError) as e:
        cli.log.error('Invalid JSON encountered attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        exit(1)
    except Exception as e:
        cli.log.error('Unknown error attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
        exit(1)
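
# Minimal usage sketch (the path below is hypothetical; `strict=False` skips
# the duplicate-key check):
#
#     info = json_load(Path('keyboards/example/info.json'))
#     raw = json_load(sys.stdin, strict=False)  # requires `import sys`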


# `maxsize=None` caches every loaded schema; `maxsize=0` would disable caching
@lru_cache(maxsize=None)
def load_jsonschema(schema_name):
    """Read a jsonschema file from disk.
    """
    if Path(schema_name).exists():
        # Coerce to Path so json_load can open it even when given a plain string
        return json_load(Path(schema_name))

    schema_path = Path(f'data/schemas/{schema_name}.jsonschema')

    if not schema_path.exists():
        schema_path = Path('data/schemas/false.jsonschema')

    return json_load(schema_path)
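
# Usage sketch: `schema_name` may be an existing file path or a bare schema
# name resolved under data/schemas/ (the names here are illustrative):
#
#     schema = load_jsonschema('keyboard')             # data/schemas/keyboard.jsonschema
#     schema = load_jsonschema(Path('my.jsonschema'))  # existing file, loaded directly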


@lru_cache(maxsize=None)
def compile_schema_store():
    """Compile all our schemas into a schema store.
    """
    schema_store = {}

    for schema_file in Path('data/schemas').glob('*.jsonschema'):
        schema_data = load_jsonschema(schema_file)
        if not isinstance(schema_data, dict):
            cli.log.debug('Skipping schema file %s', schema_file)
            continue

        # Key each schema by its `$id` so `$ref`s can resolve against the store
        schema_store[schema_data['$id']] = schema_data

    return schema_store
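
# Sketch of the resulting store shape (the `$id` values are illustrative):
#
#     store = compile_schema_store()
#     # {'qmk.keyboard.v1': {...}, 'qmk.user_repo.v1': {...}, ...}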


@lru_cache(maxsize=None)
def create_validator(schema):
    """Creates a validator for the given schema id.
    """
    schema_store = compile_schema_store()
    # RefResolver is deprecated in newer jsonschema releases (in favor of
    # referencing.Registry) but still resolves `$ref`s against our store here
    resolver = jsonschema.RefResolver.from_schema(schema_store[schema], store=schema_store)

    return jsonschema.Draft202012Validator(schema_store[schema], resolver=resolver).validate


def validate(data, schema):
    """Validates data against a schema.
    """
    validator = create_validator(schema)

    return validator(data)
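
# Validation sketch (the schema id and data are illustrative; a failed check
# raises jsonschema.ValidationError rather than returning False):
#
#     try:
#         validate({'keyboard_name': 'example'}, 'qmk.keyboard.v1')
#     except jsonschema.ValidationError as e:
#         cli.log.error('Validation failed: %s', e.message)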


def deep_update(origdict, newdict):
    """Update a dictionary in place, recursing to do a depth-first deep copy.
    """
    for key, value in newdict.items():
        if isinstance(value, Mapping):
            origdict[key] = deep_update(origdict.get(key, {}), value)
        else:
            origdict[key] = value

    return origdict
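
# Merge sketch: nested mappings are merged key-by-key instead of being
# replaced wholesale:
#
#     base = {'features': {'rgb': True, 'oled': False}}
#     deep_update(base, {'features': {'oled': True}})
#     # base == {'features': {'rgb': True, 'oled': True}}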