Skip to content

Commit

Permalink
Draft of metadata API with timestamp tracking capabilities.
Browse files Browse the repository at this point in the history
Signed-off-by: Łukasz Dywicki <[email protected]>
  • Loading branch information
splatch committed Oct 25, 2024
1 parent 5e8fa8b commit 6997913
Show file tree
Hide file tree
Showing 23 changed files with 913 additions and 378 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.api.messages;

import org.apache.plc4x.java.api.metadata.Metadata.Key;
import org.apache.plc4x.java.api.metadata.time.TimeSource;

/**
* High level definition of common metadata keys which can occur across multiple drivers.
*/
public interface PlcMetadataKeys {

Key<Long> TIMESTAMP = Key.of("timestamp", Long.class);
Key<TimeSource> TIMESTAMP_SOURCE = Key.of("timestamp_source", TimeSource.class);

Key<Long> RECEIVE_TIMESTAMP = Key.of("receive_timestamp", Long.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.plc4x.java.api.messages;

import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.model.PlcTag;
import org.apache.plc4x.java.api.types.PlcResponseCode;

Expand All @@ -38,4 +39,9 @@ public interface PlcTagResponse extends PlcResponse {

PlcResponseCode getResponseCode(String name);

/**
* Returns tag level metadata information.
*/
Metadata getTagMetadata(String name);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.api.metadata.time;

public enum TimeSource {

// Time information is assumed by PLC4X itself
ASSUMPTION,
// Time comes from software layer, kernel driver and similar
SOFTWARE,
// Time can is confronted through hardware i.e. microcontroller
HARDWARE,
// Other source of time which fall into separate truthiness category
OTHER

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.apache.plc4x.java.api.exceptions.PlcInvalidTagException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.spi.metadata.DefaultMetadata;
import org.apache.plc4x.java.api.metadata.time.TimeSource;
import org.apache.plc4x.java.api.model.*;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.types.PlcSubscriptionType;
Expand Down Expand Up @@ -718,8 +721,13 @@ protected CompletableFuture<PlcReadResponse> singleRead(PlcReadRequest readReque
.check(userdata -> userdata.getInvokeId() == amsPacket.getInvokeId())
.only(AdsReadResponse.class)
.handle(response -> {
// result metadata
Metadata metadata = new DefaultMetadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, System.currentTimeMillis())
.put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.ASSUMPTION)
.build();
if (response.getResult() == ReturnCode.OK) {
final PlcReadResponse plcReadResponse = convertToPlc4xReadResponse(readRequest, Map.of((AdsTag) readRequest.getTags().get(0), directAdsTag), response);
final PlcReadResponse plcReadResponse = convertToPlc4xReadResponse(readRequest, Map.of((AdsTag) readRequest.getTags().get(0), directAdsTag), response, metadata);
// Convert the response from the PLC into a PLC4X Response ...
future.complete(plcReadResponse);
} else {
Expand Down Expand Up @@ -791,8 +799,12 @@ protected CompletableFuture<PlcReadResponse> multiRead(PlcReadRequest readReques
.check(userdata -> userdata.getInvokeId() == amsPacket.getInvokeId())
.only(AdsReadWriteResponse.class)
.handle(response -> {
Metadata metadata = new DefaultMetadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, System.currentTimeMillis())
.put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.ASSUMPTION)
.build();
if (response.getResult() == ReturnCode.OK) {
final PlcReadResponse plcReadResponse = convertToPlc4xReadResponse(readRequest, resolvedTags, response);
final PlcReadResponse plcReadResponse = convertToPlc4xReadResponse(readRequest, resolvedTags, response, metadata);
// Convert the response from the PLC into a PLC4X Response ...
future.complete(plcReadResponse);
} else if (response.getResult() == ReturnCode.ADSERR_DEVICE_INVALIDSIZE) {
Expand All @@ -807,8 +819,9 @@ protected CompletableFuture<PlcReadResponse> multiRead(PlcReadRequest readReques
return future;
}

protected PlcReadResponse convertToPlc4xReadResponse(PlcReadRequest readRequest, Map<AdsTag, DirectAdsTag> resolvedTags, AmsPacket adsData) {
protected PlcReadResponse convertToPlc4xReadResponse(PlcReadRequest readRequest, Map<AdsTag, DirectAdsTag> resolvedTags, AmsPacket adsData, Metadata responseMetadata) {
ReadBuffer readBuffer = null;
Map<String, Metadata> metadata = new HashMap<>();
Map<String, PlcResponseCode> responseCodes = new HashMap<>();

// Read the response codes first
Expand Down Expand Up @@ -841,6 +854,7 @@ protected PlcReadResponse convertToPlc4xReadResponse(PlcReadRequest readRequest,
if (readBuffer != null) {
Map<String, PlcResponseItem<PlcValue>> values = new HashMap<>();
for (String tagName : readRequest.getTagNames()) {
metadata.put(tagName, new DefaultMetadata.Builder(responseMetadata).build());
// If the response-code was anything but OK, we don't need to parse the payload.
if(responseCodes.get(tagName) != PlcResponseCode.OK) {
values.put(tagName, new DefaultPlcResponseItem<>(responseCodes.get(tagName), null));
Expand All @@ -851,7 +865,7 @@ protected PlcReadResponse convertToPlc4xReadResponse(PlcReadRequest readRequest,
values.put(tagName, parseResponseItem(directAdsTag, readBuffer));
}
}
return new DefaultPlcReadResponse(readRequest, values);
return new DefaultPlcReadResponse(readRequest, values, metadata);
}
return null;
}
Expand Down Expand Up @@ -1071,8 +1085,13 @@ protected CompletableFuture<PlcWriteResponse> singleWrite(PlcWriteRequest writeR
.check(userdata -> userdata.getInvokeId() == amsPacket.getInvokeId())
.only(AdsWriteResponse.class)
.handle(response -> {
// result metadata
Metadata eventMetadata = new DefaultMetadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, System.currentTimeMillis())
.put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.ASSUMPTION)
.build();
if (response.getResult() == ReturnCode.OK) {
final PlcWriteResponse plcWriteResponse = convertToPlc4xWriteResponse(writeRequest, Collections.singletonMap((AdsTag) writeRequest.getTag(tagName), directAdsTag), response);
final PlcWriteResponse plcWriteResponse = convertToPlc4xWriteResponse(writeRequest, Collections.singletonMap((AdsTag) writeRequest.getTag(tagName), directAdsTag), response, eventMetadata);
// Convert the response from the PLC into a PLC4X Response ...
future.complete(plcWriteResponse);
} else {
Expand Down Expand Up @@ -1149,8 +1168,14 @@ protected CompletableFuture<PlcWriteResponse> multiWrite(PlcWriteRequest writeRe
.check(userdata -> userdata.getInvokeId() == amsPacket.getInvokeId())
.only(AdsReadWriteResponse.class)
.handle(response -> {
// result metadata
Metadata eventMetadata = new DefaultMetadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, System.currentTimeMillis())
.put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.ASSUMPTION)
.build();

if (response.getResult() == ReturnCode.OK) {
final PlcWriteResponse plcWriteResponse = convertToPlc4xWriteResponse(writeRequest, resolvedTags, response);
final PlcWriteResponse plcWriteResponse = convertToPlc4xWriteResponse(writeRequest, resolvedTags, response, eventMetadata);
// Convert the response from the PLC into a PLC4X Response ...
future.complete(plcWriteResponse);
} else {
Expand Down Expand Up @@ -1244,8 +1269,9 @@ else if (!dataType.getChildren().isEmpty()) {
}
}

protected PlcWriteResponse convertToPlc4xWriteResponse(PlcWriteRequest writeRequest, Map<AdsTag, DirectAdsTag> resolvedTags, AmsPacket adsData) {
protected PlcWriteResponse convertToPlc4xWriteResponse(PlcWriteRequest writeRequest, Map<AdsTag, DirectAdsTag> resolvedTags, AmsPacket adsData, Metadata eventMtadata) {
Map<String, PlcResponseCode> responseCodes = new HashMap<>();
Map<String, Metadata> metadata = new HashMap<>();
if (adsData instanceof AdsWriteResponse) {
AdsWriteResponse adsWriteResponse = (AdsWriteResponse) adsData;
responseCodes.put(writeRequest.getTagNames().stream().findFirst().orElse(""),
Expand All @@ -1256,6 +1282,9 @@ protected PlcWriteResponse convertToPlc4xWriteResponse(PlcWriteRequest writeRequ
// When parsing a multi-item response, the error codes of each items come
// in sequence and then come the values.
for (String tagName : writeRequest.getTagNames()) {
// result metadata
metadata.put(tagName, eventMtadata);

AdsTag adsTag = (AdsTag) writeRequest.getTag(tagName);
// Skip invalid addresses.
if(resolvedTags.get(adsTag) == null) {
Expand All @@ -1271,7 +1300,7 @@ protected PlcWriteResponse convertToPlc4xWriteResponse(PlcWriteRequest writeRequ
}
}

return new DefaultPlcWriteResponse(writeRequest, responseCodes);
return new DefaultPlcWriteResponse(writeRequest, responseCodes, metadata);
}

@Override
Expand Down Expand Up @@ -1493,20 +1522,29 @@ protected void decode(ConversationContext<AmsTCPPacket> context, AmsTCPPacket ms
if (msg.getUserdata() instanceof AdsDeviceNotificationRequest) {
AdsDeviceNotificationRequest notificationData = (AdsDeviceNotificationRequest) msg.getUserdata();
List<AdsStampHeader> stamps = notificationData.getAdsStampHeaders();
long receiveTs = System.currentTimeMillis();
for (AdsStampHeader stamp : stamps) {
// convert Windows FILETIME format to unix epoch
long unixEpochTimestamp = stamp.getTimestamp().divide(BigInteger.valueOf(10000L)).longValue() - 11644473600000L;
// result metadata
Metadata eventMetadata = new DefaultMetadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, receiveTs)
.put(PlcMetadataKeys.TIMESTAMP, unixEpochTimestamp)
.put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.SOFTWARE)
.build();
List<AdsNotificationSample> samples = stamp.getAdsNotificationSamples();
for (AdsNotificationSample sample : samples) {
long handle = sample.getNotificationHandle();
for (DefaultPlcConsumerRegistration registration : consumers.keySet()) {
for (PlcSubscriptionHandle subscriptionHandle : registration.getSubscriptionHandles()) {
if (subscriptionHandle instanceof AdsSubscriptionHandle) {
AdsSubscriptionHandle adsHandle = (AdsSubscriptionHandle) subscriptionHandle;
if (adsHandle.getNotificationHandle() == handle)
consumers.get(registration).accept(
new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(unixEpochTimestamp),
convertSampleToPlc4XResult(adsHandle, sample.getData())));
if (adsHandle.getNotificationHandle() == handle) {
Map<String, Metadata> metadata = new HashMap<>();
Instant timestamp = Instant.ofEpochMilli(unixEpochTimestamp);
DefaultPlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(timestamp, convertSampleToPlc4XResult(adsHandle, sample.getData(), metadata, eventMetadata));
consumers.get(registration).accept(event);
}
}
}
}
Expand All @@ -1515,12 +1553,13 @@ protected void decode(ConversationContext<AmsTCPPacket> context, AmsTCPPacket ms
}
}

private Map<String, PlcResponseItem<PlcValue>> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data) throws
private Map<String, PlcResponseItem<PlcValue>> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data, Map<String, Metadata> tagMetadata, Metadata metadata) throws
ParseException {
Map<String, PlcResponseItem<PlcValue>> values = new HashMap<>();
ReadBufferByteBased readBuffer = new ReadBufferByteBased(data, ByteOrder.LITTLE_ENDIAN);
values.put(subscriptionHandle.getTagName(), new DefaultPlcResponseItem<>(PlcResponseCode.OK,
DataItem.staticParse(readBuffer, getPlcValueTypeForAdsDataType(subscriptionHandle.getAdsDataType()), data.length)));
tagMetadata.put(subscriptionHandle.getTagName(), new DefaultMetadata.Builder(metadata).build());
return values;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.opcua;

import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.metadata.Metadata.Key;
import org.apache.plc4x.java.opcua.tag.OpcuaQualityStatus;

/**
* OPC UA level metadata keys.
*/
public interface OpcMetadataKeys {

Key<OpcuaQualityStatus> QUALITY = Metadata.Key.of("opcua_quality", OpcuaQualityStatus.class);

Key<Long> SERVER_TIMESTAMP = Metadata.Key.of("opcua_server_timestamp", Long.class);
Key<Long> SOURCE_TIMESTAMP = Metadata.Key.of("opcua_source_timestamp", Long.class);

}
Loading

0 comments on commit 6997913

Please sign in to comment.