Skip to content

Commit

Permalink
Add proxy config
Browse files Browse the repository at this point in the history
Signed-off-by: felix.gateru <[email protected]>
  • Loading branch information
felixgateru committed Oct 18, 2023
1 parent 57d2b2a commit 908e596
Show file tree
Hide file tree
Showing 126 changed files with 2,188 additions and 10,861 deletions.
41 changes: 24 additions & 17 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package api

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -65,20 +66,26 @@ func MakeCoAPHandler(svc coap.Service, l mflog.Logger) mux.HandlerFunc {
return handler
}

func sendResp(w mux.ResponseWriter, resp *message.Message) {
if err := w.Client().WriteMessage(resp); err != nil {
func sendResp(ctx context.Context, w mux.ResponseWriter, resp *message.Message) {
m := w.Conn().AcquireMessage(ctx)
m.SetCode(resp.Code)
m.SetBody(bytes.NewReader(resp.Payload))
m.SetToken(resp.Token)
for _, option := range resp.Options {
m.SetOptionBytes(option.ID, option.Value)
}
if err := w.Conn().WriteMessage(m); err != nil {
logger.Warn(fmt.Sprintf("Can't set response: %s", err))
}
}

func handler(w mux.ResponseWriter, m *mux.Message) {
resp := message.Message{
Code: codes.Content,
Token: m.Token,
Context: m.Context,
Token: m.Token(),
Options: make(message.Options, 0, 16),
}
defer sendResp(w, &resp)
defer sendResp(m.Context(), w, &resp)
msg, err := decodeMessage(m)
if err != nil {
logger.Warn(fmt.Sprintf("Error decoding message: %s", err))
Expand All @@ -91,12 +98,12 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
resp.Code = codes.Unauthorized
return
}
switch m.Code {
switch m.Code() {
case codes.GET:
err = handleGet(m.Context, m, w.Client(), msg, key)
err = handleGet(m.Context(), m, w.Conn(), msg, key)
case codes.POST:
resp.Code = codes.Created
err = service.Publish(m.Context, key, msg)
err = nil
default:
err = errors.ErrNotFound
}
Expand All @@ -115,25 +122,25 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
}
}

func handleGet(ctx context.Context, m *mux.Message, c mux.Client, msg *messaging.Message, key string) error {
func handleGet(ctx context.Context, m *mux.Message, c mux.Conn, msg *messaging.Message, key string) error {
var obs uint32
obs, err := m.Options.Observe()
obs, err := m.Observe()
if err != nil {
logger.Warn(fmt.Sprintf("Error reading observe option: %s", err))
return errBadOptions
}
if obs == startObserve {
c := coap.NewClient(c, m.Token, logger)
c := coap.NewClient(c, m.Token(), logger)
return service.Subscribe(ctx, key, msg.Channel, msg.Subtopic, c)
}
return service.Unsubscribe(ctx, key, msg.Channel, msg.Subtopic, m.Token.String())
return service.Unsubscribe(ctx, key, msg.Channel, msg.Subtopic, m.Token().String())
}

func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
if msg.Options == nil {
if msg.Options() == nil {
return &messaging.Message{}, errBadOptions
}
path, err := msg.Options.Path()
path, err := msg.Path()
if err != nil {
return &messaging.Message{}, err
}
Expand All @@ -155,7 +162,7 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
}

if msg.Body != nil {
buff, err := io.ReadAll(msg.Body)
buff, err := io.ReadAll(msg.Body())
if err != nil {
return ret, err
}
Expand All @@ -165,10 +172,10 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
}

func parseKey(msg *mux.Message) (string, error) {
if obs, _ := msg.Options.Observe(); obs != 0 && msg.Code == codes.GET {
if obs, _ := msg.Observe(); obs != 0 && msg.Code() == codes.GET {
return "", nil
}
authKey, err := msg.Options.GetString(message.URIQuery)
authKey, err := msg.Options().GetString(message.URIQuery)
if err != nil {
return "", err
}
Expand Down
31 changes: 14 additions & 17 deletions coap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ type Client interface {
var ErrOption = errors.New("unable to set option")

type client struct {
client mux.Client
client mux.Conn
token message.Token
observe uint32
logger logger.Logger
}

// NewClient instantiates a new Observer.
func NewClient(c mux.Client, tkn message.Token, l logger.Logger) Client {
func NewClient(c mux.Conn, tkn message.Token, l logger.Logger) Client {
return &client{
client: c,
token: tkn,
Expand All @@ -57,13 +57,10 @@ func (c *client) Done() <-chan struct{} {
}

func (c *client) Cancel() error {
m := message.Message{
Code: codes.Content,
Token: c.token,
Context: context.Background(),
Options: make(message.Options, 0, 16),
}
if err := c.client.WriteMessage(&m); err != nil {
pm := c.client.AcquireMessage(context.Background())
pm.SetCode(codes.Content)
pm.SetToken(c.token)
if err := c.client.WriteMessage(pm); err != nil {
c.logger.Error(fmt.Sprintf("Error sending message: %s.", err))
}
return c.client.Close()
Expand All @@ -74,12 +71,10 @@ func (c *client) Token() string {
}

func (c *client) Handle(msg *messaging.Message) error {
m := message.Message{
Code: codes.Content,
Token: c.token,
Context: c.client.Context(),
Body: bytes.NewReader(msg.Payload),
}
pm := c.client.AcquireMessage(context.Background())
pm.SetCode(codes.Content)
pm.SetToken(c.token)
pm.SetBody(bytes.NewReader(msg.Payload))

atomic.AddUint32(&c.observe, 1)
var opts message.Options
Expand All @@ -103,6 +98,8 @@ func (c *client) Handle(msg *messaging.Message) error {
return fmt.Errorf("cannot set options to response: %w", err)
}

m.Options = opts
return c.client.WriteMessage(&m)
for _, option := range opts {
pm.SetOptionBytes(option.ID, option.Value)
}
return c.client.WriteMessage(pm)
}
Loading

0 comments on commit 908e596

Please sign in to comment.