net.vattp.data
Class DataPath

java.lang.Object
  |
  +--net.vattp.data.DataPath
All Implemented Interfaces:
MsgHandler, TickReactor

class DataPath
extends Object
implements MsgHandler, TickReactor


Field Summary
private  long bytesReceived
          The total bytes of messages received over this DataPath
private  long bytesSent
          The total bytes of messages sent over this DataPath
private  long lastNetActivity
          The time we last received a part of an inbound message or sent a part of an outbound message.
private  int maxReceivedMessageSize
          The size of the largest message received over this DataPath
private  int maxSentMessageSize
          The size of the largest message sent over this DataPath
private  long messagesReceived
          The total number of messages received over this DataPath
private  long messagesSent
          The total number of messages sent over this DataPath
private  Clock myClock
          The Clock object we are using to timeout messages
private  VatTPMgr myConnMgr
          The VatTPMgr we report the remote identity to for an incoming connection.
private  VatTPConnection myDataConnection
          The VatTPConnection we report status changes to.
private  ConditionLock myEmbargoLock
          for debugging purposes
private  KeyPair myIdentityKeys
          The key pair which defines the identity of this vat
private  boolean myIsIncoming
          Whether we are initiating the connection or responding to a remotely initiated connection.
private  String myLocalFlattenedSearchPath
          The semicolon separated search path for this vat
private  NetAddr myLocalNetAddr
          The InetAddress of the local end, or null if no connection has been made
private  String myLocalVatID
          The local vatID.
private  MsgHandler[] myMsgHandlers
          Array of MsgHandlers indexed by message type
private  AuthSecrets myProtocolParms
          Values for autherization parameters and protocol versions
private  RecvThread myRecvThread
          The RecvThread receiving messages for this VatTPConnection
private  String myRemoteAddr
          The location the current Send/Recv Threads are (to) (have been) speaking to.
private  NetAddr myRemoteNetAddr
          The InetAddress of the remote end, or null if no connection has been made
private  int myRemotePortNumber
          The port number we are trying to connect to, or 0 for an inbound DataPath.
private  String myRemoteVatID
          The VatID of the other end of the connectiion.
private  String mySavedFlattenedSearchPath
          The search path the other end provided on an incoming connection.
private  SendThread mySendThread
          The SendThread we use for output.
private  StartUpProtocol myStartUpProtocol
          While connecting, the StartUpProtocol object.
private  Vat myVat
          The Vat to have the SendThread and RecvThread synchronize on when calling methods in this object.
private  SynchQueue myWriter
          SynchQueue for queueing outbound messages.
private static byte[] thePingMsg
          The ping message to send when a when no messages have been received within .
private static byte[] thePongMsg
          The pong message to send when a ping message is received.
(package private) static Object theShutDownToken
          enqueued to effectively close the SynchQueue
private  long timePingSent
          The time we last sent a ping, or zero if we have received a message which canceled the ping timeout.
 
Constructor Summary
(package private) DataPath(VatTPMgr connMgr, Socket tcpConnection, KeyPair identityKeys, String localVatID, Vat vat, String localFlattenedSearchPath, VatLocationLookup vls)
          Make a new DataPath for an incoming connection
(package private) DataPath(VatTPMgr connMgr, VatTPConnection connection, String remoteVatID, String remoteAddr, Hashtable addressesTried, KeyPair identityKeys, String localVatID, Vat vat, byte[] outgoingSuspendID, AuthSecrets protocolParms, String localFlattenedSearchPath)
          Make a new DataPath
 
Method Summary
(package private)  void acceptReceiver(RecvThread receiver, NetAddr remoteNetAddr, NetAddr localNetAddr)
          Accept the RecvThread from the SendThread when it has been created.
(package private)  void cantResume(String reason)
          Handle the case where the resume ID's don't match.
private  SynchQueue commonConstructionSetup(KeyPair identityKeys, String localVatID, byte[] suspendID, Vat vat, String localFlattenedSearchPath, VatLocationLookup vls)
          Do the common setup operations for the two constructor methods.
(package private)  String connectConnection(VatTPConnection connection, AuthSecrets protocolParms)
          Connect this DataPath to a VatTPConnection.
 void connectionDead(VatTPConnection connection, Throwable reason)
          Receive notification that the connection is dead.
(package private)  void connectToSelf()
          Notify the connectToVatAt caller that he has tried to connect to self
