PERFORCE change 167424 for review
Zachariah Riggle
zjriggl at FreeBSD.org
Mon Aug 17 06:50:58 UTC 2009
http://perforce.freebsd.org/chv.cgi?CH=167424
Change 167424 by zjriggl at zjriggl_tcpregression on 2009/08/17 06:50:01
Final commit for SoC. Everything should work after this. I'm gonna download it all into my FreeBSD VM and make sure the tests run.
Affected files ...
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/CHANGES#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/LICENSE#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/Makefile#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/README#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/pcap.pyx#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/pcap_ex.c#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/pcap_ex.h#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/setup.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/__init__.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/packetFilterTest.pcap#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/test.pcap#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/tests/testPcap.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/pcs/pcap/testsniff.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/echoServer.py#6 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcs/pcap/CHANGES#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcs/pcap/README#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/StringField.py#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/__init__.py#10 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/backup.tar#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/hwAddress.py#5 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/hwaddress.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/ipAddress.py#7 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/ipaddress.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/networkPort.py#6 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/networkport.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/payload.py#5 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/pseudoipv4.py#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/segmentBuffer.py#4 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/segmentbuffer.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/sequence.py#2 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/sniffLocalhost.py#6 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpConstructor.py#7 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpFilter.py#8 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpconstructor.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpfilter.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpstatemachine.py#10 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/test.html#6 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/test.py#2 delete
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/__init__.py#2 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/echoclient.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/echoserver.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/logging.conf#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/segmentBufferTest.py#2 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/tcpFilterTest.py#2 edit
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/test.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testSequence.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testStates.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testTcpConstructor.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testTcpHandshake.py#1 add
.. //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tests/testTcpStateMachine.py#1 add
Differences ...
==== //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/pcsextension/__init__.py#10 (text+ko) ====
@@ -4,6 +4,7 @@
import socket
import sys
import re
+import binascii
from pcs.packets.ipv4 import ipv4
from pcs.packets.tcp import tcp
from pcs.packets.tcpv6 import tcpv6
@@ -37,4 +38,3 @@
x = re.sub( octet, r"\1 ", x )
x = re.sub( twoBytes, r"\1\n", x )
return x
-
==== //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/sequence.py#2 (text+ko) ====
@@ -5,55 +5,69 @@
'''
from pcsextension import findTcpLayer
-def seq(x):
+def seq( x ):
# It is essential to remember that the actual sequence number space is
# finite, though very large. This space ranges from 0 to 2**32 - 1.
# Since the space is finite, all arithmetic dealing with sequence
# numbers must be performed modulo 2**32. This unsigned arithmetic
# preserves the relationship of sequence numbers as they cycle from
# 2**32 - 1 to 0 again.
- return x % (2**32)
+ return x % ( 2 ** 32 )
-class sequenced(str):
+class Sequenced( str ):
'''
This class exists to encapsulate sequenced items in the TCP stream. Each
byte (octet) is assigned a sequence number, as is the original SYN and final
- FIN.
+ FIN.
+
+ Raw data CAN be encapsulated in a Sequenced object, but doing so will use
+ additional memory and is not necessary. The Sequenced class should only
+ be used to denote special sequences that have either the SYN or FIN bits
+ set (each of which can have the ACK bits set).
+
+ For example, a data stream may look like the following...
+
+ Client
+ [SYN] datadatadatadata
+ [SYN\ACK]
+ Server
'''
syn = False
fin = False
ack = False
-
- def __new__(cls, data='', syn=False, fin=False, ack=True):
- # New has to be used, because the str type is immutable.
- if len(data) == 0:
- if syn:
- data += "[SYN]"
- if fin:
- data += "[FIN]"
- if ack:
- data += "[ACK]"
-
- obj = super( sequenced, cls).__new__( sequenced, data or "" )
+
+ # New has to be used, because the str type is immutable.
+ def __new__( cls, syn = False, fin = False, ack = False ):
+
+ # Don't overwrite any user-provided data.
+ data = ''
+ if syn and not ack:
+ data += "[SYN]"
+ elif fin and not ack:
+ data += "[FIN]"
+ elif syn and ack:
+ data += "[SYN\ACK]"
+ elif fin and ack:
+ data += "[FIN\ACK]"
+
+ # Check for invalid flag combos.
+ if ack and not ( syn or fin ):
+ raise Warning, "The 'ack' bit should not be set by itself. This is not " \
+ "the intended use of the class Sequenced."
+ if not ( syn or ack or fin ):
+ raise Warning, "Did not set any flags."
+
+ obj = super( Sequenced, cls ).__new__( Sequenced, data or "" )
+
+ # The below statements allow us to use True/False or 1/0 or any other
+ # expression that can be evaluated to a boolean.
obj.syn = True if syn else False
obj.ack = True if ack else False
obj.fin = True if fin else False
- return obj
-
-def makeSequenced(packet):
-
- t = findTcpLayer(packet)
-
- if t.syn or t.fin:
- return [sequenced(syn=t.syn, ack=t.ack, fin=t.fin)]
- elif t.data is not None:
- return [sequenced(byte,ack=t.ack) for byte in t.data]
-
- return []
-
+ return obj
# Special cases defined here for convenience...
-seq_syn = sequenced(syn=True,ack=False)
-seq_synack = sequenced(syn=True,ack=True)
-seq_fin = sequenced(fin=True,ack=False)
-seq_finack = sequenced(fin=True,ack=True)+seq_syn = Sequenced( syn = True, ack = False )
+seq_synack = Sequenced( syn = True, ack = True )
+seq_fin = Sequenced( fin = True, ack = False )
+seq_finack = Sequenced( fin = True, ack = True )
==== //depot/projects/soc2009/zjriggl_tcpregression/src/tcpregression/tcpstatemachine.py#10 (text+ko) ====
@@ -9,20 +9,20 @@
from pcsextension import findTcpLayer
from pcsextension.checksum import tcpChecksum
from pcsextension.decorators import prop, validateTypes, uint16, uint32, synchronized
-from pcsextension.hwAddress import HwAddress
-from pcsextension.ipAddress import IpAddress
-from pcsextension.networkPort import NetworkPort
-from segmentBuffer import segmentBuffer
-from sequence import seq, sequenced, makeSequenced, seq_fin, seq_finack, seq_syn, seq_synack
-from tcpConstructor import tcpConstructor
-from tcpFilter import tcpFilter
+from pcsextension.hwaddress import HwAddress
+from pcsextension.ipaddress import IpAddress
+from pcsextension.networkport import NetworkPort
+from segmentbuffer import SegmentBuffer
+from sequence import seq, Sequenced, seq_fin, seq_finack, seq_syn, seq_synack
+from tcpconstructor import tcpConstructor
+from tcpfilter import TcpFilter
from tcprecvdaemon import TcpRecvDaemon
from tcpsenddaemon import TcpSendDaemon
from tcpstates import CLOSE_WAIT, CLOSED, CLOSING, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, \
LAST_ACK, LISTEN, SYN_RECEIVED, SYN_SENT, TIME_WAIT, TcpState, synchronizedStates, tcpStates
from time import time
from threading import RLock
-import pcap
+import pcs.pcap as pcap
import pcs
import testconfig
import time
@@ -116,12 +116,59 @@
3
'''
- __constructor = tcpConstructor()
- __connector = tcpFilter( testconfig.interface )
+ __constructor = None
+ __tcpFilter = None
__recvThread = None
__sendThread = None
lock = RLock()
+ sendLock = RLock()
+ recvLock = RLock()
+
+ def __init__( self, localhost = ( testconfig.localIP, testconfig.localPort ),
+ remotehost = ( testconfig.remoteIP, testconfig.remotePort ) ):
+ '''
+ Initialize the TCP State Machine. The information for the local host and the
+ remote host must be provided as a 2 - tuple, a string and an integer.
+
+ Example:
+ >>> t = TcpStateMachine( ( "127.0.0.1", 10000 ), ( "192.168.0.1", 80 ) )
+ '''
+ # Required objects.
+ # self.lock = RLock()
+ self.__constructor = tcpConstructor()
+ self.__tcpFilter = TcpFilter( testconfig.interface )
+ self._inboundSequences = SegmentBuffer()
+ self._outboundSequences = SegmentBuffer()
+
+ # Logger
+ self.log = tcplog( self )
+ self.log.debug( "(%s, %s)" % ( localhost, remotehost ) )
+
+ # Set IP and host
+ if validateTypes( {localhost: tuple} ) and len( localhost ) == 2:
+ self._localIP = IpAddress( localhost[0] )
+ self._localPort = NetworkPort( localhost[1] )
+ self.log.info( 'Local host: %s:%i' % ( self.localIP.getAscii(), self.localPort.getInteger() ) )
+
+ if validateTypes( {remotehost: tuple} ) and len( remotehost ) == 2:
+ self._remoteIP = IpAddress( remotehost[0] )
+ self._remotePort = NetworkPort( remotehost[1] )
+ self.log.info( 'Remote host: %s:%i' % ( self.remoteIP.getAscii(), self.remotePort.getInteger() ) )
+
+ # Set loopback mode
+ if self.remoteIP == self.localIP or \
+ testconfig.interface in self.loopbackInterfaces:
+
+ # If the user didn't specify a loopback interface, print a warning.
+ if testconfig.interface not in self.loopbackInterfaces:
+ self.log.warn( 'Automatically setting loopback mode to \'True\'' )
+
+ self.setLoopback( True )
+
+ # Reset the internal state.
+ self.reset()
+
# @uint32
@prop
def snd_nxt():
@@ -202,7 +249,7 @@
self.inboundSequences.base = irs
self.inboundSequences = []
del self.inboundSequences[:]
- # self.inboundSequences = segmentBuffer([], irs, self.inboundSequences.limit)
+ # self.inboundSequences = SegmentBuffer([], irs, self.inboundSequences.limit)
# self.inboundSequences.base = irs
# @uint32
@@ -249,20 +296,29 @@
_remoteEthernet = HwAddress( default = testconfig.remoteMAC )
@prop
- def localIP(): '''Local IP address.'''
- _localIP = IpAddress( default = testconfig.localIP )
+ def localIP():
+ '''Local IP address.'''
+ return {'fset': lambda self, x: self.localIP.set( x ) }
@prop
- def remoteIP(): '''Remote IP address.'''
- _remoteIP = IpAddress( default = testconfig.remoteIP )
+ def remoteIP():
+ '''Remote IP address.'''
+ return {'fset': lambda self, x: self.remoteIP.set( x ) }
@prop
- def localPort(): '''Local port.'''
- _localPort = NetworkPort( default = testconfig.localPort )
+ def localPort():
+ '''Local port.'''
+ return {'fset': lambda self, x: self.localPort.set( x ) }
@prop
- def remotePort(): '''Remote port.'''
- _remotePort = NetworkPort( default = testconfig.remotePort )
+ def remotePort():
+ '''Remote port.'''
+ return {'fset': lambda self, x: self.remotePort.set( x ) }
+
+ def setRemotePort( self, x ):
+ import traceback
+ traceback.print_stack()
+ self._remotePort = x
@prop
def userTimer(): ''' User timeout timer '''
@@ -274,21 +330,21 @@
# Interface is actually a shortcut to the connector's interface field,
# which is itself a property. Setting a TcpStateMachine's interface will
- # effectively trigger the tcpFilter object to switch interfaces to the
+ # effectively trigger the TcpFilter object to switch interfaces to the
# specified interface.
@prop
def interface():
'''Interface to use for sending/recving data'''
- return {'fget': lambda self: self.__connector.interface,
- 'fset': lambda self, x: setattr( self.__connector, 'interface', x )}
+ return {'fget': lambda self: self.__tcpFilter.interface,
+ 'fset': lambda self, x: setattr( self.__tcpFilter, 'interface', x )}
def getConnector( self ):
'''
Retrieves the connector used to send and receive packets.
Note: The name 'connector' is simply for consistency. The object that is
- returned is actually a tcpFilter object.
+ returned is actually a TcpFilter object.
'''
- return self.__connector
+ return self.__tcpFilter
def setLoopback( self, lb = True ):
'''
@@ -346,42 +402,12 @@
tcp.f_window: True,
tcp.f_urg_pointer: True }
- @prop
- def validate():
- '''
- Dictionary of fields to be validated. Non - valid packets are dropped, non - valid
- settings throw an error. Accepted Values:
- %s - Incoming packet TCP Checksum
- %s - Incoming packet TCP Sequence Number
- %s - Incoming packet TCP Ack number
- %s - TCP State Machine transition
- '''
- _validate = { tcp.f_checksum: True,
- tcp.f_sequence: True,
- tcp.f_ack_number: True,
- #tcp.f_sport: True,
- #tcp.f_dport: True,
- 'transition': True }
-
def generateISS( self ):
'''
Generates a new Initial Sequence Number (ISS).
'''
return seq( random.randint( 0, ( 1 << 32 ) - 1 ) )
-# def _updateSegmentBuffer(self, newBuffer, oldBuffer):
-# # If it's the same object, just return it.
-# if newBuffer is oldBuffer:
-# return segmentBuffer
-#
-# # If it is not of the correct type, make it into a segment buffer,
-# # preserving the current base and max.
-# if type(newBuffer) is not segmentBuffer:
-# return segmentBuffer(newBuffer, base=oldBuffer.base, max=oldBuffer.max)
-#
-# # If it is the correct type, but is just a new buffer, assign it.
-# return newBuffer
-
@prop
def outboundSequences():
'''
@@ -399,7 +425,6 @@
sequence number overflows 2**32.
'''
return {'fset': lambda self, x: self.outboundSequences.update( x ) }
- _outboundSequences = segmentBuffer()
@prop
def inboundSequences():
@@ -408,7 +433,6 @@
@see outboundSequences for more information.
'''
return { 'fset': lambda self, x: self.inboundSequences.update( x ) }
- _inboundSequences = segmentBuffer()
@prop
def retransmissionQ():
@@ -424,11 +448,24 @@
Note that this buffer will explicitly exclude all SYN and FIN sequences.
@see inboundSequences
'''
- return {'fget':
- lambda self:
- [octet for octet in self.inboundSequences[seq( self.recvBufferOffset + self.irs ):] \
- if type( octet ) != sequenced or not ( octet.syn or octet.ack ) ]
- }
+ return {'fget': lambda self: [octet for octet in self.inboundSequences ] }
+
+ # Old version didn't work, because Sequenced objects would go into inboundSequences.
+ # This would, in turn, cause a single byte to be received twice once the buffer
+ # was incremented, because the number of bytes that were incremented would not
+ # take into account the SYN or SYN/ACK at the beginning of the buffer that did not
+ # get returned by recvBuffer
+ #[octet for octet in self.inboundSequences[seq( self.recvBufferOffset + self.irs ):] \
+ # if type( octet ) != Sequenced or not ( octet.syn or octet.ack ) ]
+
+
+ def getRecvBuffer( self ):
+ print "%i bytes in inboundSequences" % len( self.inboundSequences )
+ print "IRS: %i Rcv Offset: %i" % ( self.irs, self.recvBufferOffset )
+ return [octet for octet in self.inboundSequences ]
+ # if type( octet ) != Sequenced or not ( octet.syn or octet.ack ) ]
+
+
@prop
def recvBufferOffset():
@@ -491,7 +528,6 @@
def __str__( self ):
'''
Prints out the annotated status.
-
'''
statusNames = ['local socket',
'foreign socket',
@@ -559,6 +595,8 @@
Close the socket connection. This is synonymous with the 'close' function used
by normal UNIX sockets.
'''
+ self.log.info( "Closing connection" )
+
#CLOSED STATE (i.e., TCB does not exist)
if self.state is CLOSED:
# If the user does not have access to such a connection, return
@@ -587,13 +625,13 @@
# then form a FIN segment and send it, and enter FIN-WAIT-1 state;
# otherwise queue for processing after entering ESTABLISHED state.
if self.snd_una == self.snd_nxt:
- self.outboundSequences += [seq_fin] #sequenced(fin=1)]
+ self.outboundSequences += [seq_fin] #Sequenced(fin=1)]
#ESTABLISHED STATE
elif self.state is ESTABLISHED:
# Queue this until all preceding SENDs have been segmentized, then
# form a FIN segment and send it. In any case, enter FIN-WAIT-1
# state.
- self.outboundSequences += [seq_fin]# sequenced(fin=1)]
+ self.outboundSequences += [seq_fin]# Sequenced(fin=1)]
self.state = FIN_WAIT_1
#FIN-WAIT-1 STATE
@@ -609,7 +647,7 @@
elif self.state is CLOSE_WAIT:
# Queue this request until all preceding SENDs have been
# segmentized; then send a FIN segment, enter CLOSING state.
- self.outboundSequences += [seq_fin] #sequenced(fin=1)]
+ self.outboundSequences += [seq_fin] #Sequenced(fin=1)]
#CLOSING STATE
#LAST-ACK STATE
@@ -658,7 +696,7 @@
# snd_una is automatically handled
# self.snd_una = self.iss
self.state = SYN_SENT
- self.outboundSequences = [seq_syn]# sequenced(syn=True)]
+ self.outboundSequences = [seq_syn]# Sequenced(syn=True)]
self.__sendThread.timeout.trigger()
# SNX_NXT is automatically incremented
@@ -721,11 +759,12 @@
del self.outboundSequences[:]
del self.inboundSequences[:]
+ self.recvBufferOffset = 0
# self.recvBufferOffset = 0
self.state = CLOSED
- @synchronized( lock )
+# @synchronized( sendLock )
def sendPacket( self, packet ):
'''
Inform the TCP State Machine about packets that have been transmitted.
@@ -746,7 +785,7 @@
# Check for a syn
if seg.syn:
self.iss = seg.sequence
- self.outboundSequences += [sequenced( syn = seg.syn, ack = seg.ack )]
+ self.outboundSequences += [Sequenced( syn = seg.syn, ack = seg.ack )]
if self.state is CLOSED or ( self.state is LISTEN and not seg.ack ):
self.state = SYN_SENT
@@ -755,17 +794,21 @@
# Check for other data
if seg.data is not None:
- self.outboundSequences += [sequenced( octet ) for octet in seg.data]
+ # It is unnecessary to use the Sequenced class on raw data. It
+ # provides no extra utility, and simply causes excess memory usage.
+ # self.outboundSequences += [Sequenced( octet ) for octet in seg.data]
+ self.outboundSequences += [octet for octet in seg.data]
# Check for a fin
if seg.fin:
- self.outboundSequences += [sequenced( fin = seg.fin, ack = seg.ack )]
+ self.outboundSequences += [Sequenced( fin = seg.fin, ack = seg.ack )]
if self.state == ESTABLISHED:
self.state = FIN_WAIT_1
self.__sendThread.timeout.trigger()
+# @synchronized( recvLock )
def recvRawTcp( self, block = True ):
'''
Receives a single packet off of the interface that is destined for
@@ -773,11 +816,12 @@
@param block: Set to True to block until a packet is available.
'''
- rv = self.__connector.readFilteredByTuple( self.localIP, self.localPort, block )
+ rv = self.__tcpFilter.read()
if rv is not None:
return findTcpLayer( rv )
return rv
+# @synchronized( sendLock )
def sendRawTcp( self, tcpLayer ):
'''
Packages the provided TCP packet and payload inside of the appropriate
@@ -788,17 +832,20 @@
'''
# self.log.debug( repr( tcpLayer ) )
+ self.log.debug( 'In sendRawTcp()' )
if not validateTypes( {tcpLayer:tcp.tcp} ):
+ self.log.error( "tcpLayer was incorrect type: %s" % type( tcpLayer ) )
return
+ # self.__socket.sendto(tcpLayer.chain().bytes, (self.remoteIP().getAscii(),self.remotePort().getAscii()))
- # self.__socket.sendto(tcpLayer.chain().bytes, (self.remoteIP().getAscii(),self.remotePort().getAscii()))
chain = self.__constructor.generateChain( tcpLayer )
+ self.log.debug( 'Built chain' )
self.log.pktsent( repr ( tcpLayer.chain() ) )
- self.__connector.write( chain.bytes )
+ self.__tcpFilter.write( chain.bytes )
pass
- @synchronized( lock )
+# @synchronized( sendLock )
def send( self, data, async = False ):
'''
Sends the specified data.
@@ -810,24 +857,28 @@
Optionally send asynchronously. Do not wait for acknowledgement.
'''
if not validateTypes( {data:str} ):
- return - 1
+ self.log.error( 'Can only send strings' )
+ return ( -1 )
if self.state == CLOSED:
self.log.error( 'connection does not exist' )
- return - 1
+ return ( -1 )
if self.state == LISTEN:
+ self.log.info( 'attempted send() on LISTENing connection.'\
+ ' performing active-open' )
self.open()
if self.state in ( SYN_RECEIVED, SYN_SENT ):
# Queue the data for send if we are not in the connected state.
+ self.log.debug( 'Added %i bytes to the outboundSequences queue' % len( data ) )
self.outboundSequences += [x for x in data]
if self.state in ( ESTABLISHED, CLOSE_WAIT ):
# Add the data to the outbound buffer.
firstSeq = self.snd_nxt
- self.outboundSequences += [sequenced( octet ) for octet in data]
+ self.outboundSequences += [octet for octet in data] # [Sequenced(octed) for octed in data]
lastSeq = self.snd_nxt - 1
# Break the data up into MSS-sized chunks
@@ -840,23 +891,29 @@
pkt = self.newPacket( {tcp.f_data: payload.payload( chunk ),
tcp.f_sequence: sequence } )
- self.sendPacket( pkt )
+ self.sendRawTcp( pkt )
sequence += len( chunk )
+ # If packet auto-processing has been disabled and the call is
+ # blocking, this could lead to a deadlock.
+ if not async and not self.processPacketsOnArrival:
+ self.log.warn( 'Performed blocking send with processPacketsOnArrival disabled.' \
+ ' This can lead to a deadlock situation because ACKs may not ' \
+ 'be received and processed.' )
+
# This is a blocking call, we must wait until all of the data has
# been acknowledged.
if not async:
timer( 5.0, lambda: self.snd_una > lastSeq ).wait()
- #timeout = timer(timeout=5)
- #while (not async) and (self.snd_una <= lastSeq) and not timeout.expired:
- # time.sleep(0.01)
-
return len( pkt.data )
elif self.state in ( FIN_WAIT_1, FIN_WAIT_2, CLOSING, LAST_ACK, TIME_WAIT ):
self.log.error( 'connection closing' )
return - 1
+
+ self.__sendThread.timeout.trigger()
+
def logGenerated( self, packet = None, fieldname = None ):
'''
Helper method to log generated field data.
@@ -982,7 +1039,7 @@
tcp.f_ack_number: self.rcv_nxt,
tcp.f_ack: 1}
- print "Sending ack: {seq: %i, ack: %i, ack#: %i}" % ( self.snd_nxt, 1, self.rcv_nxt )
+ self.log.debug( "Sending ack: {seq: %i, ack: %i, ack#: %i}" % ( self.snd_nxt, 1, self.rcv_nxt ) )
if fields is not None:
ackFields.update( fields )
@@ -1032,8 +1089,8 @@
return packet
- @synchronized( lock )
- def recv( self, numOctets = 4096, delay = 120 ):
+# @synchronized( recvLock )
+ def recv( self, numOctets = 4096, timeout = 120, fillBuffer = False ):
'''
Returns *up to* numOctets octets of data.
Returns the octets as a string.
@@ -1042,28 +1099,50 @@
@param numOctets:
Maximum number of octets to get. (Default 4096)
- @param delay:
+ @param timeout:
Maximum amount of time, in seconds, to wait for data to be available.
(Default 2 minutes)
+ @param fillBuffer:
+ Attempt to fill the buffer within the timeout, instead of returning
+ immediately when data is available. Return when the timeout expires
+ or when the numOctets octest have been read.
@return:
Returns the octets as a string, or None if no octets were available
before the operation timed out.
'''
- t = timer( delay )
- while len( self.recvBuffer ) == 0 and not t.expired:
- time.sleep( 0.05 )
+ t = timer( timeout )
+ octets = []
+
+ # Keep looping until [1] we have received data or [2] we have filled the
+ # buffer, with the condition that looping will stop *immediately* when
+ # the timer expires.
+ while ( ( len( octets ) == 0 and len( octets ) < numOctets ) or \
+ ( fillBuffer and len( octets ) < numOctets ) ) and \
+ not t.expired:
+
+ # Are there any bytes? Append them to octets.
+ if len( self.recvBuffer ) > 0:
+ # self.log.debug( 'Recv buffer has %i octets' % len( self.recvBuffer ) )
+ # What is our length now? We need to know this for below.
+ lengthBefore = len( octets )
+
+ # Append the octets
+ octets += self.recvBuffer[:numOctets - len( octets )]
- if len( self.recvBuffer ) > 0:
- octets = self.recvBuffer[:numOctets]
- self.recvBufferOffset += len( octets )
+ # Increment recvBufferOffset by the number of octets received
+ # during just this one pass.
+ self.recvBufferOffset += ( len( octets ) - lengthBefore )
+ else:
+ time.sleep( 0.01 )
+ if len( octets ) > 0:
return ''.join( octets )
- else:
- return None
+
+ return None
- @synchronized( lock )
+# @synchronized( recvLock )
def recvPacket( self, packet = None, timeout = None ):
'''
Get the next packet that has been received. Packets are guaranteed
@@ -1080,7 +1159,7 @@
self.log.error( "Use segmentArrives(packet)" )
raise DeprecationWarning
- @synchronized( lock )
+# @synchronized( recvLock )
def segmentArrives( self, packet = None ):
'''
Inform the TCP State Machine about packets that have been received.
@@ -1092,7 +1171,7 @@
# Make sure that there is TCP data in the packet.
seg = findTcpLayer( packet )
- self.log.info( "Segment arrived!" )
+ self.log.debug( "Segment arrived" )
if seg is None:
self.log.warn( 'Could not find TCP layer in packet' )
@@ -1164,18 +1243,18 @@
# <SEQ=SEG.ACK><CTL=RST>
pass # We don't check security/compartment
self.debugWithState( "Received SYN" )
+
# Set RCV.NXT to SEG.SEQ+1, IRS is set to SEG.SEQ and any other
# control or text should be queued for processing later. ISS
# should be selected and a SYN segment sent of the form:
# <SEQ=ISS><ACK=RCV.NXT><CTL=SYN,ACK>
self.irs = seg.sequence
- # self.rcv_nxt = seg.sequence + 1 # Done automatically by property
self.iss = self.generateISS()
- # self.snd_nxt = self.iss # Done automatically by property
- self.inboundSequences += [sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )] # sequenced(syn=1)]
+ self.inboundSequences += 1
+ # self.inboundSequences += [Sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )]
# pkt = self.newPacket( {tcp.f_sequence:self.iss, tcp.f_ack_number: self.rcv_nxt, tcp.f_ack:1, tcp.f_syn: 1} )
- self.outboundSequences += [seq_synack]#sequenced(syn=1)]
+ self.outboundSequences += [seq_synack]#Sequenced(syn=1)]
pkt = self.newPacket( {tcp.f_syn:1, tcp.f_ack:1, tcp.f_ack_number: seg.sequence} )
self.sendPacket( pkt )
@@ -1267,7 +1346,8 @@
# SEG.SEQ.
# self.rcv_nxt = seg.sequence + 1 # Done automagically
self.irs = seg.sequence
- self.inboundSequences += [sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )] # [sequenced(syn=1)]
+ self.inboundSequences += 1
+ # self.inboundSequences += [Sequenced( syn = seg.syn, ack = seg.ack, fin = seg.fin )] # [Sequenced(syn=1)]
# SND.UNA should be advanced to equal SEG.ACK (if there
# is an ACK), and any segments on the retransmission queue which
@@ -1363,6 +1443,7 @@
if seg.data is not None and self.rcv_wnd == 0:
acceptable = False # for good measure
+
if seg.data is not None and self.rcv_wnd > 0 and \
( ( self.rcv_nxt <= seg.sequence < ( self.rcv_nxt + self.rcv_wnd ) ) or
( self.rcv_nxt <= ( seg.sequence + len( seg.data ) - 1 ) < ( self.rcv_nxt + self.rcv_wnd ) ) ):
@@ -1374,6 +1455,7 @@
#
# <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK>
if not acceptable:
+ self.log.debug( "Ack is unacceptable!" )
if seg.reset:
return None
@@ -1382,7 +1464,8 @@
# After sending the acknowledgment, drop the unacceptable segment
# and return.
return None
-
+ else:
+ self.log.debug( "Processing acceptable ACK" )
# In the following it is assumed that the segment is the idealized
# segment that begins at RCV.NXT and does not exceed the window.
# One could tailor actual segments to fit this assumption by
@@ -1400,6 +1483,7 @@
# came from the LISTEN state), then return this connection to
# LISTEN state and return. The user need not be informed.
if self._lastState == LISTEN:
+ self.log.debug( "Received RST, restoring connection to %s state." % LISTEN )
self.state = LISTEN
return seg
@@ -1416,6 +1500,7 @@
# And in the active OPEN case, enter the CLOSED state and delete the TCB,
# and return.
if self._lastState == SYN_SENT:
+ self.log.debug( "Received RST packet, resetting connection state" )
self.reset()
return seg
@@ -1429,6 +1514,7 @@
# flushed. Users should also receive an unsolicited general
# "connection reset" signal. Enter the CLOSED state, delete the
# TCB, and return.
+ self.log.debug( "Received RST packet, resetting connection state" )
self.reset()
return seg
@@ -1438,6 +1524,7 @@
if self.state in ( CLOSING, LAST_ACK, TIME_WAIT ):
# If the RST bit is set then, enter the CLOSED state, delete the
# TCB, and return.
+ self.log.debug( "Received RST packet, resetting connection state" )
self.reset()
return seg
@@ -1465,6 +1552,7 @@
# If the SYN is not in the window this step would not be reached
# and an ack would have been sent in the first step (sequence
# number check).
+ self.log.debug( "Received out-of-place SYN, sending RST and resetting connection state" )
self.sendRst( seg )
self.reset()
return seg
@@ -1472,6 +1560,7 @@
# fifth check the ACK field,
# if the ACK bit is off drop the segment and return
if not seg.ack:
+ self.log.debug( "ACK bit was not set, dropping segment" )
return None
# if the ACK bit is on
@@ -1489,6 +1578,7 @@
# <SEQ=SEG.ACK><CTL=RST>
# and send it.
if not acceptable:
+ self.log.debug( "ACK is not acceptable, sending RST" )
self.sendRst( seg )
# TODO TODO TODO
# Might have to return here. RFC doesn't say to.
@@ -1511,12 +1601,14 @@
# If the ACK is a duplicate (SEG.ACK < SND.UNA), it can be ignored.
elif seg.ack_number < self.snd_una:
+ self.log.debug( 'Received duplicate ack, ignoring' )
pass
# If the ACK acks
# something not yet sent (SEG.ACK > SND.NXT) then send an ACK,
# drop the segment, and return.
elif seg.ack_number > self.snd_nxt:
+ self.log.debug( 'Received ACK for something not yet sent, sending ACK and dropping the segment' )
self.sendAck()
return None
@@ -1600,16 +1692,19 @@
# the 2 MSL timeout.
if seg.fin:
+ self.log.debug( 'Acknowledging remote FIN with ACK' )
self.sendAck()
- # TODO TODO TODO
- # Implement TIme-Wait timeout
+ self.log.debug( 'Starting 2msl %s timeout' % TIME_WAIT )
+ self.restartTimeWaitTimeout( 2 * self.msl )
+
# sixth, check the URG bit
pass # Not supported
# seventh, process the segment text
if seg.data is not None:
+ self.log.debug( 'Segment contains data' )
# ESTABLISHED STATE
# FIN-WAIT-1 STATE
# FIN-WAIT-2 STATE
@@ -1618,11 +1713,17 @@
# Once in the ESTABLISHED state, it is possible to deliver segment
# text to user RECEIVE buffers. Text from segments can be moved
# into buffers until either the buffer is full or the segment is
- # empty.
- bytes = seg.data.chain().bytes
+ # empty.
+ # byteArray = [byte for byte in bytes[]]
+
+ if seg.sequence == self.rcv_nxt:
+ self.inboundSequences += [byte for byte in seg.data.chain().bytes]
+ #self.log.debug( 'Setting inboundSequences[%i:%i] (%s) to recvd bytes' %
+ # ( seg.sequence, seg.sequence + len( byteArray ), self.inboundSequences ) )
- self.inboundSequences[seg.sequence:seg.sequence + len( bytes )] = \
- [sequenced( byte ) for byte in bytes]
+ # self.inboundSequences[seg.sequence:( seg.sequence + len( byteArray ) )] = byteArray
+ self.log.debug( 'Length of inboundSequences is now %i' % len( self.inboundSequences ) )
+ # [Sequenced( byte ) for byte in bytes] # Don't really need to use the Sequenced() for raw data.
#If the segment empties and carries an PUSH flag, then
#the user is informed, when the buffer is returned, that a PUSH
@@ -1675,7 +1776,8 @@
# user.
self.debugWithState( "connection closing" )
- self.inboundSequences += [sequenced( fin = seg.fin, syn = seg.syn, ack = seg.ack )]
+ self.inboundSequences += 1
+ # self.inboundSequences += [Sequenced( fin = seg.fin, syn = seg.syn, ack = seg.ack )]
# self.rcv_nxt = seg.sequence + 1 # Done automatically by above
self.debugWithState( "sending ACK of FIN" )
@@ -1725,8 +1827,11 @@
# And return
return seg
- self.log.error( 'This point should never be reached!' )
- return None
+ # If we get here, the packet was just an ACK packet that may or
+ # may not have had data. Otherwise, something interesting happened.
+ if not seg.ack or ( seg.syn or seg.fin or seg.reset ):
+ self.log.error( "Should never get here. Something weird happened." )
+ return seg
def turnOffTimers( self ):
'''
@@ -1740,72 +1845,60 @@
self.stopTimeWaitTimeout()
def stopUserTimeout( self ):
+ '''
+ Stops the User Timeout.
+ '''
if self.userTimer is not None:
self.userTimer.cancel()
self.userTimer = None
def restartUserTimeout( self, timeout = 10.0 ):
+ '''
+ Stops, then starts the User Timeout.
+ '''
self.stopUserTimeout()
self.userTimer = threading.Timer( timeout, TcpStateMachine.setState, args = ( self, CLOSED ) )
self.userTimer.start()
def stopTimeWaitTimeout( self ):
+ '''
+ Stops the TIME WAIT timeout.
+ '''
if self.timeWaitTimer is not None:
self.timeWaitTimer.cancel()
self.timeWaitTimer = None
def restartTimeWaitTimeout( self, timeout = 10.0 ):
+ '''
+ Stops, then starts the TIME WAIT timeout.
+ '''
self.stopTimeWaitTimeout()
self.timeWaitTimer = threading.Timer( timeout, TcpStateMachine.setState, args = ( self, CLOSED ) )
self.timeWaitTimer.start()
def forceResendUnackedData( self ):
- self.resetRetransmitTimer()
+ '''
+ Forces the Send thread to stop waiting and immediately send any perform
+ a send of pending data.
+ '''
+ self.__sendThread.timeout.trigger()
def resetRetransmitTimer( self ):
+ '''
+ Resets the retransmit timer.
+ '''
self.__sendThread.timeout.reset()
def debugWithState( self, x ):
+ '''
+ Prints a DEBUG statement that includes the current TCP state.
+ '''
self.log.debug( "[%s] %s" % ( self.state, x ) )
- def __init__( self, localhost = ( testconfig.localIP, testconfig.localPort ), remotehost = ( testconfig.remoteIP, testconfig.remotePort ) ):
+ def unackedFins( self ):
'''
- Initialize the TCP State Machine. The information for the local host and the
- remote host must be provided as a 2 - tuple, a string and an integer.
-
- Example:
- >>> t = TcpStateMachine( ( "127.0.0.1", 10000 ), ( "192.168.0.1", 80 ) )
+ Returns a list of all FIN sequences in the outbound queue that have
>>> TRUNCATED FOR MAIL (1000 lines) <<<
More information about the p4-projects
mailing list