001package org.jpos.metrics.iso;
002
003import io.micrometer.core.instrument.*;
004import org.jpos.core.Configurable;
005import org.jpos.core.Configuration;
006import org.jpos.core.ConfigurationException;
007import org.jpos.core.Environment;
008import org.jpos.iso.ISOMsg;
009import org.jpos.metrics.MeterFactory;
010import org.jpos.metrics.MeterInfo;
011import org.jpos.util.LogEvent;
012import org.jpos.util.LogSource;
013import org.jpos.util.Logger;
014
015import java.util.*;
016import java.util.concurrent.ConcurrentHashMap;
017import java.util.function.Function;
018import java.util.stream.Collectors;
019import java.util.stream.Stream;
020
021/** Counts ISO messages using Micrometer meters and tags resolved from message fields. */
022public class ISOMsgCounter implements ISOMsgMetrics, LogSource, Configurable {
023    private final static Map<String, Function<ISOMsg,String>> aliases= Map.of(
024            "mti",    (m) -> m.getString(0),
025            "rc",     (m) -> m.getString(39),
026            "scheme", (m) -> m.getString("113.66"),             // jPOS-CMF field
027            "isemv",  (m) -> Boolean.toString(m.hasField(55)),
028            "ttype",  ISOMsgCounter::getTtype,
029            "itc",    ISOMsgCounter::getITC
030    );
031
032    private MeterRegistry registry;
033
034    // Store my meters for safe removal later on. Meters are uniquely identified by their Meter.Id.
035    private final Set<Meter> meters = ConcurrentHashMap.newKeySet();
036    private boolean frozen = false;
037
038    // custom properties
039    private String metricName;
040    private String metricDescription;
041    private Tags tags = Tags.empty();
042    private final Map<String,String> fieldSet = new HashMap<>();
043
044    private Logger logger;
045    private String realm;
046
047    /** Default constructor.
048     * @throws ConfigurationException if configuration fails
049     */
050    public ISOMsgCounter() throws ConfigurationException {
051        // Configure initial **default global** tags and fieldsets from env vars.
052
053        // Custom tags are added to the Meter.
054        // Syntax: comma/space separated entries of the form "tag:value" or just "tag" .
055        var envTags = Environment.get("${"+ENV_CHANNEL_TAGS+"}", DEFAULT_TAGS);
056        var tagMap = parseTagPairs(envTags, false);
057        tagMap.forEach((k,v) -> tags = tags.and(k,v));
058
059        // Fieldsets are tags and values taken from the ISOMsg
060        // Syntax: comma/space separated entries of the form "alias, tag:alias, tag:isofield".
061        var envFields = Environment.get("${"+ENV_CHANNEL_FIELDS+"}", DEFAULT_CHANNEL_FIELDS);
062        var fieldsMap = parseTagPairs(envFields, true);
063        validateFieldSetMap(fieldsMap);
064        fieldSet.putAll(fieldsMap);
065    }
066
067
068    /**
069     * Returns the metric meters.
070     * @return This overrides the default implementation, also including the keys from the internal field set.
071     */
072    @Override
073    public String getMetricSignature() {
074        List<String> keys = new ArrayList<>(fieldSet.size()*2);
075        tags.forEach(t -> keys.add(t.getKey()));
076        fieldSet.forEach((k,_) -> keys.add(k));
077        return getMetricName()+"|"+
078                keys.stream().sorted().collect(Collectors.joining(","));
079    }
080
081
082    @Override
083    public String getMetricName() {
084        return metricName != null ? metricName : DEFAULT_CHANNEL_METRIC_NAME;
085    }
086    @Override
087    public void setMetricName(String metricName) {
088        throwIfFrozen(true);
089        Objects.requireNonNull(metricName, "Metric name can't be null");
090        this.metricName = metricName;
091    }
092
093
094    public String getMetricDescription() {
095        return metricDescription != null ? metricDescription : "";
096    }
097    public void setMetricDescription(String metricDescription) {
098        this.metricDescription = metricDescription;
099    }
100
101    @Override
102    public Tags addTags(Tags tags) {
103        throwIfFrozen(true);
104        if (!meters.isEmpty()) {
105            String name = tags.stream()
106                              .filter(t->"name".equals(t.getKey()))
107                              .map(Tag::getValue).findAny()
108                              .orElse(getMetricName());
109            throw new IllegalStateException("ISOMsgCounter "+name+" can't add tags after started");
110        }
111        return (this.tags= this.tags.and(tags));
112    }
113    public Tags getTags() {
114        return Tags.of(tags);
115    }
116
117    @Override
118    public void recordMessage(ISOMsg m) {
119        if (registry != null && m != null)  {
120            throwIfFrozen(false);
121            Tags ft = resolveFieldTags(m, fieldSet);
122            Counter c = MeterFactory.updateCounter(registry,
123                        getMetricName(),
124                        tags.and(ft),
125                        getMetricDescription());
126            meters.add(c);
127            c.increment();
128        }
129    }
130
131    @Override
132    public void recordMessage(ISOMsg m, MeterInfo meterInfo) {
133        if (registry != null && m != null)  {
134            throwIfFrozen(false);
135            Tags ft = resolveFieldTags(m, fieldSet);
136            String myName = getMetricName();
137
138            Counter c;
139            if (!DEFAULT_CHANNEL_METRIC_NAME.equals(myName))
140                c = MeterFactory.updateCounter(registry,
141                        getMetricName(),
142                        meterInfo.add(tags).and(ft),    // allow our tags to override meterInfo's
143                        getMetricDescription());
144            else
145                c = MeterFactory.updateCounter(registry, meterInfo, tags.and(ft));
146            meters.add(c);
147            c.increment();
148        }
149    }
150
151
152    @Override
153    public boolean register(MeterRegistry registry) {
154        Objects.requireNonNull(registry, "Null registry passed to register() method.");
155        this.registry = registry;
156        frozen = true;
157        return true;
158    }
159
160    @Override
161    public void unregister() {
162        removeMeters();
163        registry = null;
164    }
165
166
167    @Override
168    public MeterRegistry getRegistry() {
169        return registry;
170    }
171
172
173    @Override
174    public void removeMeters()  {
175        if (registry != null) {
176            LogEvent evt = logger != null ? new LogEvent(this, "info", "Removing meters: ") : null;
177
178            // flag will make new recordMessage calls fail, in a NON-thread safe way
179            // but this is normally called after the channel is being stopped anyway
180            frozen = false;
181            meters.forEach(m -> {
182                if (evt != null) evt.addMessage(m.getId());
183                registry.remove(m);
184            });
185            meters.clear();
186            if (evt != null)
187                Logger.log(evt);
188        }
189    }
190
191    // ============= configuration =============
192
193    @Override
194    public void setConfiguration(Configuration cfg) throws ConfigurationException {
195        String name = cfg.get("name", null);
196         if (name != null)
197            setMetricName(name);
198        boolean customName = !DEFAULT_CHANNEL_METRIC_NAME.equals(getMetricName());
199
200        setMetricDescription(cfg.get("description", null));
201
202        // Process custom tag overrides (global defaults were handled in constructor).
203        // Custom config overrides can only override the values of the pre-existing global env tags.
204        // New tags can't be added, global tags can't be removed.
205        //
206        // Exception: If this class has a custom metric name, then it can define its own tag set.
207        boolean hasTags = cfg.get("tags", null) != null;
208        if (customName && hasTags)
209            tags = Tags.empty();                                                // start afresh if custom metric has tags
210
211        var currTags = getTagsAsMap();
212        var ovrMap = parseTagPairs(cfg.get("tags", ""), false);
213        for (var ent : ovrMap.entrySet()) {
214            if (currTags.containsKey(ent.getKey()) || customName)               // if known tag, or custom name
215                currTags.put(ent.getKey(), ent.getValue());                     // then allow override!
216            else
217                throw new ConfigurationException("Attempt to add unknown metric tag: '"+ent.getKey()+"'");
218        }
219        currTags.forEach((k,v) -> tags = tags.and(k,v));                        // add/override all custom tags to our tags
220
221
222        // Process custom isofield overrides (global defaults were handled in constructor).
223        // Custom overrides can only override pre-existing env tags, unless this class has a custom metric name.
224        boolean hasFields = cfg.get("fields", null) != null;
225        if (customName && hasFields)
226            fieldSet.clear();                                                   // start afresh if custom metric has fields
227
228        var fieldsOvrMap = parseTagPairs(cfg.get("fields", ""), true);
229        validateFieldSetMap(fieldsOvrMap);
230        for (var ent : fieldsOvrMap.entrySet()) {
231            if (fieldSet.containsKey(ent.getKey()) || customName)               // known tag, or custom metric
232                fieldSet.put(ent.getKey(), ent.getValue());                     // allow, override!
233            else
234                throw new ConfigurationException(
235                    "Attempt to add unknown metric isofield tag: '"+ent.getKey()+"'");
236        }
237    }
238
239
240
241    // copySingleTag: copy the tag name as value if no colon syntax; else, set value as empty string
242    /** Parses tag-value pairs from a string.
243     * @param tp            tag-pair string to parse
244     * @param copySingleTag if true, copy single tags
245     * @return the parsed tag pairs
246     */
247    protected Map<String,String> parseTagPairs(String tp, boolean copySingleTag) {
248        Map<String,String> ret = new HashMap<>();
249        String[] tagPairs = tp.trim().split("[, ]+");
250        for (String pair : tagPairs) {
251            if (pair.isEmpty()) continue;               // avoids possible commas at beginning of tp
252
253            String[] tv = pair.trim().split(":");
254            ret.put(tv[0],
255                    tv.length >= 2 ? tv[1] :
256                        copySingleTag ? tv[0] : "");
257        }
258        return ret;
259    }
260
261    /** Validates the field set map.
262     * @param fieldsMap the field set map to validate
263     * @throws ConfigurationException if validation fails
264     */
265    protected void validateFieldSetMap(Map<String,String> fieldsMap) throws ConfigurationException {
266        for (var valexpr : fieldsMap.values()) {
267            boolean isField = valexpr.matches("^[0-9]+(\\.[0-9]+)*$");          // is isomsg field path dot-syntax?
268            if (!isField && aliases.get(valexpr) == null)
269                throw new ConfigurationException("Unknown metric tag alias for fieldset: '"+valexpr+"'");
270        }
271    }
272
273
274    // ============= some helper methods =============
275
276    private void throwIfFrozen(boolean frozenCondition) {
277        if (frozen == frozenCondition)
278            throw new IllegalStateException(frozen ?
279                        "Can't modify this ISOMsgCounter after registration ("+getMetricSignature()+")" :
280                        "Can't use this ISOMsgCounter before registration ("+getMetricSignature()+")"
281            );
282    }
283
284
285    /**
286     * Returns a clone set, which may not be up to date next time you use it.
287     *
288     * @return a snapshot of the live meter set
289     */
290    protected Set<Meter> getMeters() {
291        return new HashSet<>(meters);
292    }
293
294    // useful for tests and debugging
295    /** Returns the configured field set map.
296     * @return the field set map
297     */
298    protected Map<String,String> getFieldSet() {
299        return Map.copyOf(fieldSet);
300    }
301
302
303    // Make Tags easy to navigate; do not abuse of this one unless you need repeated querying
304    /** Returns the metric tags as a map.
305     * @return the tags map
306     */
307    public Map<String,String> getTagsAsMap() {
308        Map<String,String> tm= new HashMap<>();                             // make Tags easy to navigate
309        getTags().forEach(t->tm.put(t.getKey(),t.getValue()));
310        return tm;
311    }
312
313    private static String getTtype(ISOMsg m) {
314        return m.hasField(3) ? m.getString(3).substring(0,2) : "";
315    }
316
317    private static String getITC(ISOMsg m) {
318        String mti = m.getString(0);
319        if (mti == null || mti.trim().isEmpty()) return "";
320        // some common fields to make an ITC from
321        return Stream.of(mti, getTtype(m), m.getString(24), m.getString(25), m.getString(70))
322                .filter(s -> s != null && !s.isEmpty())
323                .collect(Collectors.joining("."));
324    }
325
326
327    /**
328    * Hook for subclasses to resolve, against an ISOMsg, the valexpr part of a tag:valexpr in a fieldset.
329    * <br>
330    * A subclass may add or override its own aliases, or have a special way to convert "valexpr"
331    * to a String taken from the given ISOMsg.
332    * <br>
333    * If the subclass can't resolve the alias/valexpr, it may call super (i.e. this method)
334    * as a fallback.
335    *
336    * @param m the ISOMsg whose fields are inspected
337    * @param val the valexpr (alias name or field path) to resolve
338    * @return the resolved value, or an empty string when no value is available
339    */
340    protected String resolveValExpr(ISOMsg m, String val) {
341        var fun = aliases.get(val);   // check if valexpr is a registered alias
342        val = fun != null ? fun.apply(m) : m.getString(val);
343        return val != null ? val : "";
344    }
345
346    /**
347     * Returns a micrometer {@link Tags}, with keys and values resolved from a given fieldset against
348     * a given ISOMsg.<br/>
349     * Some of the valexprs in the fieldset may be aliases that need to be resolved to an ISOMsg field,
350     * and the field path is used to get the value from the ISOMsg.<br/>
351     * This method relies on the protected {@link #resolveValExpr(ISOMsg, String)} to resolve each valexpr.
352     *
353     * @param m the ISOMsg
354     * @param fieldset field-to-tag mapping to resolve against the message
355     * @return a micrometer {@link Tags} with all the tags from fieldset and the resolved values from the message
356     */
357    private Tags resolveFieldTags(ISOMsg m, Map<String,String> fieldset) {
358        Tags tt = Tags.empty();
359        // each entry is {tag,valexpr}, where valexpr may be an alias or an isofield path
360        for (var ent : fieldset.entrySet()) {
361            String val = resolveValExpr(m, ent.getValue());
362            tt = tt.and(ent.getKey(), val);
363        }
364        return tt;
365    }
366
367
368    @Override
369    public void setLogger(Logger logger, String realm) {
370        this.logger = logger;
371        this.realm = realm;
372    }
373    @Override
374    public String getRealm() {
375        return realm;
376    }
377    @Override
378    public Logger getLogger() {
379        return logger;
380    }
381}