(package private)  void duplicatePath(String reason, boolean isIdentified)
          Duplicate crossed connection DataPath exiting.
(package private)  void enqueue(Object message)
          Enqueue a message for output.
(package private)  void extendSearchPath(ConstList addresses)
          Add an address to the list of addresses to try.
(package private)  ConditionLock getEmbargoLock()
           
(package private)  String getRemoteAddress()
          Return the remote address we are connected to.
(package private)  int getStartupState()
          Get the startup state of this DataPath.
(package private)  int identifyIncoming(String localVatID, String remoteVatID, String remoteSearchPath)
          Identify the remote vat's vatID for an incoming connection.
(package private)  Object identifyOutgoing(String localVatID, String remoteVatID, String remoteFlattenedSearchPath)
          Identify the remote vat's vatID for an outgoing "to WHOEVER" connection.
(package private)  boolean isChoiceIncoming()
          Decide whether to keep incoming connection or outgoing when other end asks us to chose on an outgoing connection.
(package private)  void newIncomingMsg(byte[] message)
          Handle a new incoming message from the remote end.
(package private)  void noticeProblem(Throwable problem)
          Notice problem on the connection.
 void processMessage(byte[] message, VatTPConnection connection)
          Process an incoming message from the VatTPConnection.
(package private)  void recordSendProgress()
          Record that sending data has made progress for ping/pong logic.
(package private)  void registerMsgHandler(int msgType, MsgHandler handler)
          Register a MsgHandler for a particular message type.
(package private)  boolean resumeConnection(byte[] suspendID)
          Check if a connection should be resumed.
 void run(long tick)
          Called by Clocks on their targets after each tick.
(package private)  void sendFinished(int count, int length, StreamMessage msg)
          sendFinished called when a message has been completely passed to TCP
(package private)  void setAuthorizationSecrets(byte[] dhSecret, byte[] sendSequence, byte[] recvSequence)
          Set the initial sequences to be used with a connection.
(package private)  void shutDownFinished(byte[] sendSequence, Throwable reason)
          shutDownFinished called when all queued messages have been sent and the socket closed.
(package private)  void shutDownPath()
          Causes a shutdown of the path.
(package private)  void startupSuccessful(String eMsgProtocolVersion, String protocolSuite)
          Called by StartUpProtocol when a connection to a remote vat has been successfully made.
(package private)  void stopStartUpProtocol()
          Stop the start up protocol on this DataPath.
 String toString()
          Describe this path object.
(package private)  void tryNext(String reason)
          Stop TCP to this address and try the next on on the list.
(package private)  void unRegisterMsgHandler(int msgType, MsgHandler handler)
          Remove the registration for a handler for a particular message type.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

theShutDownToken

static final Object theShutDownToken
enqueued to effectively close the SynchQueue


thePongMsg

private static final byte[] thePongMsg
The pong message to send when a ping message is received. It is constant, so we only need one of them.


thePingMsg

private static final byte[] thePingMsg
The ping message to send when a when no messages have been received within .


myDataConnection

private VatTPConnection myDataConnection
The VatTPConnection we report status changes to. It is connected late in the startup protocol for an incoming connection


myConnMgr

private final VatTPMgr myConnMgr
The VatTPMgr we report the remote identity to for an incoming connection. Null for outgoing connections unless they are connectToVatAt connections.


myMsgHandlers

private MsgHandler[] myMsgHandlers
Array of MsgHandlers indexed by message type


myWriter

private SynchQueue myWriter
SynchQueue for queueing outbound messages. This is initialized during construction, and set to null in shutDownPath.


myRecvThread

private RecvThread myRecvThread
The RecvThread receiving messages for this VatTPConnection


mySendThread

private final SendThread mySendThread
The SendThread we use for output. We keep this reference so we can do a mySendThread.stop() if the ping timeout pops.


lastNetActivity

private long lastNetActivity
The time we last received a part of an inbound message or sent a part of an outbound message. Used to ensure that the connection is still connected, timeout the start up protocol etc. Initialized to object creation time to avoid invalid timeouts.


timePingSent

private long timePingSent
The time we last sent a ping, or zero if we have received a message which canceled the ping timeout.


myClock

private Clock myClock
The Clock object we are using to timeout messages


myRemoteAddr

private String myRemoteAddr
The location the current Send/Recv Threads are (to) (have been) speaking to.


myRemoteNetAddr

private NetAddr myRemoteNetAddr
The InetAddress of the remote end, or null if no connection has been made


