How to dismiss incomplete received lines? #10

Open
tophee opened this issue Sep 19, 2023 · 10 comments


tophee commented Sep 19, 2023

I have been using your script for a few years now and it has served me well. Thanks for creating it.

Recently I had to change my setup and I can't seem to get the script running continuously on the new machine. At first it runs fine, but after anywhere from a few minutes to a few hours, it gets stuck with this error:

DEBUG:root:Received line b'FTX,status=testing elapsed=60017i,t0=17.90,t1=19.20,t2=22.90,t3=22.20,rh0=91.20,ah0=13.9,rh1=100.00,ah1=16.5,rh2=70.20,ah2=14.3,rh3=83.30,ah3=16.4,dew0=16.4,dew1=19.2,dew2=17.2,dew3=19.2,h-eff=601.1,t-eff=26.0,hum-gain=34.64,CR0-1=-0.20,CR0-2=-0.20,CR2-1=0.00,CR2-2=0.20,end=1\r\n'
DEBUG:root:Sending lines: [b'FTX,status=testing elapsed=60017i,t0=17.90,t1=19.20,t2=22.90,t3=22.20,rh0=91.20,ah0=13.9,rh1=100.00,ah1=16.5,rh2=70.20,ah2=14.3,rh3=83.30,ah3=16.4,dew0=16.4,dew1=19.2,dew2=17.2,dew3=19.2,h-eff=601.1,t-eff=26.0,hum-gain=34.64,CR0-1=-0.20,CR0-2=-0.20,CR2-1=0.00,CR2-2=0.20,end=1 1695087145207804928']
DEBUG:root:Received line b'FTX_log ventstate=1,ledventstate=1,buttonpressed=0i,ledventstatechanged=0i,brightness=25i,desiredventstate=1,error0=0i,error1=0i,error2=0i,error3=0i,error4=0i,error5=0i,lederror=0i,printerror=0i,timerup=20464546i,timerdown=0i,timerhigh=0i,old_t_vals0=0i,old_t_vals1=0i,old_t_vals2=0i,old_t_vals3=0i,old_h_vals0=0i,old_h_vals1=0i,old_h_vals2=0i,old_h_vals3=0i,senFTX,status=testing elapsed=60011i,t0=17.90,t1=19.20,t2=22.89,t3=22.20,rh0=91.20,ah0=13.9,rh1=100.00,ah1=16.5,rh2=70.20,ah2=14.3,rh3=83.30,ah3=16.4,dew0=16.4,dew1=19.2,dew2=17.2,dew3=19.2,h-eff=610.4,t-eff=26.0,hum-gain=34.74,CR0-1=-0.20,CR0-2=-0.20,CR2-1=0.00,CR2-2=0.20,end=1\r\n'
ERROR:root:Error, retrying with backoff
Traceback (most recent call last):
  File "/home/christoph/arduino-influxdb/collect.py", line 70, in ReadLoop
    float(timestamp)
ValueError: could not convert string to float: 'elapsed=60011i,t0=17.90,t1=19.20,t2=22.89,t3=22.20,rh0=91.20,ah0=13.9,rh1=100.00,ah1=16.5,rh2=70.20,ah2=14.3,rh3=83.30,ah3=16.4,dew0=16.4,dew1=19.2,dew2=17.2,dew3=19.2,h-eff=610.4,t-eff=26.0,hum-gain=34.74,CR0-1=-0.20,CR0-2=-0.20,CR2-1=0.00,CR2-2=0.20,end=1'
^CTraceback (most recent call last):
  File "/home/christoph/arduino-influxdb/collect.py", line 215, in <module>
    main()
  File "/home/christoph/arduino-influxdb/collect.py", line 211, in main
    writer.join()
  File "/usr/lib/python3.10/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt

I have included the last succeeding line for comparison.

If you take a closer look at the second (i.e. the failing) received line, you can see that it is cut off mid-way:

DEBUG:root:Received line b'FTX_log ventstate=1,ledventstate=1,buttonpressed=0i,ledventstatechanged=0i,brightness=25i,desiredventstate=1,error0=0i,error1=0i,error2=0i,error3=0i,error4=0i,error5=0i,lederror=0i,printerror=0i,timerup=20464546i,timerdown=0i,timerhigh=0i,old_t_vals0=0i,old_t_vals1=0i,old_t_vals2=0i,old_t_vals3=0i,old_h_vals0=0i,old_h_vals1=0i,old_h_vals2=0i,old_h_vals3=0i,sen

And after that, a new line is received, starting with FTX...

So, for some reason, the transmission of that line failed, and for now we need not care why. What I'm trying to understand is whether the script can be made to handle (i.e. dismiss) such incomplete lines.

Is there a way?

As I write this, it occurs to me that I might have set the timeout too high (I set it to 130 seconds, while the Arduino is sending every 60 seconds). If a line is dismissed when no end of line is received before the timeout, then lowering the timeout will solve the problem. Will try it now. But let me know if there is another/better way.


tophee commented Sep 20, 2023

OK, unfortunately, changing the timeout didn't resolve the issue:

DEBUG:root:Received line b'FTX_log ventstate=1,ledventstate=1,buttonpressFTX,status=testing elapsed=60017i,t0=16.70,t1=22.10,t2=22.80,t3=18.80,rh0=91.42,ah0=13.0,rh1=71.30,ah1=13.9,rh2=66.00,ah2=13.4,rh3=97.30,ah3=15.7,dew0=15.3,dew1=16.7,dew2=16.1,dew3=18.4,h-eff=219.1,t-eff=88.5,hum-gain=8.20,CR0-1=1.19,CR0-2=0.60,CR2-1=0.20,CR2-2=0.00,end=1\r\n'
ERROR:root:Error, retrying with backoff
Traceback (most recent call last):
  File "/home/christoph/arduino-influxdb/collect.py", line 70, in ReadLoop
    float(timestamp)
ValueError: could not convert string to float: 'elapsed=60017i,t0=16.70,t1=22.10,t2=22.80,t3=18.80,rh0=91.42,ah0=13.0,rh1=71.30,ah1=13.9,rh2=66.00,ah2=13.4,rh3=97.30,ah3=15.7,dew0=15.3,dew1=16.7,dew2=16.1,dew3=18.4,h-eff=219.1,t-eff=88.5,hum-gain=8.20,CR0-1=1.19,CR0-2=0.60,CR2-1=0.20,CR2-2=0.00,end=1'

I'm not sure what this means...
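One possible reading of this traceback, offered as a sketch rather than as the script's actual code: the parsing quoted later in this thread splits each line on single spaces and treats a third word as the timestamp. A merged line contains two spaces instead of one, so the fields of the second sample end up where a timestamp is expected. The helper name `split_sample` is hypothetical, and the sample strings are shortened from the logs above.

```python
def split_sample(line: str):
    """Split a line the way the ReadLoop parsing quoted in this thread does.

    Hypothetical helper for illustration; not part of collect.py.
    """
    words = line.strip().split(" ")
    if len(words) == 2:
        tags, values = words
        return tags, values, None
    if len(words) == 3:
        tags, values, timestamp = words
        float(timestamp)  # raises ValueError if this is not a number
        return tags, values, timestamp
    raise ValueError("Unable to parse line {0!r}".format(line))


# A complete line: measurement and tags, one space, then the fields.
good = "FTX,status=testing elapsed=60017i,t0=16.70,end=1\r\n"
# A cut-off FTX_log line followed directly by the next FTX line.
merged = ("FTX_log ventstate=1,buttonpress"
          "FTX,status=testing elapsed=60017i,t0=16.70,end=1\r\n")

print(split_sample(good))
try:
    split_sample(merged)
except ValueError as err:
    print("rejected:", err)
```

If this reading is right, the error is about the shape of the merged line rather than the timeout value, which would fit the observation that changing the timeout made no difference.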

Owner

ppetr commented Sep 22, 2023

Hi, I'm glad that you find the script useful :). I'll need to have a look at the source code. IIRC there is some functionality to skip the first line, which might be incomplete, but not afterwards. I wonder what might be a good way to detect such incomplete lines. Perhaps I could add a flag that when no newline is received within N seconds, the line is discarded, WDYT?


tophee commented Sep 22, 2023

> add a flag that when no newline is received within N seconds, the line is discarded

Yes, that makes sense!


tophee commented Sep 22, 2023

I don't know how to add a flag, so I hard-coded a 10-second wait into the readloop like this:

def ReadLoop(args, queue: persistent_queue.Queue):
    """Reads samples and stores them in a queue. Retries on IO errors."""
    serial_fn: Optional[Callable[[BinaryIO], Generator[bytes, None,
                                                       None]]] = None
    if args.serial_function:
        module, fn_name = args.serial_function.rsplit(".", 1)
        serial_fn = getattr(importlib.import_module(module), fn_name)
    try:
        logging.debug("Read loop started")
        with serial.serial_for_url(args.device,
                                   baudrate=args.baud_rate,
                                   timeout=args.read_timeout) as handle:
            if serial_fn:
                lines = serial_fn(handle)
            else:
                lines = serial_samples.SerialLines(handle, args.max_line_length)
            start_time = time.time()
            for line in lines:
                # If line does not end with newline character and more than 10 seconds have passed since its reception began
                if not line.endswith(b'\n') and (time.time() - start_time > 10):
                    continue
                # Reset the timer when we receive a complete transmission.
                elif line.endswith(b'\n'):
                    start_time = time.time()
                try:
                    line = str(line, encoding="UTF-8")
                except TypeError:
                    pass
                # Parse 'line', either with or without timestamp.
                words = line.strip().split(" ")
                if len(words) == 2:
                    (tags, values) = words
                    timestamp = int(time.time() * 1000000000)
                elif len(words) == 3:
                    (tags, values, timestamp) = words
                    float(timestamp)
                else:
                    raise ValueError("Unable to parse line {0!r}".format(line))
                tags: str = ",".join(t for t in (tags, args.tags) if t)
                queue.put("{0} {1} {2:d}".format(tags, values, timestamp))
    except:
        logging.exception("Error, retrying with backoff")
        raise

I'm currently testing this and will let you know if it works (or if you spot a flaw right away, let me know).


tophee commented Sep 22, 2023

Upon further reflection, I realize that my test code is not actually discarding the incomplete line. But I don't understand whether that is a problem or not - it probably is - or how I can discard it. I don't understand how the serial data is read (i.e. how and when lines is written).
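On how `lines` gets written: in the code quoted above, `lines` is a generator, so nothing is stored in it up front. Each pass of the `for` loop asks the generator for one more item, and that request triggers one `readline()` call on the serial handle. The sketch below illustrates this with `io.BytesIO` standing in for the serial port. `fake_serial_lines` is a hypothetical, simplified stand-in for `SerialLines`, not the real function.

```python
import io
from typing import BinaryIO, Generator


def fake_serial_lines(handle: BinaryIO) -> Generator[bytes, None, None]:
    """Simplified stand-in for SerialLines: one readline() per yielded item."""
    while True:
        line = handle.readline()
        if not line:  # BytesIO is exhausted; a real serial port would keep waiting
            return
        yield line


# Two transmissions. The first was cut off, so it has no newline of its own.
port = io.BytesIO(b"FTX_log ventstate=1,sen" + b"FTX,status=testing t0=17.90\r\n")

received = list(fake_serial_lines(port))
print(received)
```

In this sketch the cut-off part and the following sample come out as a single item that ends with a newline. A check such as `line.endswith(b'\n')` inside the `for` loop therefore cannot tell the merged line from a good one, so any discarding would have to happen where the bytes are read.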


tophee commented Sep 22, 2023

Not sure if this is any smarter than my initial try, but now I modified the SerialLines function:

def SerialLines(handle: BinaryIO, max_line_length: int) -> Generator[bytes, None, None]:
    """A generator that yields lines from a configured serial line.

    Will never exit normally, only with an exception when there is an error
    in the serial communication.
    """
    SkipUntilNewLine(handle)
    start_time = time.time()  # initiates the start_time
    buffer = b''
    while True:
        line: bytes = handle.readline(max_line_length)
        logging.debug("Received line %r", line)
        buffer += line
        if buffer.endswith(b"\n"):
            full_line = buffer
            buffer = b''  # reset the buffer
            start_time = time.time()  # reset the time
            yield full_line.rstrip()
        elif time.time() - start_time > 10:
            buffer = b''  # discard the buffer if no newline is received in 10 secs
            start_time = time.time()  # reset the time


tophee commented Sep 23, 2023

It didn't work, so I dove a bit more into the code and started wondering: shouldn't we actually be checking for \r\n rather than just \n?

It looks like the reason why the existing checks for \n are not working for me is that my Arduino is sending the individual values with Serial.print() and only the last transmission is sent with Serial.println(). I believe that Serial.print() appends \n and Serial.println() appends \r\n... Edit: No, I misunderstood. This is not the case.
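On the `\r\n` question: a line that ends in `\r\n` also ends in `\n`, so a check for `\n` already matches lines terminated by `Serial.println()`, and `rstrip()` removes both characters. A quick check in plain Python, using a shortened sample line:

```python
line = b"FTX,status=testing t0=17.90,end=1\r\n"

print(line.endswith(b"\n"))    # the existing check
print(line.endswith(b"\r\n"))  # the stricter check
print(line.rstrip())           # trailing \r and \n are both stripped
```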


tophee commented Sep 23, 2023

FWIW, I'm trying this now:

def check_incomplete_line(handle: BinaryIO, timeout: float, max_line_length: int):
    '''Attempt to read a line within the given timeout. 
       Returns an incomplete line if timeout expires.'''
    end_time = time.time() + timeout
    line = b""
    while time.time() < end_time and len(line) < max_line_length:
        if handle.readable():
            char = handle.read(1)  # read one byte
            line += char
            if char == b'\n':
                return line
    return line  # return incomplete line

def SerialLines(handle: BinaryIO, max_line_length: int, timeout: float) -> Generator[bytes, None, None]:
    """A generator that yields lines from a configured serial line.
       Will never exit normally, only with an exception when there is an error
       in the serial communication.
    """
    SkipUntilNewLine(handle)
    while True:
        # replaced the handle.readline() call with our custom function 
        line: bytes = check_incomplete_line(handle, timeout, max_line_length)
        logging.debug("Received line %r", line)
        if not line.endswith(b"\n"):
            raise LineOverflowError(line, max_line_length)
        yield line.rstrip()

Which means I'm calling SerialLines() with a 10 second timeout. Let's see...
Edit: Nope. Doesn't work at all.

This is too difficult for me. I obviously don't know what I'm doing.


tophee commented Sep 24, 2023

I think the problem in the previous version was that the timer started before anything was received. It looks like this works:

def check_incomplete_line(handle: BinaryIO, timeout: float, max_line_length: int):
    '''Attempt to read a line within the given timeout starting from the first received byte.
       Returns None if the timeout expires before a newline is received.'''
    line = b""
    end_time = None
    while len(line) < max_line_length:
        if handle.readable():
            char = handle.read(1)  # read one byte (empty if the read itself timed out)
            if char and end_time is None:  # if this is the first received byte
                end_time = time.time() + timeout  # set the end time
            line += char
            if char == b'\n':
                return line
            elif end_time is not None and time.time() >= end_time:
                logging.warning("Dismissing incomplete line: %r", line)
                return None # don't return the incomplete line
    return line  # max_line_length reached without a newline

def SerialLines(handle: BinaryIO, max_line_length: int, timeout: float) -> Generator[bytes, None, None]:
    """A generator that yields lines from a configured serial line.
       Will never exit normally, only with an exception when there is an error
       in the serial communication.
    """
    SkipUntilNewLine(handle)
    while True:
        # replaced the handle.readline() call with our custom function
        line = check_incomplete_line(handle, timeout, max_line_length)
        logging.debug("Received line %r", line)
        if line is None:
            continue  # the incomplete line was dismissed, wait for the next one
        if not line.endswith(b"\n"):
            raise LineOverflowError(line, max_line_length)
        yield line.rstrip()

Here, the timer starts only when the first byte is received (i.e. that's when we set end_time). If a newline is not encountered before end_time, the function stops reading and returns None. So, if this continues to work, it would be the answer to the title of this issue. :-)
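A variation on the same idea, as a sketch under stated assumptions rather than a tested fix for the script: the read of the next byte may block until the following transmission starts, so the first byte seen after the deadline probably belongs to the next line. The sketch drops the stale partial line and keeps that late byte as a fresh start. The names `read_complete_line` and `FakePort` and the `clock` parameter are hypothetical; `FakePort` only replays scripted data with a fake clock so the example runs without hardware. It assumes that `handle.read(1)` returns `b""` when its own read timeout expires.

```python
import logging
import time
from typing import Callable


def read_complete_line(handle, timeout: float, max_line_length: int,
                       clock: Callable[[], float] = time.monotonic) -> bytes:
    """Return the next newline-terminated line.

    A partial line older than `timeout` seconds is dropped; the byte that
    arrived late is treated as the start of the next line.
    """
    line = b""
    deadline = 0.0
    while len(line) < max_line_length:
        char = handle.read(1)  # assumed to return b"" on a read timeout
        if line and clock() >= deadline:
            logging.warning("Dismissing incomplete line: %r", line)
            line = b""  # drop the stale part, keep `char` as a fresh start
        if not char:
            continue
        if not line:
            deadline = clock() + timeout  # timer starts at the first byte of a line
        line += char
        if char == b"\n":
            return line
    # The real script raises its own LineOverflowError here.
    raise ValueError("line longer than {0} bytes: {1!r}".format(max_line_length, line))


class FakePort:
    """Test double: replays (seconds_to_wait, data) chunks and advances a fake clock."""

    def __init__(self, chunks):
        self._chunks = list(chunks)
        self._pending = b""
        self.now = 0.0

    def read(self, size: int = 1) -> bytes:
        if not self._pending:
            if not self._chunks:
                raise EOFError("no more scripted data")
            wait, data = self._chunks.pop(0)
            self.now += wait  # pretend the read blocked this long
            self._pending = data
        out, self._pending = self._pending[:size], self._pending[size:]
        return out


port = FakePort([
    (0.0, b"FTX_log ventstate=1,sen"),           # transmission cut off mid-line
    (60.0, b"FTX,status=testing t0=17.90\r\n"),  # next sample, a minute later
])
print(read_complete_line(port, timeout=10.0, max_line_length=1024,
                         clock=lambda: port.now))
```

With the scripted data above, the cut-off `FTX_log` part is dismissed and the following `FTX` line is returned whole, including its first byte.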

For the sake of completeness: in order for this to work, we need to specify the timeout when the function is called in collect.py:

                lines = serial_samples.SerialLines(handle, args.max_line_length, 10.0)

I have hard-coded 10 seconds, because figuring out how to add a flag that receives that value, and adding code for when no timeout is specified, would probably take me more hours of trial and error... I'm not even sure whether omitting the flag should fall back to the original code or use some default timeout. The latter would be easier to implement.
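For the flag itself, a minimal sketch of the "default timeout" variant, assuming the script builds its `args` with `argparse` (the attribute names such as `args.read_timeout` suggest this, but it is not confirmed in this thread). The flag name `--incomplete_line_timeout` is hypothetical and not an existing option of the script.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--incomplete_line_timeout",  # hypothetical flag name
    type=float,
    default=10.0,
    help="Seconds to wait for a newline after the first byte of a line "
         "before the partial line is dismissed.")

args = parser.parse_args([])  # no flag given: falls back to the default
print(args.incomplete_line_timeout)

args = parser.parse_args(["--incomplete_line_timeout", "5"])
print(args.incomplete_line_timeout)
```

The call in collect.py could then pass `args.incomplete_line_timeout` in place of the hard-coded `10.0`.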

Owner

ppetr commented Sep 25, 2023

Thank you for looking into this! I'll try to incorporate your fix soon.
