Skip to content
This repository has been archived by the owner on Jul 2, 2020. It is now read-only.

Substitute headers #46

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/diffy/ApiController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ApiController @Inject()(
JoinedEndpoint: JoinedEndpoint,
includeWeights: Boolean,
excludeNoise: Boolean
) =
) : Map[String, Map[String, Any]] =
Map(
"endpoint" -> Renderer.endpoint(JoinedEndpoint.endpoint),
"fields" ->
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/diffy/proxy/ClientService.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.twitter.diffy.proxy

import com.twitter.finagle.{Addr, Name, Resolver, Service}
import com.twitter.finagle.thrift.ThriftClientRequest
import com.twitter.finagle.{Addr, Name, Service}
import com.twitter.util.{Time, Var}
import org.jboss.netty.handler.codec.http.{HttpRequest, HttpResponse}

Expand Down Expand Up @@ -42,7 +42,7 @@ case class ThriftService(
}
}

private[this] def sizeChange(size: Int) {
private[this] def sizeChange(size: Int) : Unit = {
changeCount += 1
if (changeCount > 1) {
changedAt = Some(Time.now)
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ object DifferenceProxyModule extends TwitterModule {
}

object DifferenceProxy {
val AllLabels: Seq[String] = Seq("primary", "secondary", "candidate")

object NoResponseException extends Exception("No responses provided by diffy")
val NoResponseExceptionFuture = Future.exception(NoResponseException)
val log = Logger(classOf[DifferenceProxy])
}

trait DifferenceProxy {

import DifferenceProxy._

type Req
Expand Down
18 changes: 12 additions & 6 deletions src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package com.twitter.diffy.proxy

import java.net.SocketAddress

import com.twitter.diffy.analysis.{DifferenceAnalyzer, JoinedDifferences, InMemoryDifferenceCollector}
import com.twitter.diffy.analysis.{DifferenceAnalyzer, InMemoryDifferenceCollector, JoinedDifferences}
import com.twitter.diffy.lifter.{HttpLifter, Message}
import com.twitter.diffy.proxy.DifferenceProxy.NoResponseException
import com.twitter.finagle.{Service, Http, Filter}
import com.twitter.finagle.http.{Status, Response, Method, Request}
import com.twitter.util.{Try, Future}
import org.jboss.netty.handler.codec.http.{HttpResponse, HttpRequest}
import com.twitter.diffy.proxy.http.filter.{CloneHttpRequestFilter, RefineHttpHeadersByLabelFilter}
import com.twitter.finagle.http.{Method, Request, Response, Status}
import com.twitter.finagle.{Filter, Http, Service}
import com.twitter.util.{Future, Try}
import org.jboss.netty.handler.codec.http.{HttpRequest, HttpResponse}

object HttpDifferenceProxy {

val okResponse = Future.value(Response(Status.Ok))

val noResponseExceptionFilter =
Expand All @@ -34,8 +36,12 @@ trait HttpDifferenceProxy extends DifferenceProxy {
override type Rep = HttpResponse
override type Srv = HttpService


override def serviceFactory(serverset: String, label: String) =
HttpService(Http.newClient(serverset, label).toService)
HttpService(
CloneHttpRequestFilter.apply.
andThen(RefineHttpHeadersByLabelFilter(label, DifferenceProxy.AllLabels)).
andThen(Http.newClient(serverset, label).toService))

override lazy val server =
Http.serve(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.twitter.diffy.proxy.http.filter

import com.twitter.finagle.Filter
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpHeaders, HttpRequest, HttpResponse}

import scala.collection.JavaConverters._

object CloneHttpRequestFilter {

type Req = HttpRequest
type Res = HttpResponse

private def mkCloneRequest(reqIn:Req, filter: (Req => Future[Res])):Future[Res] = {
def copyHeader(headers:HttpHeaders)(k: String, v:String) : Unit = { headers.add(k, v)}

val reqOut: DefaultHttpRequest = new DefaultHttpRequest(reqIn.getProtocolVersion, reqIn.getMethod, reqIn.getUri)
reqOut.setChunked(reqIn.isChunked)
reqOut.setContent(reqIn.getContent)

val headers = for {entry <- reqIn.headers().asScala} yield (entry.getKey, entry.getValue)
headers.foreach((copyHeader(reqOut.headers)_).tupled)

filter(reqOut)
}

def apply(): Filter[Req, Res, Req, Res] = Filter.mk(mkCloneRequest)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.twitter.diffy.proxy.http.filter

import org.jboss.netty.handler.codec.http.HttpHeaders

sealed trait HeaderEffect {
def apply(headers:HttpHeaders) : Unit
}

case class Rewrite(oldName:String, newName:String) extends HeaderEffect {
def apply(headers:HttpHeaders): Unit ={
val temp = headers.get(oldName)
headers.remove(oldName)
headers.set(newName, temp)
}
}

case class Remove( name:String) extends HeaderEffect {
def apply(headers:HttpHeaders): Unit = {
headers.remove(name)
}
}

object HeaderEffect {
def rewrite(oldName: String, newName:String): HeaderEffect = {
if (newName.isEmpty){
Remove(oldName)
} else {
Rewrite(oldName,newName)
}
}
def remove(oldName:String):HeaderEffect = Remove(oldName)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.twitter.diffy.proxy.http.filter

import com.twitter.finagle.Filter
import org.jboss.netty.handler.codec.http.{HttpRequest, HttpResponse}

object RefineHttpHeadersByLabelFilter {

def rewriteRule(prefix:String):HeaderRule = {
val prefixLength = prefix.length
def go(name:String) = {
if (name.startsWith(prefix)) {
Some(HeaderEffect.rewrite(name, name.substring(prefixLength)))
} else None
}
go
}

def removeRule(prefix:String): HeaderRule = {
def go(name:String) = {
if (name.startsWith(prefix)) Some(HeaderEffect.remove(name))
else None
}
go
}

def apply(label: String, allLabels: Seq[String]): Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse] = {
val (inclusions, exclusion) = allLabels.partition(_.equalsIgnoreCase(label))
val rules =
inclusions.map(_ + "_").map(rewriteRule) ++
exclusion.map(_+ "_").map(removeRule)
RewriteHttpHeadersFilter(rules)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.twitter.diffy.proxy.http.filter

import com.twitter.finagle.Filter
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http.{HttpHeaders, HttpResponse, HttpRequest}
import scala.collection.JavaConverters._

object RewriteHttpHeadersFilter {

def apply(rules:Seq[HeaderRule]) : Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse] = {
def filter(req: HttpRequest):HttpRequest = {
val headers: HttpHeaders = req.headers
val effects = for {
name <- headers.names().asScala
effect <- rules.flatMap(_(name)).headOption
} yield {
effect
}
effects.foreach(_ (headers))
req
}

def go(req: HttpRequest, next: (HttpRequest => Future[HttpResponse])): Future[HttpResponse] ={
next(filter(req))
}

Filter.mk(go)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.twitter.diffy.proxy.http

package object filter {
type HeaderRule = String => Option[HeaderEffect]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.twitter.diffy.proxy.http.filter;

import com.twitter.diffy.ParentSpec
import com.twitter.diffy.util.TwitterFutures
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.{Filter, Service}
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class CloneHttpRequestFilterSpec extends ParentSpec with TwitterFutures{
describe("CloneHttpRequestFilter"){
val subject: Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse] = CloneHttpRequestFilter.apply()

def mutateHeader(req: HttpRequest): Future[HttpResponse] = {
req.headers().add("mutation", "test")
Future.apply(Response(req))
}

val mutateHeaderService =subject.andThen(Service.mk(mutateHeader))


describe("recieving a request with no headers"){
def request:Request={ Request(HttpVersion.HTTP_1_1, HttpMethod.GET, "/") }

it("must prevent services which mutate the output request header from affecting the input request"){
val input: Request = request
whenReady(mutateHeaderService(input)){ _ => input.headers().names() mustNot contain("mutation") }
}
it("must preserve http method"){
val input = request
whenReady(mutateHeaderService(input)){ _ => input.method mustBe HttpMethod.GET }
}
it("must preserve http version"){
val input = request
whenReady(mutateHeaderService(input)){_ => input.version mustBe HttpVersion.HTTP_1_1 }
}

it("must preserve content buffer"){
val input = request
val expected = request.content.array()
whenReady(mutateHeaderService(input)){_ => input.content.array() mustBe expected }
}
it("must preserve chunkiness"){
val input = request
whenReady(mutateHeaderService(input)){_ => input.isChunked() mustBe false}

}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.twitter.diffy.proxy.http.filter

import com.twitter.diffy.ParentSpec
import com.twitter.diffy.util.TwitterFutures
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.{Filter, Service}
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http.{HttpHeaders, HttpRequest, HttpResponse}
import org.junit.runner.RunWith
import org.scalatest.FunSpec
import org.scalatest.junit.JUnitRunner

import scala.collection.JavaConverters._

trait HeaderTransformations extends ParentSpec with TwitterFutures { this: FunSpec =>

def returnHeaders(req: HttpRequest): Future[HttpResponse] = {
val response: Response = Response(req)
response.headers().add(req.headers())
Future.apply(response)
}

def anEchoServiceForHeaders(subject: Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse],
inputHeaders: Map[String,String],
expectedHeaders: Map[String,String]): Unit ={
val service = subject.andThen(Service.mk(returnHeaders))

it ("must match headers"){
val req1: HttpRequest = Request("/")
val addHeader = (k:String, v:String) => req1.headers().add(k, v)
inputHeaders.foreach( addHeader.tupled )

def headersToMap(httpHeaders:HttpHeaders): Map[String,String] = {
httpHeaders.iterator().asScala.map( e=>(e.getKey,e.getValue )).toMap
}
whenReady(service(req1).map( _.headers ).map(headersToMap))( _ mustBe expectedHeaders )
}
}

def aFilterThatTargetsLabel(label:String, labelEx1:String, labelEx2:String) : Unit ={
describe(s"given a $label filter"){
val subject: Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse] =
RefineHttpHeadersByLabelFilter(label, List(label, labelEx1, labelEx2))

describe("given empty headers"){
it must behave like anEchoServiceForHeaders(subject, Map.empty, Map.empty)
}
describe("given a set of headers"){
val headers=Map(("A", "B"), ("C-c", "D-d"))
it must behave like anEchoServiceForHeaders(subject, headers, headers)
}
describe(s"given an exact $label header"){
it must behave like anEchoServiceForHeaders(subject, Map( (s"$label", "X")), Map((s"$label", "X")))
}
describe(s"given a $label prefixed header"){
it must behave like anEchoServiceForHeaders(subject, Map( (s"${label}_X", "X")), Map(("X", "X")))
}
describe(s"given a $label prefixed header with no suffix"){
it must behave like anEchoServiceForHeaders(subject, Map( (s"${label}_", "X")), Map.empty)
}

describe(s"given a $labelEx1 prefixed header"){
it must behave like anEchoServiceForHeaders(subject, Map( (s"${labelEx1}_X", "X")), Map.empty)
}
describe(s"given a $labelEx2 prefixed header"){
it must behave like anEchoServiceForHeaders(subject, Map( (s"${labelEx2}_Y", "Y")), Map.empty)
}

}
}

}
@RunWith(classOf[JUnitRunner])
class RefineHttpHeadersByLabelSpec extends ParentSpec with TwitterFutures with HeaderTransformations {
describe("RefineHttpHeadersByLabelFilter"){
it must behave like aFilterThatTargetsLabel("primary", "candidate", "secondary")
it must behave like aFilterThatTargetsLabel("candidate", "primary", "secondary")
it must behave like aFilterThatTargetsLabel("secondary", "primary", "candidate")
}
}
21 changes: 21 additions & 0 deletions src/test/scala/com/twitter/diffy/util/TwitterFutures.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.twitter.diffy.util

import com.twitter.util.{Return, Throw}
import org.scalatest.concurrent.Futures

trait TwitterFutures extends Futures {

import scala.language.implicitConversions

implicit def convertTwitterFuture[T](twitterFuture: com.twitter.util.Future[T]): FutureConcept[T] =
new FutureConcept[T] {
override def eitherValue: Option[Either[Throwable, T]] = {
twitterFuture.poll.map {
case Return(o) => Right(o)
case Throw(e) => Left(e)
}
}
override def isCanceled: Boolean = false
override def isExpired: Boolean = false
}
}