PERFORCE change 143018 for review
Victor Hugo Bilouro
bilouro at FreeBSD.org
Fri Jun 6 10:33:20 UTC 2008
http://perforce.freebsd.org/chv.cgi?CH=143018
Change 143018 by bilouro at bilouro_tcptest on 2008/06/06 10:32:50
The script is working (send SYN, receive SYN+ACK, send ACK, send FIN).
unittest applied.
Added some references to RFC 793.
It is still a prototype version.
todo: create
Affected files ...
.. //depot/projects/soc2008/bilouro_tcptest/TODO#4 edit
.. //depot/projects/soc2008/bilouro_tcptest/src/scripts/tests/cthreewayhandshake_nooptions.py#2 edit
Differences ...
==== //depot/projects/soc2008/bilouro_tcptest/TODO#4 (text+ko) ====
@@ -15,5 +15,13 @@
this.packet == that.packet will be possible
+Think about:
+ self.ethersrc = ethernet.ether_atob("00:1c:42:47:3f:cd")
+ It should be automatic
+ * may be extend Ethernet to receive ip as constructor arg
+ ethernet( ip )
+ * may be as a utility method
+ ethernet().configue(ip)
+
done:
-manually establish a tcp connection using pcs
==== //depot/projects/soc2008/bilouro_tcptest/src/scripts/tests/cthreewayhandshake_nooptions.py#2 (text+ko) ====
@@ -33,178 +33,317 @@
# Description: A simple test of three way handshake without any changes
#
-"""
-THIS IS A MIDNIGHT COMMIT, THIS CODE NOT EVEN HAD ITS SYNTAX TESTED
-"""
-
-import unittest
+import unittest
+from pcs.packets import ipv4
+from pcs.packets import tcp
+from pcs.packets import ethernet
+import pcs
class TestThreeWayHandshakeWithoutAnyOptions(unittest.TestCase):
"""This class tests the threeway handshake without any options"""
-
+
+
def setUp(self):
"""
As it's the first test, I will use this method as a simple TCB.
I'm planning use some object to handle that.
In this version, TCB is being initialized with test setup
- values.
+ values.
Tcpdump file format would a form to store setup values.
"""
import random
- #TCB
self.tcb = self
+
+ #constants
self.thisside = 0
self.thatside = 1
- #...
+ #...Can add other sides
-
-
- #opt 1
+ #opt 1 - this opt reduce the scalability, and bring more code.
#self.ipsrc = pcs.inet_atol("192.168.1.10")
#self.ipdst = pcs.inet_atol("192.168.1.20")
#opt 2
self.ip = { self.thisside : pcs.inet_atol("192.168.1.10") , \
- self.thatside : pcs.inet_atol("192.168.1.20")}
+ self.thatside : pcs.inet_atol("192.168.1.20")}
+
self.ipid = { self.thisside : random.randrange(1,(1<<16)-1) , \
- self.thatside : 0}
+ self.thatside : 0}
+
+
+ self.tcpport = { self.thisside : random.randrange(50000,60000) , \
+ self.thatside : 9}
+
+ self.tcpsequence = { self.thisside : random.randrange(1,(1<<32)-1) ,
+ self.thatside : 0}
+
+ # see TODO
+ self.ether = \
+ { self.thisside : ethernet.ether_atob("00:1c:42:47:3f:cd") , \
+ self.thatside : ethernet.ether_atob("00:1c:42:db:c5:22") }
+
+ # see TODO
+ self.output = { self.thisside : pcs.PcapConnector("ed0") ,\
+ self.thatside : pcs.PcapConnector("ed0") }
+ def testTestThreeWayHandshakeWithoutAnyOptions(self):
+ """active open - three way handshake"""
+ #RFC793-P31-p1
+ #THISSIDE
- self.tcpport = { self.thisside : random.randrange(50000,60000) , \
- self.thatside : 9}
- self.tcpsequence = { self.thisside : random.randrange(1,(1<<32)-1) ,
- self.thatside : 0}
+ #
+ #Sendind SYN
+ #
+ #THISSIDE
+ (ipsyn, tcpsyn) = createsyn(self, self.tcb, self.tcb.thisside, self.tcb.thatside)
+ createwritepacket(self, self.tcb, ipsyn, tcpsyn, self.tcb.thisside, \
+ self.tcb.thatside)
+
+ #THATHSIDE
+ #Receiving SYN (unfurtunately this time we are receiving from the same board)
+ (ipsynreceived, tcpsynreceived) = receive(self, self.tcb, self.tcb.thatside, \
+ self.tcb.thisside)
+
+ self.assertEqual(ipsyn, ipsynreceived)
+ self.assertEqual(tcpsyn, tcpsynreceived)
+ print tcpsynreceived
+
+
+ #
+ #Receiving SYN+ACK
+ #
+ #THISSIDE
+ #Receivinig SYN + ACK
+ (ipsynack, tcpsynack) = receive(self, self.tcb, self.tcb.thisside, self.tcb.thatside)
+
+ #test conformance of header fields (todo)
+
+
+ #RFC793-P24-p1
+ #A fundamental notion in the design is that every octet of data sent
+ #over a TCP connection has a sequence number.
+ #
+ self.assertNotEqual(tcpsynack.sequence, 0)
+ self.assertNotEqual(tcpsynack.sequence, None)
+
+ #RFC793-P27-p5~P28-p2
+ #As we are testing 3wayhandshake it must be SYN+ACK
+ #
+ #self.assertEqual(tcpsynack.syn,1)
+ self.failIf(tcpsynack.syn < 1)
+ self.failIf(tcpsynack.ack < 1)
+
+ #RFC793-P16-p2
+ #Acknowledgment Number, value of the next sequence number the sender of
+ #the segment is expecting to receive
+ #
+ self.assertEqual(tcpsynack.ack_number, self.tcb.tcpsequence[ self.tcb.thisside ])
+ #OK Its a SYN+ACK
+ self.tcb.tcpsequence[ self.tcb.thatside ] = tcpsynack.sequence + 1
+ print tcpsynack
+
- #tcb?VVVVVVVV
- #self.ethersrc = ethernet.ether_atob("00:1c:42:47:3f:cd")
- #It should be automatic, may be extend Ethernet to receive ip as
- #constructor arg
- #self.etherdst = ethernet.ether_atob("00:1c:42:db:c5:22") i
- #May be something like ether.configure(ip)
+ #
+ #Sending ACK
+ #
+ #THISSIDE
+ (ipack, tcpack) = createip(self, self.tcb, self.tcb.thisside, self.tcb.thatside)
+ createwritepacket(self, self.tcb, ipack, tcpack, self.tcb.thisside, \
+ self.tcb.thatside)
+ print tcpack
+
+ #THATHSIDE
+ #Receiving SYN (unfurtunately this time we are receiving from the same board)
+ #(ipackreceived, tcpackreceived) = receive(self, self.tcb, self.tcb.thatside, \
+ # self.tcb.thisside)
+ #pcs is assigning values different then 0 and 1 to tcpflags
+ #self.assertEqual(ipack, ipackreceived)
+ #self.assertEqual(tcpack, tcpackreceived)
+
+ #
+ #Sending FIN
+ #
+ #THISSIDE
+ (ipfin, tcpfin) = createip(self, self.tcb, self.tcb.thisside, self.tcb.thatside)
+
+ tcpfin.fin=1
+ self.tcb.tcpsequence[ self.tcb.thisside ] += 1
+
+ createwritepacket(self, self.tcb, ipfin, tcpfin, self.tcb.thisside, \
+ self.tcb.thatside)
+ print tcpfin
+
+
+ #
+ #Receiving ACK
+ #
+ #OK - this is enougth...
+ #reduce code redundance (fasttodo)
+ #
+ #Receiving FIN
+ #
+
+ #
+ #Sending ACK
+ #
- self.tcpether = \
- { self.thisside : ethernet.ether_atob("00:1c:42:47:3f:cd") , \
- self.thatside : ethernet.ether_atob("00:1c:42:db:c5:22") }
-
+def createsyn(self, tcb, from_, to):
+ """Create tcp syn flag expertise"""
+ (ip, tcp) = createip(self, tcb, from_, to)
- #control
- self.output = pcs.PcapConnector("ed0")
- ##
- # Braindump
- # self.output = { self.thisside : pcs.PcapConnector("ed0") ,\
- # self.thatside : pcs.PcapConnector(remoteip,"ed0") }
- # to be able make synchronous tests, will be wonderful remotely
- # connect via pcap. eg.
- # thatsideoutput = pcs.PcapConnector(remote,"ed0")
- #
- # TODO
+ #business
+ tcp.syn = 1
+ tcp.ack = 0
+ tcb.tcpsequence[ from_ ] += 1
+ return (ip, tcp)
- def testTestThreeWayHandshakeWithoutAnyOptions(self):
- """three way handshake"""
- #SYN
- (ipsyn, tcpsyn) = createsyn(self, tcb, tcb.thisside, tcb.thatside)
+
+def createip(self, tcb, from_, to):
+ """Create ip packet
+ tcp is also created here"""
+ ip1 = ipv4.ipv4()
- ether = createethernet(self, tcb, tcb.thisside, tcb.thatside)
- packet = pcs.Chain([ ether, ip, tcp ])
- out = output.write(packet.bytes, len(packet.bytes))
-
- #here can occurs (ack,syn|syn,ack|syn+ack) (FASTTODO)
- (ipack, tcpack) = receive(self, tcb, tcb.thisside, tcb.thatside)
-
-
- def createsyn(self, tcb, from_, to):
- """Create tcp syn flag expertise"""
+ ip1.version = 4
+ ip1.hlen = 5
+ ip1.tos = 0
+ ip1.id = tcb.ipid[ from_ ]
+ ip1.flags = 0
+ ip1.offset = 0
+ ip1.ttl = 64
+ ip1.protocol = pcs.IPPROTO_TCP
+ ip1.src = tcb.ip[ from_ ]
+ ip1.dst = tcb.ip[ to ]
+ ip1.length = len(ip1.bytes)
- (ip, tcp) = createip(self, tcb, from_, to)
- tcp.syn = 1
+ # tcp here
+ tcp1 = createtcp(self, tcb, ip1, from_, to)
- return (ip, tcp)
+ ip1.length = len(ip1.bytes) + len(tcp1.bytes)
+ #ip1.checksum = ip_cksum(ip1) #doind this at checkout(createwritepacket)
- def receive(self, tcb, from_, to):
- """receive packet for this socket
- This method must handle timmers"""
- (ip, tcp) = (None, None)
- while 1:
- reply = output.read()
- try:
- packet = ethernet.ethernet(reply)
- ip = packet.data
- tcp = replyip.data
- if (tcb.ip[ from_ ]==ip.dst and \
- tcb.ip[ to ]==ip.src and \
- tcb.tcpport[ from_ ]==tcp.dport and \
- tcb.tcpport[ to ]==tcp.sport):
- break
- except:
- pass
-
- return (ip, tcp)
+ return (ip1, tcp1)
+
+
+def createtcp(self, tcb, ip, from_, to):
+ """Create tcp packet"""
+ tcp1 = tcp.tcp()
+
+ tcp1.sport = tcb.tcpport[ from_ ]
+ tcp1.dport = tcb.tcpport[ to ]
+ tcp1.sequence = tcb.tcpsequence[ from_ ]
+ tcp1.ack_number = tcb.tcpsequence[ to ]
+ tcp1.offset = 5
+ tcp1.urgent = 0
+ tcp1.ack = 1
+ tcp1.push = 0
+ tcp1.reset = 0
+ tcp1.syn = 0
+ tcp1.fin = 0
+ tcp1.window = (1<<16)-1
+ tcp1.urg_point = 0
+ #tcp1.options
+
+ #tcp1.checksum = tcp_cksum(tcp1 , ip) #doind this at checkout(createwrite)
+
+ return tcp1
+
- def createethernet(self, tcb, from_, to):
- """Create ethernet header"""
- ether1 = ethernet.ethernet()
- #opt2
- ether1.src = tcb.tcpether[from_]
- ether1.dst = tcb.tcpether[to]
- ether1.type = 0x800
+def receive(self, tcb, from_, to):
+ """receive packet for this socket
+ This method must handle timmers"""
+ (ip, tcp) = (None, None)
+ while 1:
+ reply = tcb.output[ from_ ].read()
+ try:
+ packet = ethernet.ethernet(reply)
+ ip = packet.data
+ tcp = ip.data
+ if (tcb.ip[ from_ ]==ip.dst and \
+ tcb.ip[ to ]==ip.src and \
+ tcb.tcpport[ from_ ]==tcp.dport and \
+ tcb.tcpport[ to ]==tcp.sport):
+ break
+ except:
+ print "eeee"
+ pass
- return ether1
-
- def createip(self, tcb, from_, to):
- """Create ip packet
- tcp is also created here"""
- ip1 = ipv4.ipv4()
+ return (ip, tcp)
+
+
+def createwritepacket(self, tcb, ip, tcp, from_, to):
+ ether = createethernet(self, tcb, from_, to)
+
+ tcp.checksum = tcp_cksum(tcp , ip)
+
+ ip.length = len(ip.bytes) + len(tcp.bytes)
+ ip.checksum = ip_cksum(ip)
+
+ packet = pcs.Chain([ ether, ip, tcp ])
+ tcb.output[ from_ ].write(packet.bytes, len(packet.bytes))
+
+
+def createethernet(self, tcb, from_, to):
+ """Create ethernet header"""
+ ether1 = ethernet.ethernet()
+ ether1.src = tcb.ether[ from_ ]
+ ether1.dst = tcb.ether[ to ]
+ ether1.type = 0x800
+
+ return ether1
- ip1.version = 4
- ip1.hlen = 5
- ip1.tos = 0
- ip1.id = tcb.ipid[ from_ ]
- ip1.flags = 0
- ip1.offset = 0
- ip1.ttl = 64
- ip1.protocol = pcs.IPPROTO_TCP
- ip1.src = tcb.ip[ from_ ]
- ip1.dst = tcb.ip[ to ]
- # tcp here
- tcp = createtcp(self, tcb, ip1, from_, to)
-
- ip1.length = len(ip1.bytes) + len(tcp.bytes)
- ip1.checksum = ip_cksum(ip1)
-
- return (ip1, tcp)
+def tcp_cksum(self, ip, data = ""): #TODO: add this method to pcs tcp.py
+ """return tcpv4 checksum"""
+ import struct
+ total = 0
+
+ tmpip = ipv4.pseudoipv4()
+ tmpip.src = ip.src
+ tmpip.dst = ip.dst
+ tmpip.reserved = 0
+ tmpip.protocol = pcs.IPPROTO_TCP
+ tmpip.length = len(self.getbytes()) + len(data)
+ pkt = tmpip.getbytes() + self.getbytes() + data
+ if len(pkt) % 2 == 1:
+ pkt += "\0"
+ for i in range(len(pkt)/2):
+ total += (struct.unpack("!H", pkt[2*i:2*i+2])[0])
+ total = (total >> 16) + (total & 0xffff)
+ total += total >> 16
+ return ~total & 0xffff
+
-
- def createtcp(self, tcb, ip, from_, to):
- """Create tcp packet"""
- tcp1 = tcp.tcp()
+def ip_cksum(ip): #TODO: solve the self problem, may be adding another arg
+ """calculate the IPv4 checksum over a packet
- tcp1.sport = tcb.tcpport[ from_ ]
- tcp1.dport = tcb.tcpport[ to ]
- tcp1.sequence = tcb.tcpsequence[ from_ ]
- tcp1.ack_number = tcb.tcpsequence[ to ]
- tcp1.offset = 5
- tcp1.urgent = 0
- tcp1.ack = 0
- tcp1.push = 0
- tcp1.reset = 0
- tcp1.syn = 0
- tcp1.fin = 0
- tcp1.window = (1<<16)-1
- tcp1.urg_point = 0
- #tcp1.options
+ returns the calculated checksum
+ """
+ import struct
+ total = 0
+ packet = ipv4.ipv4(ip.bytes)
+ packet.checksum = 0
+ bytes = packet.bytes
- tcp1.checksum = tcp_cksum(tcp1 , ip)
+ if len(bytes) % 2 == 1:
+ bytes += "\0"
+
+ for i in range(len(bytes)/2):
+ total += (struct.unpack("!H", bytes[2*i:2*i+2])[0])
- return tcp
-
+ total = (total >> 16) + (total & 0xffff)
+ total += total >> 16
+
+ return ~total & 0xffff
+
+
if __name__ == '__main__':
unittest.main()
+
More information about the p4-projects
mailing list