myLocalNetAddr

private NetAddr myLocalNetAddr
The InetAddress of the local end, or null if no connection has been made


myRemotePortNumber

private int myRemotePortNumber
The port number we are trying to connect to, or 0 for an inbound DataPath. This port will be used to modify the search path after a successful connection, so when the connection is resumed, the first address tried will be the IP:port we successfully connected to.


myRemoteVatID

private String myRemoteVatID
The VatID of the other end of the connectiion. May be null.


myVat

private Vat myVat
The Vat to have the SendThread and RecvThread synchronize on when calling methods in this object.


myIdentityKeys

private KeyPair myIdentityKeys
The key pair which defines the identity of this vat


myLocalVatID

private String myLocalVatID
The local vatID. The hash of the local vat's public key


myLocalFlattenedSearchPath

private String myLocalFlattenedSearchPath
The semicolon separated search path for this vat


mySavedFlattenedSearchPath

private String mySavedFlattenedSearchPath
The search path the other end provided on an incoming connection. We save this and give it to the VatTPConnection at the end of the startup protocol so we have a search path to use to resume the connection after it has been suspended.


myStartUpProtocol

private StartUpProtocol myStartUpProtocol
While connecting, the StartUpProtocol object. Otherwise null


myIsIncoming

private final boolean myIsIncoming
Whether we are initiating the connection or responding to a remotely initiated connection.


myProtocolParms

private AuthSecrets myProtocolParms
Values for autherization parameters and protocol versions


bytesReceived

private long bytesReceived
The total bytes of messages received over this DataPath


messagesReceived

private long messagesReceived
The total number of messages received over this DataPath


maxReceivedMessageSize

private int maxReceivedMessageSize
The size of the largest message received over this DataPath


bytesSent

private long bytesSent
The total bytes of messages sent over this DataPath


messagesSent

private long messagesSent
The total number of messages sent over this DataPath


maxSentMessageSize

private int maxSentMessageSize
The size of the largest message sent over this DataPath


myEmbargoLock

private final ConditionLock myEmbargoLock
for debugging purposes

Constructor Detail

DataPath

DataPath(VatTPMgr connMgr,
         Socket tcpConnection,
         KeyPair identityKeys,
         String localVatID,
         Vat vat,
         String localFlattenedSearchPath,
         VatLocationLookup vls)
Make a new DataPath for an incoming connection

Parameters:
connMgr - The VatTPMgr to notify of status changes.
tcpConnection - The Socket to the TCP connection.
identityKeys - is the KeyPair which defines the identity of this vat.
vat - is the Vat whose thread we use to synchronize calls into the DataPath object from the SendThread and ReceiveThread
localFlattenedSearchPath - Is the search path we publish for references to us, flattened into a string.

DataPath

DataPath(VatTPMgr connMgr,
         VatTPConnection connection,
         String remoteVatID,
         String remoteAddr,
         Hashtable addressesTried,
         KeyPair identityKeys,
         String localVatID,
         Vat vat,
         byte[] outgoingSuspendID,
         AuthSecrets protocolParms,
         String localFlattenedSearchPath)
Make a new DataPath

Parameters:
connMgr - is the VatTPMgr if this DataPath is supporting a "connectToVatAt" operation. Otherwise it is null.
connection - The VatTPConnection to notify of status changes.
remoteVatID - the vatID of the remote vat
remoteAddr - is the dotted IP address or DNS name and port number of the place to try for the remote vat.
addressesTried - is a Hashtable of the InetAddresses already tried to locate the vat which failed due to host, rather than port related problems, and are not to be tried again.
identityKeys - is the KeyPair which defines the identity of this vat.
vat - is the Vat whose thread we use to synchronize calls into the DataPath object from the SendThread and ReceiveThread
outgoingSuspendID - Is the Suspend ID we are to send to the remote vat for resuming a connection, or null for a new connection.
protocolParms - is the AuthSecrets object which has the resume state for protocol versions an protocol parameters for resuming a connection, or null for a new connection.
localFlattenedSearchPath - Is the search path we publish for references to us, flattened into a string.
Method Detail

getEmbargoLock

ConditionLock getEmbargoLock()

acceptReceiver

void acceptReceiver(RecvThread receiver,
                    NetAddr remoteNetAddr,
                    NetAddr localNetAddr)
Accept the RecvThread from the SendThread when it has been created.

