001package org.jpos.metrics.iso; 002 003import io.micrometer.core.instrument.*; 004import org.jpos.core.Configurable; 005import org.jpos.core.Configuration; 006import org.jpos.core.ConfigurationException; 007import org.jpos.core.Environment; 008import org.jpos.iso.ISOMsg; 009import org.jpos.metrics.MeterFactory; 010import org.jpos.metrics.MeterInfo; 011import org.jpos.util.LogEvent; 012import org.jpos.util.LogSource; 013import org.jpos.util.Logger; 014 015import java.util.*; 016import java.util.concurrent.ConcurrentHashMap; 017import java.util.function.Function; 018import java.util.stream.Collectors; 019import java.util.stream.Stream; 020 021public class ISOMsgCounter implements ISOMsgMetrics, LogSource, Configurable { 022 private final static Map<String, Function<ISOMsg,String>> aliases= Map.of( 023 "mti", (m) -> m.getString(0), 024 "rc", (m) -> m.getString(39), 025 "scheme", (m) -> m.getString("113.66"), // jPOS-CMF field 026 "isemv", (m) -> Boolean.toString(m.hasField(55)), 027 "ttype", ISOMsgCounter::getTtype, 028 "itc", ISOMsgCounter::getITC 029 ); 030 031 private MeterRegistry registry; 032 033 // Store my meters for safe removal later on. Meters are uniquely identified by their Meter.Id. 034 private final Set<Meter> meters = ConcurrentHashMap.newKeySet(); 035 private boolean frozen = false; 036 037 // custom properties 038 private String metricName; 039 private String metricDescription; 040 private Tags tags = Tags.empty(); 041 private final Map<String,String> fieldSet = new HashMap<>(); 042 043 private Logger logger; 044 private String realm; 045 046 public ISOMsgCounter() throws ConfigurationException { 047 // Configure initial **default global** tags and fieldsets from env vars. 048 049 // Custom tags are added to the Meter. 050 // Syntax: comma/space separated entries of the form "tag:value" or just "tag" . 051 var envTags = Environment.get("${"+ENV_CHANNEL_TAGS+"}", DEFAULT_TAGS); 052 var tagMap = parseTagPairs(envTags, false); 053 tagMap.forEach((k,v) -> tags = tags.and(k,v)); 054 055 // Fieldsets are tags and values taken from the ISOMsg 056 // Syntax: comma/space separated entries of the form "alias, tag:alias, tag:isofield". 057 var envFields = Environment.get("${"+ENV_CHANNEL_FIELDS+"}", DEFAULT_CHANNEL_FIELDS); 058 var fieldsMap = parseTagPairs(envFields, true); 059 validateFieldSetMap(fieldsMap); 060 fieldSet.putAll(fieldsMap); 061 } 062 063 064 /** 065 * @return This overrides the default implementation, also including the keys from the internal field set. 066 */ 067 @Override 068 public String getMetricSignature() { 069 List<String> keys = new ArrayList<>(fieldSet.size()*2); 070 tags.forEach(t -> keys.add(t.getKey())); 071 fieldSet.forEach((k,_) -> keys.add(k)); 072 return getMetricName()+"|"+ 073 keys.stream().sorted().collect(Collectors.joining(",")); 074 } 075 076 077 @Override 078 public String getMetricName() { 079 return metricName != null ? metricName : DEFAULT_CHANNEL_METRIC_NAME; 080 } 081 @Override 082 public void setMetricName(String metricName) { 083 throwIfFrozen(true); 084 Objects.requireNonNull(metricName, "Metric name can't be null"); 085 this.metricName = metricName; 086 } 087 088 089 public String getMetricDescription() { 090 return metricDescription != null ? metricDescription : ""; 091 } 092 public void setMetricDescription(String metricDescription) { 093 this.metricDescription = metricDescription; 094 } 095 096 @Override 097 public Tags addTags(Tags tags) { 098 throwIfFrozen(true); 099 if (!meters.isEmpty()) { 100 String name = tags.stream() 101 .filter(t->"name".equals(t.getKey())) 102 .map(Tag::getValue).findAny() 103 .orElse(getMetricName()); 104 throw new IllegalStateException("ISOMsgCounter "+name+" can't add tags after started"); 105 } 106 return (this.tags= this.tags.and(tags)); 107 } 108 public Tags getTags() { 109 return Tags.of(tags); 110 } 111 112 @Override 113 public void recordMessage(ISOMsg m) { 114 if (registry != null && m != null) { 115 throwIfFrozen(false); 116 Tags ft = resolveFieldTags(m, fieldSet); 117 Counter c = MeterFactory.updateCounter(registry, 118 getMetricName(), 119 tags.and(ft), 120 getMetricDescription()); 121 meters.add(c); 122 c.increment(); 123 } 124 } 125 126 @Override 127 public void recordMessage(ISOMsg m, MeterInfo meterInfo) { 128 if (registry != null && m != null) { 129 throwIfFrozen(false); 130 Tags ft = resolveFieldTags(m, fieldSet); 131 String myName = getMetricName(); 132 133 Counter c; 134 if (!DEFAULT_CHANNEL_METRIC_NAME.equals(myName)) 135 c = MeterFactory.updateCounter(registry, 136 getMetricName(), 137 meterInfo.add(tags).and(ft), // allow our tags to override meterInfo's 138 getMetricDescription()); 139 else 140 c = MeterFactory.updateCounter(registry, meterInfo, tags.and(ft)); 141 meters.add(c); 142 c.increment(); 143 } 144 } 145 146 147 @Override 148 public boolean register(MeterRegistry registry) { 149 Objects.requireNonNull(registry, "Null registry passed to register() method."); 150 this.registry = registry; 151 frozen = true; 152 return true; 153 } 154 155 @Override 156 public void unregister() { 157 removeMeters(); 158 registry = null; 159 } 160 161 162 @Override 163 public MeterRegistry getRegistry() { 164 return registry; 165 } 166 167 168 @Override 169 public void removeMeters() { 170 if (registry != null) { 171 LogEvent evt = logger != null ? new LogEvent(this, "info", "Removing meters: ") : null; 172 173 // flag will make new recordMessage calls fail, in a NON-thread safe way 174 // but this is normally called after the channel is being stopped anyway 175 frozen = false; 176 meters.forEach(m -> { 177 if (evt != null) evt.addMessage(m.getId()); 178 registry.remove(m); 179 }); 180 meters.clear(); 181 if (evt != null) 182 Logger.log(evt); 183 } 184 } 185 186 // ============= configuration ============= 187 188 @Override 189 public void setConfiguration(Configuration cfg) throws ConfigurationException { 190 String name = cfg.get("name", null); 191 if (name != null) 192 setMetricName(name); 193 boolean customName = !DEFAULT_CHANNEL_METRIC_NAME.equals(getMetricName()); 194 195 setMetricDescription(cfg.get("description", null)); 196 197 // Process custom tag overrides (global defaults were handled in constructor). 198 // Custom config overrides can only override the values of the pre-existing global env tags. 199 // New tags can't be added, global tags can't be removed. 200 // 201 // Exception: If this class has a custom metric name, then it can define its own tag set. 202 boolean hasTags = cfg.get("tags", null) != null; 203 if (customName && hasTags) 204 tags = Tags.empty(); // start afresh if custom metric has tags 205 206 var currTags = getTagsAsMap(); 207 var ovrMap = parseTagPairs(cfg.get("tags", ""), false); 208 for (var ent : ovrMap.entrySet()) { 209 if (currTags.containsKey(ent.getKey()) || customName) // if known tag, or custom name 210 currTags.put(ent.getKey(), ent.getValue()); // then allow override! 211 else 212 throw new ConfigurationException("Attempt to add unknown metric tag: '"+ent.getKey()+"'"); 213 } 214 currTags.forEach((k,v) -> tags = tags.and(k,v)); // add/override all custom tags to our tags 215 216 217 // Process custom isofield overrides (global defaults were handled in constructor). 218 // Custom overrides can only override pre-existing env tags, unless this class has a custom metric name. 219 boolean hasFields = cfg.get("fields", null) != null; 220 if (customName && hasFields) 221 fieldSet.clear(); // start afresh if custom metric has fields 222 223 var fieldsOvrMap = parseTagPairs(cfg.get("fields", ""), true); 224 validateFieldSetMap(fieldsOvrMap); 225 for (var ent : fieldsOvrMap.entrySet()) { 226 if (fieldSet.containsKey(ent.getKey()) || customName) // known tag, or custom metric 227 fieldSet.put(ent.getKey(), ent.getValue()); // allow, override! 228 else 229 throw new ConfigurationException( 230 "Attempt to add unknown metric isofield tag: '"+ent.getKey()+"'"); 231 } 232 } 233 234 235 236 // copySingleTag: copy the tag name as value if no colon syntax; else, set value as empty string 237 protected Map<String,String> parseTagPairs(String tp, boolean copySingleTag) { 238 Map<String,String> ret = new HashMap<>(); 239 String[] tagPairs = tp.trim().split("[, ]+"); 240 for (String pair : tagPairs) { 241 if (pair.isEmpty()) continue; // avoids possible commas at beginning of tp 242 243 String[] tv = pair.trim().split(":"); 244 ret.put(tv[0], 245 tv.length >= 2 ? tv[1] : 246 copySingleTag ? tv[0] : ""); 247 } 248 return ret; 249 } 250 251 protected void validateFieldSetMap(Map<String,String> fieldsMap) throws ConfigurationException { 252 for (var valexpr : fieldsMap.values()) { 253 boolean isField = valexpr.matches("^[0-9]+(\\.[0-9]+)*$"); // is isomsg field path dot-syntax? 254 if (!isField && aliases.get(valexpr) == null) 255 throw new ConfigurationException("Unknown metric tag alias for fieldset: '"+valexpr+"'"); 256 } 257 } 258 259 260 // ============= some helper methods ============= 261 262 private void throwIfFrozen(boolean frozenCondition) { 263 if (frozen == frozenCondition) 264 throw new IllegalStateException(frozen ? 265 "Can't modify this ISOMsgCounter after registration ("+getMetricSignature()+")" : 266 "Can't use this ISOMsgCounter before registration ("+getMetricSignature()+")" 267 ); 268 } 269 270 271 /** returns a clone set, which may not be up to date next time you use it */ 272 protected Set<Meter> getMeters() { 273 return new HashSet<>(meters); 274 } 275 276 // useful for tests and debugging 277 protected Map<String,String> getFieldSet() { 278 return Map.copyOf(fieldSet); 279 } 280 281 282 // Make Tags easy to navigate; do not abuse of this one unless you need repeated querying 283 public Map<String,String> getTagsAsMap() { 284 Map<String,String> tm= new HashMap<>(); // make Tags easy to navigate 285 getTags().forEach(t->tm.put(t.getKey(),t.getValue())); 286 return tm; 287 } 288 289 private static String getTtype(ISOMsg m) { 290 return m.hasField(3) ? m.getString(3).substring(0,2) : ""; 291 } 292 293 private static String getITC(ISOMsg m) { 294 String mti = m.getString(0); 295 if (mti == null || mti.trim().isEmpty()) return ""; 296 // some common fields to make an ITC from 297 return Stream.of(mti, getTtype(m), m.getString(24), m.getString(25), m.getString(70)) 298 .filter(s -> s != null && !s.isEmpty()) 299 .collect(Collectors.joining(".")); 300 } 301 302 303 /** 304 * Hook for subclasses to resolve, against an ISOMsg, the valexpr part of a tag:valexpr in a fieldset. 305 * <br/> 306 * A subclass may add or override its own aliases, or have a special way to convert "valexpr" 307 * to a String taken from the given ISOMsg. 308 * <br/> 309 * If the subclass can't resolve the alias/valexpr, it may call super (i.e. this method) 310 * as a fallback. 311 */ 312 protected String resolveValExpr(ISOMsg m, String val) { 313 var fun = aliases.get(val); // check if valexpr is a registered alias 314 val = fun != null ? fun.apply(m) : m.getString(val); 315 return val != null ? val : ""; 316 } 317 318 /** 319 * Returns a micrometer {@link Tags}, with keys and values resolved from a given fieldset against 320 * a given ISOMsg.<br/> 321 * Some of the valexprs in the fieldset may be aliases that need to be resolved to an ISOMsg field, 322 * and the field path is used to get the value from the ISOMsg.<br/> 323 * This method relies on the protected {@link #resolveValExpr(ISOMsg, String)} to resolve each valexpr. 324 * 325 * @param m the ISOMsg 326 * @return a micrometer {@link Tags} with all the tags from fieldset and the resolved values from the message 327 */ 328 private Tags resolveFieldTags(ISOMsg m, Map<String,String> fieldset) { 329 Tags tt = Tags.empty(); 330 // each entry is {tag,valexpr}, where valexpr may be an alias or an isofield path 331 for (var ent : fieldset.entrySet()) { 332 String val = resolveValExpr(m, ent.getValue()); 333 tt = tt.and(ent.getKey(), val); 334 } 335 return tt; 336 } 337 338 339 @Override 340 public void setLogger(Logger logger, String realm) { 341 this.logger = logger; 342 this.realm = realm; 343 } 344 @Override 345 public String getRealm() { 346 return realm; 347 } 348 @Override 349 public Logger getLogger() { 350 return logger; 351 } 352}