diff --git a/web_ui/device_manager.lua b/web_ui/device_manager.lua new file mode 100644 index 00000000..5ab70b8e --- /dev/null +++ b/web_ui/device_manager.lua @@ -0,0 +1,72 @@ + +-- +-- This class keeps track of the devices in the local network +-- + +local spin_util = require('spin_util') + +local _M = {} + +local device_manager = {} +device_manager_mt = { __index = device_manager } + +function _M.create(profile_manager) + local newinst = {} + setmetatable(newinst, device_manager_mt) + + newinst.devices_seen = {} + newinst.profile_manager = profile_manager + newinst.last_update = spin_util.get_time_string() + + return newinst +end + +function device_manager:get_device_seen(mac) + return self.devices_seen[mac] +end + +function device_manager:get_devices_seen() + return self.devices_seen +end + +-- Returns one of three options: +-- true if the device has been seen and is marked as 'new' +-- false if the device has been seen and is not marked as 'new' +-- nil if the device has not been seen +function device_manager:device_is_new(mac) + if self.devices_seen[mac] == nil then return nil end + return self.devices_seen[mac].new +end + +function device_manager:set_device_is_new(mac, value) + self.devices_seen_updated = spin_util.get_time_string() + self.devices_seen[mac] = value +end + + +function device_manager:add_device_seen(mac, name, timestamp) + local device_is_new = False + if self.devices_seen[mac] ~= nil then + self.devices_seen[mac]['lastSeen'] = timestamp + self.devices_seen[mac]['name'] = name + self.devices_seen[mac]['appliedProfiles'] = self.profile_manager:get_device_profiles(mac) + else + local device_data = {} + device_data['lastSeen'] = timestamp + device_data['name'] = name + device_data['new'] = true + device_data['mac'] = mac + device_data['appliedProfiles'] = self.profile_manager:get_device_profiles(mac) + device_data['enforcement'] = "" + device_data['logging'] = "" + + self.devices_seen[mac] = device_data + + device_is_new = True + end + self.devices_seen_updated = spin_util.get_time_string() + return device_is_new +end + + +return _M diff --git a/web_ui/spin_util.lua b/web_ui/spin_util.lua new file mode 100644 index 00000000..434505f0 --- /dev/null +++ b/web_ui/spin_util.lua @@ -0,0 +1,110 @@ +-- +-- Assorted utility functions for spin_webui +-- + +local sys_stat = require "posix.sys.stat" +local mt_io = require 'minittp_io' + +local _M = {} + +-- returns a straight list of all the whitespace-separated tokens +-- in the given filename +-- or nil, err on error +function _M.file_tokenize(filename) + local result = {} + local fr, err = mt_io.file_reader(filename) + if fr == nil then return nil, err end + + for line in fr:read_line_iterator(true) do + for token in line:gmatch("%S+") do table.insert(result, token) end + end + + return result +end + +function _M.file_tokenize_iterator(filename) + local data, err = file_tokenize(filename) + if data == nil then return nil, err end + + result = {} + result.index = 1 + result.done = false + result.data = data + + function result:nxt() + if not self.done then + local value = self.data[self.index] + self.index = self.index + 1 + if self.index > table.getn(self.data) then self.done = true end + return value + else + return nil + end + end + return result +end + +function _M.strip_quotes(value) + if value:len() == 1 then return value end + if value:startswith("'") and value:endswith("'") then + return value:sub(2, value:len()-1) + elseif value:startswith('"') and value:endswith('"') then + return value:sub(2, value:len()-1) + else + return value + end +end + +-- +-- config file parser, reads config files +-- in the 'luci-style' form of +-- +-- config +-- option '' +-- option '' +-- config +-- option '' +-- +-- very basic config parser; hardly any checking +function _M.config_parse(filename) + local config = {} + local tokens, err = file_tokenize_iterator(filename) + if tokens == nil then return nil, err end + + local cur_section = "main" + + while not tokens.done do + local token = tokens:nxt() + if token == "config" then + cur_section = tokens:nxt() + elseif token == "option" then + local option_name = tokens:nxt() + local option_val = strip_quotes(tokens:nxt()) + if config[cur_section] == nil then config[cur_section] = {} end + config[cur_section][option_name] = option_val + print(config[cur_section][option_name]) + end + end + return config +end + +-- return the current time, or given timestamp in the format of RFC7232 section 2.2 +function _M.get_time_string(timestamp) + if timestamp ~= nil then + return os.date("%a, %d %b %Y %X %z", timestamp) + else + return os.date("%a, %d %b %Y %X %z") + end +end + +-- return the file timestamp in the format of RFC7232 section 2.2 +function _M.get_file_timestamp(file_path) + local fstat, err = sys_stat.stat(file_path) + if fstat == nil then + return nil, err + end + + return os.date("%a, %d %b %Y %X %z", fstat.st_mtime) +end + +return _M diff --git a/web_ui/spin_webui.lua b/web_ui/spin_webui.lua index ccacfc91..ef1e56b8 100644 --- a/web_ui/spin_webui.lua +++ b/web_ui/spin_webui.lua @@ -1,130 +1,48 @@ +-- +-- This is the main spin web UI handling code +-- +-- This class handles the incoming HTTP and websocket requests and +-- responses. +-- +-- Internal SPIN functionality is generally handled by the _manager +-- classes + +local coxpcall = require 'coxpcall' local mt_engine = require 'minittp_engine' local mt_io = require 'minittp_io' local mt_util = require 'minittp_util' +local spin_util = require 'spin_util' + local copas = require 'copas' local liluat = require 'liluat' -local sys_stat = require "posix.sys.stat" local mqtt = require 'mosquitto' local json = require 'json' +-- Additional supporting tools local ws_ext = require 'ws_ext' +local tcpdumper = require 'tcpdumper' local TRAFFIC_CHANNEL = "SPIN/traffic" local HISTORY_SIZE = 600 +-- The managers implement the main functionality +local device_manager_m = require 'device_manager' local profile_manager_m = require 'profile_manager' local TEMPLATE_PATH = "templates/" posix = require 'posix' --- --- config file parser, reads config files --- in the 'luci-style' form of --- --- config --- option '' --- option '' --- config --- option '' --- --- returns a straight list of all the whitespace-separated tokens --- in the given filename --- or nil, err on error -function file_tokenize(filename) - local result = {} - local fr, err = mt_io.file_reader(filename) - if fr == nil then return nil, err end - - for line in fr:read_line_iterator(true) do - for token in line:gmatch("%S+") do table.insert(result, token) end - end - - return result -end - -function file_tokenize_iterator(filename) - local data, err = file_tokenize(filename) - if data == nil then return nil, err end - - result = {} - result.index = 1 - result.done = false - result.data = data - - function result:nxt() - if not self.done then - local value = self.data[self.index] - self.index = self.index + 1 - if self.index > table.getn(self.data) then self.done = true end - return value - else - return nil - end - end - return result -end - -function strip_quotes(value) - if value:len() == 1 then return value end - if value:startswith("'") and value:endswith("'") then - return value:sub(2, value:len()-1) - elseif value:startswith('"') and value:endswith('"') then - return value:sub(2, value:len()-1) - else - return value - end -end - --- very basic config parser; hardly any checking -function config_parse(filename) - local config = {} - local tokens, err = file_tokenize_iterator(filename) - if tokens == nil then return nil, err end - - local cur_section = "main" - - while not tokens.done do - local token = tokens:nxt() - if token == "config" then - cur_section = tokens:nxt() - elseif token == "option" then - local option_name = tokens:nxt() - local option_val = strip_quotes(tokens:nxt()) - if config[cur_section] == nil then config[cur_section] = {} end - config[cur_section][option_name] = option_val - print(config[cur_section][option_name]) - end - end - return config -end - --- return the current time, or given timestamp in the format of RFC7232 section 2.2 -function get_time_string(timestamp) - if timestamp ~= nil then - return os.date("%a, %d %b %Y %X %z", timestamp) - else - return os.date("%a, %d %b %Y %X %z") - end -end - --- return the file timestamp in the format of RFC7232 section 2.2 -function get_file_timestamp(file_path) - local fstat, err = sys_stat.stat(file_path) - if fstat == nil then - return nil, err - end - - return os.date("%a, %d %b %Y %X %z", fstat.st_mtime) -end - -- -- minittp handler functionality -- handler = {} +-- +-- Loader for the templates in case of HTML responses +-- function handler:load_templates() self.templates = {} self.base_template_name = "base.html" @@ -214,7 +132,7 @@ function handler:read_config(args) config['mqtt']['host'] = "127.0.0.1" config['mqtt']['port'] = 1883 if config_file ~= nil then - config, err = config_parse(config_file) + config, err = spin_util.config_parse(config_file) if config == nil then return nil, err end end if mqtt_host ~= nil then @@ -227,31 +145,18 @@ function handler:read_config(args) end function handler:add_device_seen(mac, name, timestamp) - if self.devices_seen[mac] ~= nil then - self.devices_seen[mac]['lastSeen'] = timestamp - self.devices_seen[mac]['name'] = name - self.devices_seen[mac]['appliedProfiles'] = self.profile_manager:get_device_profiles(mac) - self:send_websocket_update("deviceUpdate", self.devices_seen[mac]) - else - local device_data = {} - device_data['lastSeen'] = timestamp - device_data['name'] = name - device_data['new'] = true - device_data['mac'] = mac - device_data['appliedProfiles'] = self.profile_manager:get_device_profiles(mac) - device_data['enforcement'] = "" - device_data['logging'] = "" - - self.devices_seen[mac] = device_data - - -- this device is new, so send a notification + -- add_device_seen returns True if the device is 'new' (i.e. wasn't seen + -- before) + if (self.device_manager:add_device_seen(mac, name, timestamp)) then local notification_txt = "New device on network! Please set a profile" self:create_notification("new_device", {}, notification_txt, mac, name) - self:send_websocket_update("newDevice", self.devices_seen[mac]) + self:send_websocket_update("newDevice", self.device_manager:get_device_seen(mac)) + else + self:send_websocket_update("deviceUpdate", self.device_manager:get_device_seen(mac)) end - self.devices_seen_updated = get_time_string() end +-- TODO: should the device manager do this part too? function handler:handle_traffic_message(data, orig_data) if data.flows == nil then return end for i, d in ipairs(data.flows) do @@ -291,9 +196,43 @@ function handler:mqtt_looper() end end +function handler:handle_mqtt_queue_msg(msg) + -- TODO: pass topic? (traffic/incident) + local success, pd = coxpcall.pcall(json.decode, msg) + if success and pd then + --print("[XX] msg: " .. msg) + if pd["command"] and pd["command"] == "traffic" then + self:handle_traffic_message(pd["result"], payload) + elseif pd["incident"] ~= nil then + local incident = pd["incident"] + local ts = incident["incident_timestamp"] + for i=ts-5,ts+5 do + if self:handle_incident_report(incident, i) then break end + end + end + end +end + +function handler:mqtt_queue_looper() + self.mqtt_queue_msgs = {} + while true do + while table.getn(self.mqtt_queue_msgs) > 0 do + local msg = table.remove(self.mqtt_queue_msgs, 1) + self:handle_mqtt_queue_msg(msg) + end + copas.sleep(0.1) + for i,c in pairs(self.websocket_clients) do + if c:has_queued_messages() then + c:send_queued_messages() + print("[XX] still queueud msgs") + end + end + end +end + function handler:handle_index(request, response) html, err = self:render("index.html", {mqtt_host = self.config['mqtt']['host']}) - response:set_header("Last-Modified", get_file_timestamp(TEMPLATE_PATH .. "index.html")) + response:set_header("Last-Modified", spin_util.get_file_timestamp(TEMPLATE_PATH .. "index.html")) if html == nil then response:set_status(500, "Internal Server Error") response.content = "Template error: " .. err @@ -307,80 +246,6 @@ function get_tcpdump_pname(request, mac) return request.client_address .. "-" .. mac end --- --- Class for managing tcpdump processes --- -local tcpdumper = {} -tcpdumper.__index = tcpdumper - -function tcpdumper.create(device, response) - local td = {} - setmetatable(td, tcpdumper) - -- check device for format here or at caller? - -- should be aa:bb:cc:dd:ee:ff - td.running = true - td.bytes_sent = 0 - td.response = response - - local subp, err = mt_io.subprocess("tcpdump", {"-U", "-i", "br-lan", "-s", "1600", "-w", "-", "ether", "host", device}, 0, true, false, false) - if subp == nil then - return nil - end - td.subp = subp - - return td -end - -function tcpdumper:read_and_send(size) - line, err = self.subp:read_bytes(size) - if line == nil then - if err ~= "read timed out" then - print("Error reading from subprocess: " .. err) - sent, err = response:send_chunk("") - subp:kill() - subp:close() - return nil, err - end - else - sent, err = self.response:send_chunk(line) - if sent == nil then - sent, err = self.response:send_chunk("") - print("Error sending data: " .. err) - subp:kill() - subp:close() - return nil, err - else - -- do not count the \r\n that signals end of chunk - self.bytes_sent = self.bytes_sent + sent - 2 - return sent - 2 - end - end -end - -function tcpdumper:run() - while self.running do - self:read_and_send(1600) - copas.sleep(0.1) - end - self.subp:kill() - self.subp:close() - - -- End with an empty chunk, as per transfer-encoding: chunked protocol - sent, err = self.response:send_chunk("") - if sent == nil then - print("Error sending data: " .. err) - else - print("Sent " .. " bytes"); - end - -- just to make sure - self.running = false - return nil -end - -function tcpdumper:stop() - self.running = false -end - function handler:handle_tcpdump_start(request, response) local device = request.params["device"] local dname = get_tcpdump_pname(request, device) @@ -446,7 +311,7 @@ function handler:handle_tcpdump_manage(request, response) end html, err = self:render_raw("tcpdump.html", { device=device, running=running, bytes_sent=bytes_sent }) - response:set_header("Last-Modified", get_file_timestamp(TEMPLATE_PATH .. "tcpdump.html")) + response:set_header("Last-Modified", spin_util.get_file_timestamp(TEMPLATE_PATH .. "tcpdump.html")) if html == nil then response:set_status(500, "Internal Server Error") @@ -473,7 +338,7 @@ function handler:handle_configuration(request, response) if request.method == "GET" then -- read the config, we don't use it ourselves just yet local fr, err = mt_io.file_reader("/etc/spin/spin_webui.conf") - response:set_header("Last-Modified", get_file_timestamp("/etc/spin/spin_webui.conf")) + response:set_header("Last-Modified", spin_util.get_file_timestamp("/etc/spin/spin_webui.conf")) if fr ~= nil then response.content = fr:read_lines_single_str() else @@ -503,29 +368,40 @@ end function handler:handle_device_list(request, response) self:set_api_headers(response) - response.content = json.encode(self.devices_seen) - - response:set_header("Last-Modified", self.devices_seen_updated) - + response.content = json.encode(self.device_manager:get_devices_seen()) + response:set_header("Last-Modified", self.device_manager.last_update) return response end +-- this *queues* a message to send to all active clients function handler:send_websocket_update(name, arguments) - print("[XX] WEBSOCKCLIENTCOUNT: " .. table.getn(self.websocket_clients)) + if table.getn(self.websocket_clients) == 0 then return end local msg = "" - if args == nil then - c:send('{"type": "update", "name": "' + name + '"}') + if arguments == nil then + msg = '{"type": "update", "name": "' .. name .. '"}' else - c:send('{"type": "update", "name": "' + name + '", "args": ' + json.encode(args) + '}') + msg = '{"type": "update", "name": "' .. name .. '", "args": ' .. json.encode(arguments) .. '}' end - for i,c in pairs(self.websocket_clients) do - c:send(msg) + print("[XX] send msg to client") + --c:send(msg) + c:queue_message(msg) end end -function handler:handle_profile_list(request, response) - self:set_api_headers(response) +-- note, client needs to be passed here (this data is only sent to the given client) +local function send_websocket_initialdata(client, name, arguments) + local msg = "" + if arguments == nil then + msg = '{"type": "data", "name": "' .. name .. '"}' + else + msg = '{"type": "data", "name": "' .. name .. '", "args": ' .. json.encode(arguments) .. '}' + end + + client:send(msg) +end + +function handler:get_filtered_profile_list() local profile_list = {} -- we'll make a selective deep copy of the data, since for now we -- want to leave out some of the fields @@ -543,7 +419,12 @@ function handler:handle_profile_list(request, response) v.rules_v4 = nil v.rules_v6 = nil end - response.content = json.encode(profile_list) + return profile_list +end + +function handler:handle_profile_list(request, response) + self:set_api_headers(response) + response.content = json.encode(self:get_filtered_profile_list()) response:set_header("Last-Modified", self.profile_manager.profiles_updated) return response end @@ -553,16 +434,15 @@ function handler:handle_device_profiles(request, response, device_mac) if request.method == "GET" or request.method == "HEAD" then local content_json, updated = self.profile_manager:get_device_profiles(device_mac) response.content = json.encode(content_json) - print("[XX] ytoyoyo lu: " .. updated) response:set_header("Last-Modified", updated) else if request.post_data ~= nil and request.post_data.profile_id ~= nil then local profile_id = request.post_data.profile_id local status = nil local err = "" - if self.devices_seen[device_mac] ~= nil then + if self.device_manager:get_device_seen(device_mac) ~= nil then status, err = self.profile_manager:set_device_profile(device_mac, request.post_data.profile_id) - local device_name = self.devices_seen[device_mac].name + local device_name = self.device_manager:get_device_seen(device_mac).name local profile_name = self.profile_manager.profiles[profile_id].name if status then local notification_txt = "Profile set to " .. profile_name @@ -597,12 +477,12 @@ end function handler:handle_toggle_new(request, response, device_mac) self:set_api_headers(response) if request.method == "POST" then - if self.devices_seen[device_mac] ~= nil and self.devices_seen[device_mac].new then - self.devices_seen[device_mac].new = false - self.devices_seen_updated = get_time_string() - else + local seen = self.device_manager:device_is_new(device_mac) + if seen == nil then response:set_status(400, "Bad request") response.content = json.encode({status = 400, error = "Unknown device: " .. device_mac}) + else + self.device_manager:set_device_is_new(device_mac, not seen) end end return response @@ -625,7 +505,9 @@ function handler:create_notification(msg_key, msg_args, text, device_mac, device new_notification['deviceName'] = device_name end table.insert(self.notifications, new_notification) - self.notifications_updated = get_time_string() + self.notifications_updated = spin_util.get_time_string() + + self:send_websocket_update("newNotification", new_notification) end function handler:delete_notification(id) @@ -637,7 +519,7 @@ function handler:delete_notification(id) end if to_remove ~= nil then table.remove(self.notifications, to_remove) - self.notifications_updated = get_time_string() + self.notifications_updated = spin_util.get_time_string() end end @@ -737,9 +619,9 @@ function handler:handle_websocket(request, response) --print("[XX] END OF FLAT HEADERS OF TYPE " .. type(flat_headers)) request.raw_sock:settimeout(1) print("[XX] AAAAAA") - status, err = self.ws_handler.add_client(flat_headers, request.raw_sock, self) + client, err = self.ws_handler.add_client(flat_headers, request.raw_sock, request.connection, self) print("[XX] BBBBBB") - if not status then + if not client then print("[XX] CCCCCC") response:set_status(400, "Bad request") response.content = err @@ -747,7 +629,29 @@ function handler:handle_websocket(request, response) else print("[XX] DDDDDD") table.insert(self.websocket_clients, status) + -- send any initial client information here + client:send('{"message": "hello, world"}') + -- Send the overview of profiles + send_websocket_initialdata(client, "profiles", self:get_filtered_profile_list()) + -- Send the overview of known devices so far, which includes their profiles + send_websocket_initialdata(client, "devices", self.device_manager:get_devices_seen()) + -- Send all notifications + send_websocket_initialdata(client, "notifications", self.notifications) print("[XX] NEW CONNECT NOW COUNT: " .. table.getn(self.websocket_clients)) + while client.state ~= 'CLOSED' do + print("[XX] NOTCLOSED") + --if client:has_queued_messages() then + -- print("[XX] have messages to send!") + -- client:send_queued_messages() + --else + -- copas.sleep(0.1) + --end + local dummy = { + send = function() end, + close = function() end + } + copas.send(dummy) + end end print("[XX] yoyo yoyo") @@ -756,6 +660,10 @@ function handler:handle_websocket(request, response) return nil end +function handler:have_websocket_messages() + return table.getn(self.websocket_messages) > 0 +end + function handler:do_add_ws_c(client) print("[XX] FOO ADDING CLINET") table.insert(self.websocket_clients, client) @@ -774,16 +682,23 @@ function handler:init(args) self.profile_manager = profile_manager_m.create_profile_manager() self.profile_manager:load_all_profiles() self.profile_manager:load_device_profiles() + + self.device_manager = device_manager_m.create(self.profile_manager) + self.notifications = {} - self.notifications_updated = get_time_string() + self.notifications_updated = spin_util.get_time_string() self.notification_counter = 1 self.websocket_clients = {} + self.websocket_messages = {} self.ws_handler = ws_ext.ws_server_create(ws_opts) + self.mqtt_queue_msgs = {} + -- We will use this list for the fixed url mappings -- Fixed handlers are interpreted as they are; they are -- ONLY valid for the EXACT path identified in this list + -- (for more flexibility, see the pattern handlers below) self.fixed_handlers = { ["/"] = handler.handle_index, ["/spin_api"] = self.handle_index, @@ -818,9 +733,6 @@ function handler:init(args) } local client = mqtt.new() - self.devices_seen = {} - self.devices_seen_updated = get_time_string() - client.ON_CONNECT = function() vprint("Connected to MQTT broker") client:subscribe(TRAFFIC_CHANNEL) @@ -834,27 +746,31 @@ function handler:init(args) local h = self client.ON_MESSAGE = function(mid, topic, payload) - --print("[XX] message for you, sir!") - local success, pd = pcall(json.decode, payload) - if success and pd then - if topic == TRAFFIC_CHANNEL then - if pd["command"] and pd["command"] == "traffic" then - h:handle_traffic_message(pd["result"], payload) - end - elseif handle_incidents and topic == INCIDENT_CHANNEL then - if pd["incident"] == nil then - print("Error: no incident data found in " .. payload) - print("Incident report ignored") - else - local incident = pd["incident"] - local ts = incident["incident_timestamp"] - for i=ts-5,ts+5 do - if handle_incident_report(incident, i) then break end - end - end - end - end - end + table.insert(h.mqtt_queue_msgs, payload) + end + +-- client.ORIG_ON_MESSAGE = function(mid, topic, payload) +-- --print("[XX] message for you, sir!") +-- local success, pd = coxpcall.pcall(json.decode, payload) +-- if success and pd then +-- if topic == TRAFFIC_CHANNEL then +-- if pd["command"] and pd["command"] == "traffic" then +-- h:handle_traffic_message(pd["result"], payload) +-- end +-- elseif handle_incidents and topic == INCIDENT_CHANNEL then +-- if pd["incident"] == nil then +-- print("Error: no incident data found in " .. payload) +-- print("Incident report ignored") +-- else +-- local incident = pd["incident"] +-- local ts = incident["incident_timestamp"] +-- for i=ts-5,ts+5 do +-- if handle_incident_report(incident, i) then break end +-- end +-- end +-- end +-- end +-- end print("[XX] connecting to " .. self.config.mqtt.host .. ":" .. self.config.mqtt.port) a,b,c,d = client:connect(self.config.mqtt.host, self.config.mqtt.port) @@ -867,6 +783,7 @@ function handler:init(args) self.client = client copas.addthread(self.mqtt_looper, self) + copas.addthread(self.mqtt_queue_looper, self) return true end diff --git a/web_ui/tcpdumper.lua b/web_ui/tcpdumper.lua new file mode 100644 index 00000000..953e6bfd --- /dev/null +++ b/web_ui/tcpdumper.lua @@ -0,0 +1,81 @@ +-- +-- Class for managing tcpdump processes +-- + +local mt_io = require 'minittp_io' +local copas = require 'copas' + +local _M = {} + +local tcpdumper = {} +tcpdumper.__index = tcpdumper + +function _M.create(device, response) + local td = {} + setmetatable(td, tcpdumper) + -- check device for format here or at caller? + -- should be aa:bb:cc:dd:ee:ff + td.running = true + td.bytes_sent = 0 + td.response = response + + local subp, err = mt_io.subprocess("tcpdump", {"-U", "-i", "br-lan", "-s", "1600", "-w", "-", "ether", "host", device}, 0, true, false, false) + if subp == nil then + return nil + end + td.subp = subp + + return td +end + +function tcpdumper:read_and_send(size) + line, err = self.subp:read_bytes(size) + if line == nil then + if err ~= "read timed out" then + print("Error reading from subprocess: " .. err) + sent, err = response:send_chunk("") + subp:kill() + subp:close() + return nil, err + end + else + sent, err = self.response:send_chunk(line) + if sent == nil then + sent, err = self.response:send_chunk("") + print("Error sending data: " .. err) + subp:kill() + subp:close() + return nil, err + else + -- do not count the \r\n that signals end of chunk + self.bytes_sent = self.bytes_sent + sent - 2 + return sent - 2 + end + end +end + +function tcpdumper:run() + while self.running do + self:read_and_send(1600) + copas.sleep(0.1) + end + self.subp:kill() + self.subp:close() + + -- End with an empty chunk, as per transfer-encoding: chunked protocol + sent, err = self.response:send_chunk("") + if sent == nil then + print("Error sending data: " .. err) + else + print("Sent " .. " bytes"); + end + -- just to make sure + self.running = false + return nil +end + +function tcpdumper:stop() + self.running = false +end + +return _M diff --git a/web_ui/ws_ext.lua b/web_ui/ws_ext.lua index 1bbac19d..941913fd 100644 --- a/web_ui/ws_ext.lua +++ b/web_ui/ws_ext.lua @@ -1,3 +1,4 @@ +require 'coxpcall' local socket = require'socket' local copas = require'copas' local tools = require'websocket.tools' @@ -7,6 +8,8 @@ local sync = require'websocket.sync' local tconcat = table.concat local tinsert = table.insert +local json = require('json') + local clients = {} local client = function(sock,protocol) @@ -16,20 +19,23 @@ local client = function(sock,protocol) self.state = 'OPEN' self.is_server = true + self.queued_messages = {} self.sock_send = function(self,...) + print("[XX] calling copas.send!") return copas.send(sock,...) end self.sock_receive = function(self,...) + print("[XX] calling copas.receive!") return copas.receive(sock,...) end self.sock_close = function(self) - sock:shutdown() + --sock:shutdown() sock:close() end - + self = sync.extend(self) self.on_close = function(self) @@ -44,7 +50,36 @@ local client = function(sock,protocol) end self:send(...) end - + + self.dsend = function(self,data,opcode) + if self.state ~= 'OPEN' then + return nil,false,1006,'wrong state' + end + local encoded = frame.encode(data,opcode or frame.TEXT,not self.is_server) + local n,err = sock:send(encoded) + if n ~= #encoded then + return nil,self:close(1006,err) + end + return true + end + + self.queue_message = function(self, message) + table.insert(self.queued_messages, message) + end + + self.has_queued_messages = function(self) + return table.getn(self.queued_messages) > 0 + end + + self.send_queued_messages = function(self) + while table.getn(self.queued_messages) > 0 do + local msg = json.encode(table.remove(self.queued_messages, 1)) + print("[XX] SENDING MESSAGE: " .. msg) + self:send(msg) + end + print("[XX] QUEUE SENT") + end + return self end @@ -66,7 +101,7 @@ local ws_server_create = function (opts) clients[true] = {} local self = {} - self.add_client = function(request, sock, main_handler) + self.add_client = function(request, raw_sock, copas_sock, main_handler) --local request = {} --repeat -- -- no timeout used, so should either return with line or err @@ -85,15 +120,27 @@ local ws_server_create = function (opts) print("[XX] REQUEST:") print(upgrade_request) print("[XX] END OF REQUEST") - --local status,response,protocol = handshake.accept_upgrade(upgrade_request,protocols) - local status,response,protocol = pcall(handshake.accept_upgrade, upgrade_request,protocols) + local response,protocol = handshake.accept_upgrade(upgrade_request,protocols) + local status = true + --local status,response,protocol = pcall(handshake.accept_upgrade, upgrade_request,protocols) if not status then print("[XX] error: Client request does not appear to be a websocket\n") --copas.send(sock,protocol) --sock:close() return nil, "Client request does not appear to be a websocket" end - copas.send(sock,response) + print("[XX] ABOUT TO SEND OK RESPONSE (status was " .. json.encode(status) .. ")") + if sock ~= nil then + print("[XX] HAVE SOCK") + else + print("[XX] SOCK NIL") + end + if resp == nil then + print("[XX] COULD NOT UPGRADE REQUEST") + print(protocol) + end + print("[XX] RESP: " .. json.encode(response)) + copas.send(copas_sock, response) local handler local new_client local protocol_index @@ -105,21 +152,22 @@ local ws_server_create = function (opts) protocol_index = true handler = opts.default else - sock:close() + copas_sock:close() if on_error then on_error('bad protocol, and no default set') end return nil, 'Unknown protocol and no default protocol set' end - new_client = client(sock,protocol_index) + new_client = client(copas_sock,protocol_index) clients[protocol_index][new_client] = true --handler(new_client) - print("[XX] ADD TO SERVER") + print("[XX] ADD TO SERVER1") main_handler:do_add_ws_c(new_client) - print("[XX] ADDED TO SERVER") + print("[XX] ADDED TO SERVER!") -- this is a dirty trick for preventing -- copas from automatically and prematurely closing -- the socket + --print("[XX] STARTING ETERNAL LOOP. SEE YOU AT THE END OF THE UNIVERSE") while new_client.state ~= 'CLOSED' do local dummy = { send = function() end, @@ -127,7 +175,8 @@ local ws_server_create = function (opts) } copas.send(dummy) end - return true + --print("[XX] LOOP ENDED ANYWAY") + return new_client end self.close = function(_,keep_clients)