Parameters:
receiver - is the RecvThread object.
remoteNetAddr - is the remote end's address.
localNetAddr - is the local end's address.

cantResume

void cantResume(String reason)
Handle the case where the resume ID's don't match.

Parameters:
reason - The reason for abandoning the attempt.

commonConstructionSetup

private SynchQueue commonConstructionSetup(KeyPair identityKeys,
                                           String localVatID,
                                           byte[] suspendID,
                                           Vat vat,
                                           String localFlattenedSearchPath,
                                           VatLocationLookup vls)
Do the common setup operations for the two constructor methods.

Parameters:
identityKeys - is the public/private key pair that define this vat.
localVatID - is the vatID of this vat.
suspendID - is the suspend ID to send during the startup protocol, or null if this is an incoming connection or a new connection.
vat - is the Vat to synchronize on when other threads call methods which are "in vat".
localFlattenedSearchPath - is the search path for this vat as a String, the elements separated by semicolons.
vls - is the object which gives VLS look functionality, or null if this vat doesn't support that functionality or if this is an outgoing connection.
Returns:
is the SynchQueue object for reading the output queue.

connectConnection

String connectConnection(VatTPConnection connection,
                         AuthSecrets protocolParms)
Connect this DataPath to a VatTPConnection. Used for incoming sockets and anonymous outgoing sockets to connect the DataPath after the remote end has been identified.

Parameters:
connection - is the VatTPConnection object to connect to.
protocolParms - is the protocol versions and parameters to use for resuming a connection, or null for none.
Returns:
is the flattened search path to use to find the remote end.

connectionDead

public void connectionDead(VatTPConnection connection,
                           Throwable reason)
Receive notification that the connection is dead. Either the connection has been shut down with VatTPConnection.shutDownConnection, the underlying TCP connection has failed, a bad MAC has been received on the connection, or an internal error has caused the connection to fail.

If an object is registered to handle more than one message type, it will receive one connectionDead notification for each message type for which it is registered.

Specified by:
connectionDead in interface MsgHandler
Parameters:
connection - The VatTPConnection object which has just died. For this use, it may be null.
reason - is a Throwable describing why the connection was shut down.

connectToSelf

void connectToSelf()
Notify the connectToVatAt caller that he has tried to connect to self


duplicatePath

void duplicatePath(String reason,
                   boolean isIdentified)
Duplicate crossed connection DataPath exiting.

Parameters:
reason - The reason for abandoning the attempt.

enqueue

void enqueue(Object message)
Enqueue a message for output.

Parameters:
message - the message to queue

extendSearchPath

void extendSearchPath(ConstList addresses)
Add an address to the list of addresses to try.

Parameters:
addresses - An array of Strings of the addresses to add.

getRemoteAddress

String getRemoteAddress()
Return the remote address we are connected to.


getStartupState

int getStartupState()
Get the startup state of this DataPath.

Returns:
the current state of this DataPath

identifyIncoming

int identifyIncoming(String localVatID,
                     String remoteVatID,
                     String remoteSearchPath)
               throws IOException
Identify the remote vat's vatID for an incoming connection.

Parameters:
remoteVatID - the VatID of the remote vat.
remoteSearchPath - the search path to use to find the remote vat in array form.
Returns:
the code received from VatTPMgr.newIncomingConnectionIdentified
IOException

identifyOutgoing

Object identifyOutgoing(String localVatID,
                        String remoteVatID,
                        String remoteFlattenedSearchPath)
                  throws IOException
Identify the remote vat's vatID for an outgoing "to WHOEVER" connection. Called by the StartUpProtocol.

Parameters:
remoteVatID - is the VatID of the remote vat.
remoteFlattenedSearchPath - is the search path to use to find the remote vat.
Returns:
an object the type of which determines what to do next. If the object is a String, then the startup protocol should terminate the connection attempt. If the object is a byte array, then there is a suspended connection to the remote vat and the array is the suspendID. If the object is null, then the connection is a new connection and the StartUpProtocol should proceed.
IOException

isChoiceIncoming

boolean isChoiceIncoming()
Decide whether to keep incoming connection or outgoing when other end asks us to chose on an outgoing connection.

Returns:
true - Keep the incoming connection of the pair.
false - Keep the outgoing connection of the pair.

newIncomingMsg

void newIncomingMsg(byte[] message)
Handle a new incoming message from the remote end.

Parameters:
message - the incoming message as a byte array. The first character is the message type as defined in class Msg.

