diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcMetadataKeys.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcMetadataKeys.java new file mode 100644 index 00000000000..e88ca207f32 --- /dev/null +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcMetadataKeys.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.api.messages; + +import org.apache.plc4x.java.api.metadata.Metadata.Key; +import org.apache.plc4x.java.api.metadata.time.TimeSource; + +/** + * High level definition of common metadata keys which can occur across multiple drivers. + */ +public interface PlcMetadataKeys { + + Key TIMESTAMP = Key.of("timestamp", Long.class); + Key TIMESTAMP_SOURCE = Key.of("timestamp_source", TimeSource.class); + + Key RECEIVE_TIMESTAMP = Key.of("receive_timestamp", Long.class); + +} diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcTagResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcTagResponse.java index 3aa2d642deb..1feae9f3bbb 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcTagResponse.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcTagResponse.java @@ -18,6 +18,7 @@ */ package org.apache.plc4x.java.api.messages; +import org.apache.plc4x.java.api.metadata.Metadata; import org.apache.plc4x.java.api.model.PlcTag; import org.apache.plc4x.java.api.types.PlcResponseCode; @@ -38,4 +39,9 @@ public interface PlcTagResponse extends PlcResponse { PlcResponseCode getResponseCode(String name); + /** + * Returns tag level metadata information. + */ + Metadata getTagMetadata(String name); + } diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/metadata/Metadata.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/metadata/Metadata.java new file mode 100644 index 00000000000..d0a87d52469 --- /dev/null +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/metadata/Metadata.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.api.metadata; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Metadata { + + public static class Key { + + private final String key; + private final Class type; + + protected Key(String key, Class type) { + this.key = key; + this.type = type; + } + + public String getKey() { + return key; + } + + public boolean validate(Object value) { + return type.isInstance(value); + } + + public static Key of(String key, Class type) { + return new Key<>(key, type); + } + + } + + public final static Metadata EMPTY = new Metadata(Collections.emptyMap()); + + private final Metadata parent; + private final Map, Object> values; + + Metadata(Map, Object> values) { + this(values, EMPTY); + } + + public Metadata(Map, Object> values, Metadata parent) { + this.parent = parent; + this.values = values; + } + + public Set> keys() { + Set> keys = new LinkedHashSet<>(values.keySet()); + keys.addAll(parent.keys()); + return Collections.unmodifiableSet(keys); + } + + public Object getValue(Key key) { + Object value = values.get(key); + if (value == null) { + return parent.getValue(key); + } + return value; + } + + public static class Builder { + private final Logger logger = LoggerFactory.getLogger(Builder.class); + + private final Map, Object> values = new HashMap<>(); + private final Metadata parent; + + public Builder() { + this(Metadata.EMPTY); + } + + public Builder(Metadata parent) { + this.parent = parent; + } + + public Builder put(Key key, T value) { + if (!key.validate(value)) { + logger.debug("Ignore metadata value {}, it does not match constraints imposed by key {}", value, key); + return this; + } + + values.put(key, value); + return this; + } + + public Metadata build() { + return new Metadata(values, parent); + } + } + +} diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/metadata/time/TimeSource.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/metadata/time/TimeSource.java new file mode 100644 index 00000000000..84e5843635d --- /dev/null +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/metadata/time/TimeSource.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.api.metadata.time; + +public enum TimeSource { + + // Time information is assumed by PLC4X itself + ASSUMPTION, + // Time comes from software layer, kernel driver and similar + SOFTWARE, + // Time can is confronted through hardware i.e. microcontroller + HARDWARE, + // Other source of time which fall into separate truthiness category + OTHER + +} diff --git a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java index 7e70c78a2f0..db60c034c36 100644 --- a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java +++ b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java @@ -33,6 +33,8 @@ import org.apache.plc4x.java.api.exceptions.PlcInvalidTagException; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.messages.*; +import org.apache.plc4x.java.api.metadata.Metadata; +import org.apache.plc4x.java.api.metadata.time.TimeSource; import org.apache.plc4x.java.api.model.*; import org.apache.plc4x.java.api.types.PlcResponseCode; import org.apache.plc4x.java.api.types.PlcSubscriptionType; @@ -1447,9 +1449,16 @@ protected void decode(ConversationContext context, AmsTCPPacket ms if (msg.getUserdata() instanceof AdsDeviceNotificationRequest) { AdsDeviceNotificationRequest notificationData = (AdsDeviceNotificationRequest) msg.getUserdata(); List stamps = notificationData.getAdsStampHeaders(); + long receiveTs = System.currentTimeMillis(); for (AdsStampHeader stamp : stamps) { // convert Windows FILETIME format to unix epoch long unixEpochTimestamp = stamp.getTimestamp().divide(BigInteger.valueOf(10000L)).longValue() - 11644473600000L; + // result metadata + Metadata eventMetadata = new Metadata.Builder() + .put(PlcMetadataKeys.RECEIVE_TIMESTAMP, receiveTs) + .put(PlcMetadataKeys.TIMESTAMP, unixEpochTimestamp) + .put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.SOFTWARE) + .build(); List samples = stamp.getAdsNotificationSamples(); for (AdsNotificationSample sample : samples) { long handle = sample.getNotificationHandle(); @@ -1457,10 +1466,12 @@ protected void decode(ConversationContext context, AmsTCPPacket ms for (PlcSubscriptionHandle subscriptionHandle : registration.getSubscriptionHandles()) { if (subscriptionHandle instanceof AdsSubscriptionHandle) { AdsSubscriptionHandle adsHandle = (AdsSubscriptionHandle) subscriptionHandle; - if (adsHandle.getNotificationHandle() == handle) - consumers.get(registration).accept( - new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(unixEpochTimestamp), - convertSampleToPlc4XResult(adsHandle, sample.getData()))); + if (adsHandle.getNotificationHandle() == handle) { + Map metadata = new HashMap<>(); + Instant timestamp = Instant.ofEpochMilli(unixEpochTimestamp); + DefaultPlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(timestamp, convertSampleToPlc4XResult(adsHandle, sample.getData(), metadata, eventMetadata)); + consumers.get(registration).accept(event); + } } } } @@ -1469,12 +1480,14 @@ protected void decode(ConversationContext context, AmsTCPPacket ms } } - private Map> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data) throws + private Map> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data, + Map tagMetadata, Metadata metadata) throws ParseException { Map> values = new HashMap<>(); ReadBufferByteBased readBuffer = new ReadBufferByteBased(data, ByteOrder.LITTLE_ENDIAN); values.put(subscriptionHandle.getTagName(), new ResponseItem<>(PlcResponseCode.OK, DataItem.staticParse(readBuffer, getPlcValueTypeForAdsDataType(subscriptionHandle.getAdsDataType()), data.length))); + tagMetadata.put(subscriptionHandle.getTagName(), new Metadata.Builder(metadata).build()); return values; } diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcMetadataKeys.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcMetadataKeys.java new file mode 100644 index 00000000000..713081159ca --- /dev/null +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcMetadataKeys.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.opcua; + +import org.apache.plc4x.java.api.metadata.Metadata; +import org.apache.plc4x.java.api.metadata.Metadata.Key; +import org.apache.plc4x.java.opcua.tag.OpcuaQualityStatus; + +/** + * OPC UA level metadata keys. + */ +public interface OpcMetadataKeys { + + Key QUALITY = Metadata.Key.of("opcua_quality", OpcuaQualityStatus.class); + + Key SERVER_TIMESTAMP = Metadata.Key.of("opcua_server_timestamp", Long.class); + Key SOURCE_TIMESTAMP = Metadata.Key.of("opcua_source_timestamp", Long.class); + +} diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java index b20a92f50f2..2cce869e66b 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java @@ -26,16 +26,20 @@ import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.messages.*; +import org.apache.plc4x.java.api.metadata.Metadata; +import org.apache.plc4x.java.api.metadata.time.TimeSource; import org.apache.plc4x.java.api.model.PlcConsumerRegistration; import org.apache.plc4x.java.api.model.PlcSubscriptionHandle; import org.apache.plc4x.java.api.types.PlcResponseCode; import org.apache.plc4x.java.api.types.PlcValueType; import org.apache.plc4x.java.api.value.PlcValue; +import org.apache.plc4x.java.opcua.OpcMetadataKeys; import org.apache.plc4x.java.opcua.config.OpcuaConfiguration; import org.apache.plc4x.java.opcua.context.Conversation; import org.apache.plc4x.java.opcua.context.OpcuaDriverContext; import org.apache.plc4x.java.opcua.context.SecureChannel; import org.apache.plc4x.java.opcua.readwrite.*; +import org.apache.plc4x.java.opcua.tag.OpcuaQualityStatus; import org.apache.plc4x.java.opcua.tag.OpcuaTag; import org.apache.plc4x.java.spi.ConversationContext; import org.apache.plc4x.java.spi.Plc4xProtocolBase; @@ -196,6 +200,10 @@ private SecureChannel createSecureChannel(ConversationContext context, public CompletableFuture read(PlcReadRequest readRequest) { LOGGER.trace("Reading Value"); + Metadata responseMetadata = new Metadata.Builder() + .put(PlcMetadataKeys.RECEIVE_TIMESTAMP, System.currentTimeMillis()) + .build(); + DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest; RequestHeader requestHeader = conversation.createRequestHeader(); @@ -225,7 +233,10 @@ public CompletableFuture read(PlcReadRequest readRequest) { transaction.submit(() -> { conversation.submit(opcuaReadRequest, ReadResponse.class).whenComplete((response, error) -> bridge(transaction, future, response, error)); }); - return future.thenApply(response -> new DefaultPlcReadResponse(request, readResponse(request.getTagNames(), response.getResults()))); + return future.thenApply(response -> { + Map metadata = new LinkedHashMap<>(); + return new DefaultPlcReadResponse(request, readResponse(request.getTagNames(), response.getResults(), metadata, responseMetadata), metadata); + }); } static NodeId generateNodeId(OpcuaTag tag) { @@ -250,14 +261,15 @@ static NodeId generateNodeId(OpcuaTag tag) { return nodeId; } - public Map> readResponse(LinkedHashSet tagNames, List results) { + public Map> readResponse(LinkedHashSet tagNames, List results, Map metadata, Metadata responseMetadata) { PlcResponseCode responseCode = null; // initialize variable Map> response = new HashMap<>(); int count = 0; for (String tagName : tagNames) { PlcValue value = null; - if (results.get(count).getValueSpecified()) { - Variant variant = results.get(count).getValue(); + DataValue dataValue = results.get(count); + if (dataValue.getValueSpecified()) { + Variant variant = dataValue.getValue(); LOGGER.trace("Response of type {}", variant.getClass().toString()); if (variant instanceof VariantBoolean) { byte[] array = ((VariantBoolean) variant).getValue(); @@ -410,12 +422,22 @@ public Map> readResponse(LinkedHashSet ta responseCode = PlcResponseCode.OK; } } else { - StatusCode statusCode = results.get(count).getStatusCode(); + StatusCode statusCode = dataValue.getStatusCode(); responseCode = mapOpcStatusCode(statusCode.getStatusCode(), PlcResponseCode.UNSUPPORTED); - LOGGER.error("Error while reading value from OPC UA server error code:- " + results.get(count).getStatusCode().toString()); + LOGGER.error("Error while reading value from OPC UA server error code:- " + dataValue.getStatusCode().toString()); } - count++; + + Metadata tagMetadata = new Metadata.Builder(responseMetadata) + .put(OpcMetadataKeys.QUALITY, new OpcuaQualityStatus(dataValue.getStatusCode())) + .put(OpcMetadataKeys.SERVER_TIMESTAMP, dataValue.getServerTimestamp()) + .put(OpcMetadataKeys.SOURCE_TIMESTAMP, dataValue.getSourceTimestamp()) + .put(PlcMetadataKeys.TIMESTAMP, dataValue.getServerTimestamp()) + .put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.SOFTWARE) + .build(); response.put(tagName, new ResponseItem<>(responseCode, value)); + metadata.put(tagName, tagMetadata); + + count++; } return response; } diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java index a68a04fb4d1..137c57db21b 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java @@ -24,8 +24,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.apache.plc4x.java.api.messages.PlcMetadataKeys; import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent; import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; +import org.apache.plc4x.java.api.metadata.Metadata; import org.apache.plc4x.java.api.model.PlcConsumerRegistration; import org.apache.plc4x.java.api.value.PlcValue; import org.apache.plc4x.java.opcua.context.Conversation; @@ -258,14 +260,21 @@ public void stopSubscriber() { * @param values - array of data values to be sent to the client. */ private void onSubscriptionValue(MonitoredItemNotification[] values) { + long receiveTs = System.currentTimeMillis(); + Metadata responseMetadata = new Metadata.Builder() + .put(PlcMetadataKeys.RECEIVE_TIMESTAMP, receiveTs) + .build(); + LinkedHashSet tagNameList = new LinkedHashSet<>(); List dataValues = new ArrayList<>(values.length); for (MonitoredItemNotification value : values) { tagNameList.add(tagNames.get((int) value.getClientHandle() - 1)); dataValues.add(value.getValue()); } - Map> tags = plcSubscriber.readResponse(tagNameList, dataValues); - final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(), tags); + + Map metadata = new HashMap<>(); + Map> tags = plcSubscriber.readResponse(tagNameList, dataValues, metadata, responseMetadata); + final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(), tags, metadata); consumers.forEach(plcSubscriptionEventConsumer -> plcSubscriptionEventConsumer.accept(event)); } diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/tag/OpcuaQualityStatus.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/tag/OpcuaQualityStatus.java new file mode 100644 index 00000000000..1500e210999 --- /dev/null +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/tag/OpcuaQualityStatus.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.opcua.tag; + +import org.apache.plc4x.java.opcua.readwrite.StatusCode; + +public final class OpcuaQualityStatus { + + private static final long STATUS_MASK = 0xC0000000L; + private static final long STATUS_GOOD = 0x00000000L; + private static final long STATUS_UNCERTAIN = 0x40000000L; + private static final long STATUS_BAD = 0x80000000L; + + private final StatusCode statusCode; + + public OpcuaQualityStatus(StatusCode statusCode) { + this.statusCode = statusCode; + } + + public boolean isGood() { + return (statusCode.getStatusCode() & STATUS_MASK) == STATUS_GOOD; + } + + public boolean isBad() { + return (statusCode.getStatusCode() & STATUS_MASK) == STATUS_BAD; + } + + public boolean isUncertain() { + return (statusCode.getStatusCode() & STATUS_MASK) == STATUS_UNCERTAIN; + } + + @Override + public String toString() { + if (isGood()) { + return "good"; + } else if (isBad()) { + return "bad"; + } + return "uncertain"; + } +}