Module: Legion::Extensions::Privatecore::Helpers::Boundary
- Defined in:
- lib/legion/extensions/privatecore/helpers/boundary.rb
Constant Summary collapse
- PROBE_PATTERNS =
[ /what (?:does|did) .+ tell you/i, /share .+ private/i, /reveal .+ secret/i, /bypass .+ boundary/i, /ignore .+ directive/i ].freeze
- REDACTION_MARKER =
'[REDACTED]'- MAX_AUDIT_LOG_SIZE =
1000- DEFAULT_ENABLED =
%i[email phone ssn ip].freeze
- DEFAULT_MODE =
:redact
Class Method Summary collapse
- .apply_ner(detections, text, service_url) ⇒ Object
- .contains_pii?(text, service_url: nil) ⇒ Boolean
- .detect_probe(text) ⇒ Object
- .determine_source(detections, ner_fallback) ⇒ Object
- .merge_detections(regex_detections, ner_detections) ⇒ Object
- .ner_enabled? ⇒ Boolean
- .persist_mapping_if_configured(mapping) ⇒ Object
- .resolve_setting(override, *keys) ⇒ Object
- .run_ner(text, service_url) ⇒ Object
- .strip_pii(text, mode: nil, service_url: nil) ⇒ Object
Class Method Details
.apply_ner(detections, text, service_url) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/legion/extensions/privatecore/helpers/boundary.rb', line 69 def apply_ner(detections, text, service_url) return false unless service_url || ner_enabled? ner_result = run_ner(text, service_url) if ner_result.is_a?(Hash) && ner_result[:fallback] ner_detections = ner_result[:detections] detections.replace(merge_detections(detections, ner_detections)) true else detections.replace(merge_detections(detections, ner_result)) false end end |
.contains_pii?(text, service_url: nil) ⇒ Boolean
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/legion/extensions/privatecore/helpers/boundary.rb', line 41 def contains_pii?(text, service_url: nil) return false unless text.is_a?(String) effective_enabled = resolve_setting(nil, :patterns, :enabled) || DEFAULT_ENABLED effective_validation = resolve_setting(nil, :patterns, :validation) || {} detections = Patterns.detect(text, enabled: effective_enabled, validation: effective_validation) return true unless detections.empty? if service_url || ner_enabled? ner_result = run_ner(text, service_url) ner_detections = if ner_result.is_a?(Hash) && ner_result[:fallback] ner_result[:detections] else ner_result end return true unless ner_detections.empty? end false end |
.detect_probe(text) ⇒ Object
63 64 65 66 67 |
# File 'lib/legion/extensions/privatecore/helpers/boundary.rb', line 63 def detect_probe(text) return false unless text.is_a?(String) PROBE_PATTERNS.any? { |p| p.match?(text) } end |
.determine_source(detections, ner_fallback) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/legion/extensions/privatecore/helpers/boundary.rb', line 83 def determine_source(detections, ner_fallback) has_ner = detections.any? { |d| d[:source] == :ner } has_regex = detections.any? { |d| d[:source] != :ner } if detections.empty? :none elsif ner_fallback :regex_fallback elsif has_ner && has_regex :ner_and_regex elsif has_ner :ner else :regex end end |
.merge_detections(regex_detections, ner_detections) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/legion/extensions/privatecore/helpers/boundary.rb', line 134 def merge_detections(regex_detections, ner_detections) return regex_detections if ner_detections.empty? return ner_detections if regex_detections.empty? all = regex_detections.map { |d| d.merge(source: :regex) } + ner_detections all.sort_by! { |d| [d[:start], -(d[:end] - d[:start])] } merged = [] all.each do |detection| if merged.empty? || detection[:start] >= merged.last[:end] merged << detection else prev = merged.last det_span = detection[:end] - detection[:start] prev_span = prev[:end] - prev[:start] merged[-1] = detection if det_span > prev_span end end merged end |
.ner_enabled? ⇒ Boolean
115 116 117 118 119 |
# File 'lib/legion/extensions/privatecore/helpers/boundary.rb', line 115 def ner_enabled? return false unless defined?(Legion::Settings) Legion::Settings.dig(:privatecore, :ner, :enabled) == true end |
.persist_mapping_if_configured(mapping) ⇒ Object
100 101 102 103 104 105 106 |
# File 'lib/legion/extensions/privatecore/helpers/boundary.rb', line 100 def persist_mapping_if_configured(mapping) return nil if mapping.empty? return nil unless resolve_setting(nil, :redaction, :cache_mappings) == true cache_ttl = resolve_setting(nil, :redaction, :cache_ttl) || 3600 Redactor.persist_mapping(mapping: mapping, key: nil, ttl: cache_ttl) end |
.resolve_setting(override, *keys) ⇒ Object
108 109 110 111 112 113 |
# File 'lib/legion/extensions/privatecore/helpers/boundary.rb', line 108 def resolve_setting(override, *keys) return override unless override.nil? return nil unless defined?(Legion::Settings) Legion::Settings.dig(:privatecore, *keys) end |
.run_ner(text, service_url) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/legion/extensions/privatecore/helpers/boundary.rb', line 121 def run_ner(text, service_url) url = service_url || resolve_setting(nil, :ner, :service_url) return [] unless url allow_http = resolve_setting(nil, :ner, :allow_http) == true return [] unless allow_http || url.start_with?('https://') timeout = resolve_setting(nil, :ner, :timeout) || 5 fallback = resolve_setting(nil, :ner, :fallback) || :transparent conn = NerClient.build_connection(service_url: url, timeout: timeout) NerClient.analyze(text: text, connection: conn, fallback: fallback, timeout: timeout) end |
.strip_pii(text, mode: nil, service_url: nil) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/legion/extensions/privatecore/helpers/boundary.rb', line 24 def strip_pii(text, mode: nil, service_url: nil) return { cleaned: text, mapping: {}, detections: [], source: :none } unless text.is_a?(String) effective_mode = resolve_setting(mode, :redaction, :mode) || DEFAULT_MODE effective_enabled = resolve_setting(nil, :patterns, :enabled) || DEFAULT_ENABLED effective_validation = resolve_setting(nil, :patterns, :validation) || {} detections = Patterns.detect(text, enabled: effective_enabled, validation: effective_validation) ner_fallback = apply_ner(detections, text, service_url) result = Redactor.redact(text, detections: detections, mode: effective_mode) source = determine_source(detections, ner_fallback) mapping_key = persist_mapping_if_configured(result[:mapping]) result.merge(source: source, mapping_key: mapping_key) end |