noticeProblem

void noticeProblem(Throwable problem)
Notice problem on the connection. This method is called by the Send and Recv threads when they encounter a fatal error and give up.

Parameters:
problem - The exception which describes the problem

run

public void run(long tick)
Called by Clocks on their targets after each tick.

Specified by:
run in interface TickReactor
Parameters:
tick - is the current tick count for the Clock - ignored

processMessage

public void processMessage(byte[] message,
                           VatTPConnection connection)
Process an incoming message from the VatTPConnection.

Specified by:
processMessage in interface MsgHandler
Parameters:
message - is the incoming message. The first byte (message[0]) is the message type (see class Msg). A handler which is registered for more than one message can use the initial byte to determine what is the message type of the current message.
connection - is the VatTPConnection object on which the the message arrived. For this use, it may be null.
See Also:
Msg, VatTPConnection

recordSendProgress

void recordSendProgress()
Record that sending data has made progress for ping/pong logic.


registerMsgHandler

void registerMsgHandler(int msgType,
                        MsgHandler handler)
                  throws IOException
Register a MsgHandler for a particular message type. It will be called whenever a new message of the specified type arrives. It will also be called when the connection dies.

Parameters:
msgType - is the message type. It must be chosen from the types defined in the definition class Msg. It is an error to register more than one handler for a message type. It is a fatal error if the msgType is out of range. It is a fatal error to register a handler after the reactToNewConnection method of the object registered with the VatTPMgr has returned.
handler - is an object which implements the MsgHandler interface. It will be called with processMessage when a message of msgType is received from the remote vat. It will be called with connectionDead when this connection dies.
Throws:
IOException - is thrown if there is already a handler registered for this message type.
See Also:
Msg, MsgHandler, VatTPMgr

resumeConnection

boolean resumeConnection(byte[] suspendID)
Check if a connection should be resumed.

Parameters:
suspendID - The secret that the other side must know to resume a connection or null if this is a new connection.
Returns:
true if the connection should be completed.

false if the connection should be abandoned.


sendFinished

void sendFinished(int count,
                  int length,
                  StreamMessage msg)
sendFinished called when a message has been completely passed to TCP

Parameters:
length - The size of the message passed to TCP.

setAuthorizationSecrets

void setAuthorizationSecrets(byte[] dhSecret,
                             byte[] sendSequence,
                             byte[] recvSequence)
Set the initial sequences to be used with a connection. Called by StartUpProtocol when they are calculated.

Parameters:
dhSecret - is the Diffie Hellman shared secret.
sendSequence - is the first sequence number to be used to send data to the remote vat.
recvSequence - is the first sequence number to be used to receive data from the remote vat.

shutDownFinished

void shutDownFinished(byte[] sendSequence,
                      Throwable reason)
shutDownFinished called when all queued messages have been sent and the socket closed.

Parameters:
sendSequence - is the next sequence number to use on the send path after resume.
reason - is a Throwable which describes why the shutdown.

shutDownPath

void shutDownPath()
Causes a shutdown of the path. Any queued messages will be sent.


startupSuccessful

void startupSuccessful(String eMsgProtocolVersion,
                       String protocolSuite)
                 throws IOException
Called by StartUpProtocol when a connection to a remote vat has been successfully made. The StartUpProtocol object has unregistered its MsgHandlers and is ready to become garbage.

Parameters:
eMsgProtocolVersion - the version of the E message protocl selected.
protocolSuite - the type of authentication selected.
IOException

stopStartUpProtocol

void stopStartUpProtocol()
Stop the start up protocol on this DataPath. This method is called by the VatTPMgr when it determines that there are two connections being built between this vat and another vat, and that this connection is the one that should be abandonded.


toString

public String toString()
Describe this path object.

Overrides:
toString in class Object
Returns:
a string representation of the object.

tryNext

void tryNext(String reason)
Stop TCP to this address and try the next on on the list.

Parameters:
reason - is a message describing the reason for abandoning this try.

unRegisterMsgHandler

void unRegisterMsgHandler(int msgType,
                          MsgHandler handler)
                    throws IOException
Remove the registration for a handler for a particular message type.

Parameters:
msgType - is the message type. It must be chosen from the types defined in the definition class Msg. It is a fatal error if the msgType is out of range.
handler - is the object currently registered as the handler.
Throws:
IOException - is thrown if the handler passed is not the handler registered for this message type.
See Also:
Msg, MsgHandler


comments?