Skip to content

Commit

Permalink
Draft of metadata API with timestamp tracking capabilities.
Browse files Browse the repository at this point in the history
Signed-off-by: Łukasz Dywicki <[email protected]>
  • Loading branch information
splatch committed Aug 9, 2024
1 parent b186437 commit 989cc2c
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.api.messages;

import org.apache.plc4x.java.api.metadata.Metadata.Key;
import org.apache.plc4x.java.api.metadata.time.TimeSource;

/**
* High level definition of common metadata keys which can occur across multiple drivers.
*/
public interface PlcMetadataKeys {

Key<Long> TIMESTAMP = Key.of("timestamp", Long.class);
Key<TimeSource> TIMESTAMP_SOURCE = Key.of("timestamp_source", TimeSource.class);

Key<Long> RECEIVE_TIMESTAMP = Key.of("receive_timestamp", Long.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.plc4x.java.api.messages;

import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.model.PlcTag;
import org.apache.plc4x.java.api.types.PlcResponseCode;

Expand All @@ -38,4 +39,9 @@ public interface PlcTagResponse extends PlcResponse {

PlcResponseCode getResponseCode(String name);

/**
* Returns tag level metadata information.
*/
Metadata getTagMetadata(String name);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.api.metadata;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Metadata {

public static class Key<T> {

private final String key;
private final Class<T> type;

protected Key(String key, Class<T> type) {
this.key = key;
this.type = type;
}

public String getKey() {
return key;
}

public boolean validate(Object value) {
return type.isInstance(value);
}

public static <T> Key<T> of(String key, Class<T> type) {
return new Key<>(key, type);
}

}

public final static Metadata EMPTY = new Metadata(Collections.emptyMap());

private final Metadata parent;
private final Map<Key<?>, Object> values;

Metadata(Map<Key<?>, Object> values) {
this(values, EMPTY);
}

public Metadata(Map<Key<?>, Object> values, Metadata parent) {
this.parent = parent;
this.values = values;
}

public Set<Key<?>> keys() {
Set<Key<?>> keys = new LinkedHashSet<>(values.keySet());
keys.addAll(parent.keys());
return Collections.unmodifiableSet(keys);
}

public Object getValue(Key<?> key) {
Object value = values.get(key);
if (value == null) {
return parent.getValue(key);
}
return value;
}

public static class Builder {
private final Logger logger = LoggerFactory.getLogger(Builder.class);

private final Map<Key<?>, Object> values = new HashMap<>();
private final Metadata parent;

public Builder() {
this(Metadata.EMPTY);
}

public Builder(Metadata parent) {
this.parent = parent;
}

public <T> Builder put(Key<T> key, T value) {
if (!key.validate(value)) {
logger.debug("Ignore metadata value {}, it does not match constraints imposed by key {}", value, key);
return this;
}

values.put(key, value);
return this;
}

public Metadata build() {
return new Metadata(values, parent);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.api.metadata.time;

public enum TimeSource {

// Time information is assumed by PLC4X itself
ASSUMPTION,
// Time comes from software layer, kernel driver and similar
SOFTWARE,
// Time can is confronted through hardware i.e. microcontroller
HARDWARE,
// Other source of time which fall into separate truthiness category
OTHER

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.plc4x.java.api.exceptions.PlcInvalidTagException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.metadata.time.TimeSource;
import org.apache.plc4x.java.api.model.*;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.types.PlcSubscriptionType;
Expand Down Expand Up @@ -1447,20 +1449,29 @@ protected void decode(ConversationContext<AmsTCPPacket> context, AmsTCPPacket ms
if (msg.getUserdata() instanceof AdsDeviceNotificationRequest) {
AdsDeviceNotificationRequest notificationData = (AdsDeviceNotificationRequest) msg.getUserdata();
List<AdsStampHeader> stamps = notificationData.getAdsStampHeaders();
long receiveTs = System.currentTimeMillis();
for (AdsStampHeader stamp : stamps) {
// convert Windows FILETIME format to unix epoch
long unixEpochTimestamp = stamp.getTimestamp().divide(BigInteger.valueOf(10000L)).longValue() - 11644473600000L;
// result metadata
Metadata eventMetadata = new Metadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, receiveTs)
.put(PlcMetadataKeys.TIMESTAMP, unixEpochTimestamp)
.put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.SOFTWARE)
.build();
List<AdsNotificationSample> samples = stamp.getAdsNotificationSamples();
for (AdsNotificationSample sample : samples) {
long handle = sample.getNotificationHandle();
for (DefaultPlcConsumerRegistration registration : consumers.keySet()) {
for (PlcSubscriptionHandle subscriptionHandle : registration.getSubscriptionHandles()) {
if (subscriptionHandle instanceof AdsSubscriptionHandle) {
AdsSubscriptionHandle adsHandle = (AdsSubscriptionHandle) subscriptionHandle;
if (adsHandle.getNotificationHandle() == handle)
consumers.get(registration).accept(
new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(unixEpochTimestamp),
convertSampleToPlc4XResult(adsHandle, sample.getData())));
if (adsHandle.getNotificationHandle() == handle) {
Map<String, Metadata> metadata = new HashMap<>();
Instant timestamp = Instant.ofEpochMilli(unixEpochTimestamp);
DefaultPlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(timestamp, convertSampleToPlc4XResult(adsHandle, sample.getData(), metadata, eventMetadata));
consumers.get(registration).accept(event);
}
}
}
}
Expand All @@ -1469,12 +1480,14 @@ protected void decode(ConversationContext<AmsTCPPacket> context, AmsTCPPacket ms
}
}

private Map<String, ResponseItem<PlcValue>> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data) throws
private Map<String, ResponseItem<PlcValue>> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data,
Map<String, Metadata> tagMetadata, Metadata metadata) throws
ParseException {
Map<String, ResponseItem<PlcValue>> values = new HashMap<>();
ReadBufferByteBased readBuffer = new ReadBufferByteBased(data, ByteOrder.LITTLE_ENDIAN);
values.put(subscriptionHandle.getTagName(), new ResponseItem<>(PlcResponseCode.OK,
DataItem.staticParse(readBuffer, getPlcValueTypeForAdsDataType(subscriptionHandle.getAdsDataType()), data.length)));
tagMetadata.put(subscriptionHandle.getTagName(), new Metadata.Builder(metadata).build());
return values;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.opcua;

import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.metadata.Metadata.Key;
import org.apache.plc4x.java.opcua.tag.OpcuaQualityStatus;

/**
* OPC UA level metadata keys.
*/
public interface OpcMetadataKeys {

Key<OpcuaQualityStatus> QUALITY = Metadata.Key.of("opcua_quality", OpcuaQualityStatus.class);

Key<Long> SERVER_TIMESTAMP = Metadata.Key.of("opcua_server_timestamp", Long.class);
Key<Long> SOURCE_TIMESTAMP = Metadata.Key.of("opcua_source_timestamp", Long.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.metadata.time.TimeSource;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.types.PlcValueType;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.opcua.OpcMetadataKeys;
import org.apache.plc4x.java.opcua.config.OpcuaConfiguration;
import org.apache.plc4x.java.opcua.context.Conversation;
import org.apache.plc4x.java.opcua.context.OpcuaDriverContext;
import org.apache.plc4x.java.opcua.context.SecureChannel;
import org.apache.plc4x.java.opcua.readwrite.*;
import org.apache.plc4x.java.opcua.tag.OpcuaQualityStatus;
import org.apache.plc4x.java.opcua.tag.OpcuaTag;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
Expand Down Expand Up @@ -196,6 +200,10 @@ private SecureChannel createSecureChannel(ConversationContext<OpcuaAPU> context,
public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
LOGGER.trace("Reading Value");

Metadata responseMetadata = new Metadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, System.currentTimeMillis())
.build();

DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest;
RequestHeader requestHeader = conversation.createRequestHeader();

Expand Down Expand Up @@ -225,7 +233,10 @@ public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
transaction.submit(() -> {
conversation.submit(opcuaReadRequest, ReadResponse.class).whenComplete((response, error) -> bridge(transaction, future, response, error));
});
return future.thenApply(response -> new DefaultPlcReadResponse(request, readResponse(request.getTagNames(), response.getResults())));
return future.thenApply(response -> {
Map<String, Metadata> metadata = new LinkedHashMap<>();
return new DefaultPlcReadResponse(request, readResponse(request.getTagNames(), response.getResults(), metadata, responseMetadata), metadata);
});
}

static NodeId generateNodeId(OpcuaTag tag) {
Expand All @@ -250,14 +261,15 @@ static NodeId generateNodeId(OpcuaTag tag) {
return nodeId;
}

public Map<String, ResponseItem<PlcValue>> readResponse(LinkedHashSet<String> tagNames, List<DataValue> results) {
public Map<String, ResponseItem<PlcValue>> readResponse(LinkedHashSet<String> tagNames, List<DataValue> results, Map<String, Metadata> metadata, Metadata responseMetadata) {
PlcResponseCode responseCode = null; // initialize variable
Map<String, ResponseItem<PlcValue>> response = new HashMap<>();
int count = 0;
for (String tagName : tagNames) {
PlcValue value = null;
if (results.get(count).getValueSpecified()) {
Variant variant = results.get(count).getValue();
DataValue dataValue = results.get(count);
if (dataValue.getValueSpecified()) {
Variant variant = dataValue.getValue();
LOGGER.trace("Response of type {}", variant.getClass().toString());
if (variant instanceof VariantBoolean) {
byte[] array = ((VariantBoolean) variant).getValue();
Expand Down Expand Up @@ -410,12 +422,22 @@ public Map<String, ResponseItem<PlcValue>> readResponse(LinkedHashSet<String> ta
responseCode = PlcResponseCode.OK;
}
} else {
StatusCode statusCode = results.get(count).getStatusCode();
StatusCode statusCode = dataValue.getStatusCode();
responseCode = mapOpcStatusCode(statusCode.getStatusCode(), PlcResponseCode.UNSUPPORTED);
LOGGER.error("Error while reading value from OPC UA server error code:- " + results.get(count).getStatusCode().toString());
LOGGER.error("Error while reading value from OPC UA server error code:- " + dataValue.getStatusCode().toString());
}
count++;

Metadata tagMetadata = new Metadata.Builder(responseMetadata)
.put(OpcMetadataKeys.QUALITY, new OpcuaQualityStatus(dataValue.getStatusCode()))
.put(OpcMetadataKeys.SERVER_TIMESTAMP, dataValue.getServerTimestamp())
.put(OpcMetadataKeys.SOURCE_TIMESTAMP, dataValue.getSourceTimestamp())
.put(PlcMetadataKeys.TIMESTAMP, dataValue.getServerTimestamp())
.put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.SOFTWARE)
.build();
response.put(tagName, new ResponseItem<>(responseCode, value));
metadata.put(tagName, tagMetadata);

count++;
}
return response;
}
Expand Down
Loading

0 comments on commit 989cc2c

Please sign in to comment.