Implemented fileevent support
authorPat Thoyts <patthoyts@users.sourceforge.net>
Thu, 14 Oct 2010 23:59:55 +0000 (00:59 +0100)
committerPat Thoyts <patthoyts@users.sourceforge.net>
Fri, 15 Oct 2010 00:28:04 +0000 (01:28 +0100)
Added tests for readable fileevent and asynchronous fcopy.

Signed-off-by: Pat Thoyts <patthoyts@users.sourceforge.net>
tclstorage.c
tests/tclstorage.test

index 33d9f6272cf126af9d6361c0b63e5ce62d974e37..a7763f389f971b370160e91768c4cb5f23e48096 100644 (file)
@@ -73,6 +73,7 @@ extern Tcl_ObjCmdProc PropertySetNamesCmd;
 
 static long UNIQUEID = 0;
 
+static Tcl_InterpDeleteProc PackageDeleteProc;
 static int GetItemInfo(Tcl_Interp *interp, IStorage *pstg, 
     Tcl_Obj *pathObj, STATSTG *pstatstg);
 static void TimeToFileTime(time_t t, LPFILETIME pft);
@@ -87,14 +88,40 @@ static Tcl_DriverWatchProc     StorageChannelWatch;
 static Tcl_DriverGetHandleProc StorageChannelGetHandle;
 static Tcl_DriverWideSeekProc  StorageChannelWideSeek;
 
-typedef struct {
+static int EventProc(Tcl_Event *evPtr, int flags);
+static void SetupProc(ClientData clientData, int flags);
+static void CheckProc(ClientData clientData, int flags);
+
+#define STORAGE_PACKAGE_KEY  "StoragePackageKey"
+#define STORAGE_FLAG_ASYNC   (1<<1)
+#define STORAGE_FLAG_PENDING (1<<2)
+
+struct Package;
+
+typedef struct StorageChannel {
+    Tcl_Channel chan;
+    struct Package *pkgPtr;
+    struct StorageChannel *nextPtr;
     Tcl_Interp *interp;
     DWORD grfMode;
     int watchmask;
     int validmask;
+    int flags;
     IStream *pstm;
 } StorageChannel;
 
+typedef struct Package {
+    struct StorageChannel *headPtr;
+    unsigned long count;
+    unsigned long uid;
+} Package;
+
+typedef struct ChannelEvent {
+    Tcl_Event header;
+    StorageChannel *instPtr;
+    int flags;
+} ChannelEvent;
+
 static Tcl_ChannelType StorageChannelType = {
     "storage",
     (Tcl_ChannelTypeVersion)TCL_CHANNEL_VERSION_2,
@@ -214,13 +241,19 @@ int
 Storage_Init(Tcl_Interp *interp)
 {
     EnsembleCmdData *dataPtr;
-    
-#ifdef USE_TCL_STUBS
+    Package *pkgPtr;
+
     if (Tcl_InitStubs(interp, "8.2", 0) == NULL) {
         return TCL_ERROR;
     }
-#endif
     
+    pkgPtr = (Package *)ckalloc(sizeof(Package));
+    pkgPtr->headPtr = NULL;
+    pkgPtr->count = 0;
+    pkgPtr->uid = 0;
+    Tcl_CreateEventSource(SetupProc, CheckProc, pkgPtr);
+    Tcl_SetAssocData(interp, STORAGE_PACKAGE_KEY, PackageDeleteProc, pkgPtr);
+
     dataPtr = (EnsembleCmdData *)ckalloc(sizeof(EnsembleCmdData));
     dataPtr->ensemble = StorageEnsemble;
     dataPtr->clientData = NULL;
@@ -275,6 +308,30 @@ StorageCmdDeleteProc(ClientData clientData)
     ckfree((char *)data);
 }
 \f
+/*
+ * ----------------------------------------------------------------------
+ *
+ * PackageDeleteProc -
+ *
+ *     Clean up the allocated memory associated with the package.
+ *
+ * Results:
+ *     None.
+ *
+ * Side effects:
+ *     Memory free'd and the event source removed.
+ *
+ * ----------------------------------------------------------------------
+ */
+
+static void
+PackageDeleteProc(ClientData clientData, Tcl_Interp *interp)
+{
+    Package *pkgPtr = clientData;
+    Tcl_DeleteEventSource(SetupProc, CheckProc, pkgPtr);
+    ckfree((char *)pkgPtr);
+}
+\f
 /*
  * ----------------------------------------------------------------------
  *
@@ -658,9 +715,9 @@ StorageOpenCmd(ClientData clientData, Tcl_Interp *interp,
             Tcl_SetObjResult(interp, errObj);
             r = TCL_ERROR;
         } else {
+           Package *pkgPtr;
             StorageChannel *inst;
             char name[3 + TCL_INTEGER_SPACE];
-            Tcl_Channel chan;
            
             _snprintf(name, 3 + TCL_INTEGER_SPACE, "stm%ld", 
                InterlockedIncrement(&UNIQUEID));
@@ -669,16 +726,25 @@ StorageOpenCmd(ClientData clientData, Tcl_Interp *interp,
             inst->grfMode = mode;
             inst->interp = interp;
             inst->watchmask = 0;
+           inst->flags = 0;
            /* bit0 set then not readable */
             inst->validmask = (mode & STGM_WRITE) ? 0 : TCL_READABLE;
             inst->validmask |= (mode & (STGM_WRITE|STGM_READWRITE)) 
                ? TCL_WRITABLE : 0;
-            chan = Tcl_CreateChannel(&StorageChannelType, name, 
-               inst, inst->validmask);
-            Tcl_RegisterChannel(interp, chan);
+            inst->chan = Tcl_CreateChannel(&StorageChannelType, name, 
+                   inst, inst->validmask);
+            Tcl_RegisterChannel(interp, inst->chan);
             if (mode & STGM_APPEND) {
-                Tcl_Seek(chan, 0, SEEK_END);
+                Tcl_Seek(inst->chan, 0, SEEK_END);
            }
+
+           /* insert at head of channels list */
+           pkgPtr = Tcl_GetAssocData(interp, STORAGE_PACKAGE_KEY, NULL);
+           inst->pkgPtr = pkgPtr;
+           inst->nextPtr = pkgPtr->headPtr;
+           pkgPtr->headPtr = inst;
+           ++pkgPtr->count;
+
             Tcl_SetObjResult(interp, Tcl_NewStringObj(name, -1));
             r = TCL_OK;
         }
@@ -915,6 +981,8 @@ StorageRemoveCmd(ClientData clientData, Tcl_Interp *interp,
  * StorageChannelClose -
  *
  *     Called by the Tcl channel layer to close the channel.
+ *     The channel must be removed from the linked list help
+ *     in the Package structure.
  *
  * Results:
  *     A standard Tcl result
@@ -928,11 +996,22 @@ StorageRemoveCmd(ClientData clientData, Tcl_Interp *interp,
 static int 
 StorageChannelClose(ClientData instanceData, Tcl_Interp *interp)
 {
-    StorageChannel *chan = (StorageChannel *)instanceData;
+    StorageChannel *instPtr = instanceData;
+    StorageChannel **tmpPtrPtr;
+    Package *pkgPtr = instPtr->pkgPtr;
     
-    if (chan->pstm)
-        chan->pstm->lpVtbl->Release(chan->pstm);
-    ckfree((char *)chan);
+    /* remove this channel from the package list */
+    tmpPtrPtr = &pkgPtr->headPtr;
+    while (*tmpPtrPtr && *tmpPtrPtr != instPtr) {
+       tmpPtrPtr = &(*tmpPtrPtr)->nextPtr;
+    }
+    *tmpPtrPtr = instPtr->nextPtr;
+    --pkgPtr->count;
+
+    /* free the stream and the memory */
+    if (instPtr->pstm)
+        instPtr->pstm->lpVtbl->Release(instPtr->pstm);
+    ckfree((char *)instPtr);
     
     return TCL_OK;
 }
@@ -1079,8 +1158,11 @@ StorageChannelWideSeek(ClientData instanceData, Tcl_WideInt offset,
  *
  * StorageChannelWatch -
  *
- *     Called by the Tcl channel layer to check that we are ready for
- *      file events. We are.
+ *     Called by the Tcl channel layer when someone calls 'fileevent' on
+ *     our channel handle. As we are always readable and writable, we
+ *     set the watchmask flag appropriately and set the blocktime to 0
+ *     This allows the notified to call SetupProc and CheckProc to
+ *     poll any of the channels from this package for events.
  *
  * Results:
  *     None.
@@ -1137,6 +1219,79 @@ StorageChannelGetHandle(ClientData instanceData,
     return SUCCEEDED(hr) ? TCL_OK : TCL_ERROR;
 }
 \f
+static int
+EventProc(Tcl_Event *evPtr, int flags)
+{
+    ChannelEvent *eventPtr = (ChannelEvent *)evPtr;
+    StorageChannel *chanPtr = eventPtr->instPtr;
+
+    if (!(flags & TCL_FILE_EVENTS)) {
+       return 0;
+    }
+    chanPtr->flags &= ~STORAGE_FLAG_PENDING;
+    Tcl_NotifyChannel(chanPtr->chan, chanPtr->watchmask & eventPtr->flags);
+    return 1;
+}
+\f
+/**
+ * This function is called to setup the notifier to monitor our
+ * channel for file events. Our CheckProc will be called anyway after some
+ * interval so we really only need to ensure that it is called at some 
+ * appropriate interval.
+ */
+
+static void
+SetupProc(ClientData clientData, int flags)
+{
+    Package *pkgPtr = clientData;
+    StorageChannel *chanPtr = NULL;
+    int msec = 10000;
+    Tcl_Time blockTime = {0, 0};
+    
+    if (!(flags & TCL_FILE_EVENTS)) {
+       return;
+    }
+    
+    for (chanPtr = pkgPtr->headPtr; chanPtr != NULL; chanPtr = chanPtr->nextPtr) {
+       msec = 10;
+    }
+    blockTime.sec = msec / 1000;
+    blockTime.usec = (msec % 1000) * 1000;
+    Tcl_SetMaxBlockTime(&blockTime);
+}
+\f
+static void
+CheckProc(ClientData clientData, int flags)
+{
+    Package *pkgPtr = clientData;
+    StorageChannel *chanPtr = NULL;
+    int mask;
+
+    if (!(flags & TCL_FILE_EVENTS)) {
+       return;
+    }
+
+    for (chanPtr = pkgPtr->headPtr; chanPtr != NULL; chanPtr = chanPtr->nextPtr) {
+       if (chanPtr->watchmask == 0) {
+           continue;
+       }
+
+       /* queue an event to trigger the notifier - we use an event
+        * for this to avoid starving other resources
+        * We are always writable and readable.
+        */
+       mask = TCL_WRITABLE | TCL_READABLE;
+       if (chanPtr->watchmask & mask) {
+           ChannelEvent *evPtr = (ChannelEvent *)ckalloc(sizeof(ChannelEvent));
+           chanPtr->flags |= STORAGE_FLAG_PENDING;
+           evPtr->header.proc = EventProc;
+           evPtr->instPtr = chanPtr;
+           evPtr->flags = mask;
+           Tcl_QueueEvent((Tcl_Event *)evPtr, TCL_QUEUE_TAIL);
+       }
+    }
+}
+\f
 /*
  * ----------------------------------------------------------------------
  *
index 4804c6dcffe647090339d428ac31959a309dafea..d77a1e1295ae431f8447a799826755b241f8d77e 100644 (file)
@@ -536,6 +536,129 @@ test storage-3.9 {open sub-storage for reading - fail write} \
     } \
     -result {1 {error opening "test": permission denied}}
 
+proc onRead {chan size cmd} {
+    set data [read $chan $size]
+    if {[set eof [eof $chan]]} {
+        fileevent $chan readable {}
+        set ::waiting eof
+    }
+    uplevel #0 [linsert $cmd end $eof $data]
+}
+
+test storage-4.0 {fileevent single readable} -setup {
+    set stg [storage open stg40.stg w+]
+    set stm [$stg open test.stm w+]
+    puts -nonewline $stm [string repeat a 655]
+    close $stm
+    set ::size 0
+    proc stg40 {eof data} {incr ::size [string length $data]}
+} -body {
+    set stm [$stg open test.stm r]
+    set len [string length [read $stm]]
+    seek $stm 0
+    fileevent $stm readable [list onRead $stm 1024 ::stg40]
+    set aid [after 1000 {set ::waiting timeout}]
+    vwait ::waiting
+    after cancel $aid
+    close $stm
+    list $len $::waiting $::size
+} -cleanup {
+    $stg close
+    file delete -force stg40.stg
+    unset ::size ::waiting
+    rename stg40 {}
+} -result {655 eof 655}
+
+test storage-4.1 {fileevent multiple readable} -setup {
+    set stg [storage open stg41.stg w+]
+    set stm [$stg open test.stm w+]
+    puts -nonewline $stm [string repeat \0\1\2\3\4 10240]
+    close $stm
+    set ::size 0
+    proc stg41 {eof data} {incr ::size [string length $data]}
+} -body {
+    set stm [$stg open test.stm r]
+    set len [string length [read $stm]]
+    seek $stm 0
+    fileevent $stm readable [list onRead $stm 1024 ::stg41]
+    set aid [after 1000 {set ::waiting timeout}]
+    vwait ::waiting
+    after cancel $aid
+    close $stm
+    list $len $::waiting $::size
+} -cleanup {
+    $stg close
+    file delete -force stg41.stg
+    unset ::size ::waiting
+    rename stg41 {}
+} -result {51200 eof 51200}
+
+test storage-5.0 {fcopy async single} -setup {
+    set stg [storage open stg50.stg w+]
+    set stm [$stg open test.stm w+]
+    puts -nonewline $stm [string repeat \0\1\2\3\4 10240]
+    close $stm
+    set outfile [makeFile {} test50.check]
+    set ::size 0
+    proc stg50 {count {err ok}} {set ::waiting $err}
+} -body {
+    set stm [$stg open test.stm r]
+    fconfigure $stm -translation binary -encoding binary -eofchar {}
+    set out [open $outfile w]
+    fconfigure $out -translation binary -encoding binary -eofchar {}
+    set len [string length [read $stm]]
+    seek $stm 0
+    set aid [after 1000 {set ::waiting timeout}]
+    fcopy $stm $out -command stg50
+    vwait ::waiting
+    after cancel $aid
+    close $stm
+    close $out
+    list $len $::waiting [file size $outfile]
+} -cleanup {
+    $stg close
+    file delete -force stg50.stg
+    unset ::size ::waiting
+    rename stg50 {}
+    removeFile $outfile
+} -result {51200 ok 51200}
+
+test storage-5.1 {fcopy async multiple} -setup {
+    set stg [storage open stg51.stg w+]
+    set stm [$stg open test.stm w+]
+    puts -nonewline $stm [string repeat \0\1\2\3\4 10240]
+    close $stm
+    set outfile [makeFile {} test51.check]
+    set ::size 0
+    proc stg51 {stm out count {err ok}} {
+        if {$err ne "ok" || [eof $stm]} {
+            set ::waiting $err
+        } else {
+            fcopy $stm $out -size 1024 -command [list stg51 $stm $out]
+        }
+    }
+} -body {
+    set stm [$stg open test.stm r]
+    fconfigure $stm -translation binary -encoding binary -eofchar {}
+    set out [open $outfile w]
+    fconfigure $out -translation binary -encoding binary -eofchar {}
+    set len [string length [read $stm]]
+    seek $stm 0
+    set aid [after 1000 {set ::waiting timeout}]
+    fcopy $stm $out -size 1024 -command [list stg51 $stm $out]
+    vwait ::waiting
+    after cancel $aid
+    close $stm
+    close $out
+    list $len $::waiting [file size $outfile]
+} -cleanup {
+    $stg close
+    file delete -force stg51.stg
+    unset ::size ::waiting
+    rename stg51 {}
+    removeFile $outfile
+} -result {51200 ok 51200}
+
 # -------------------------------------------------------------------------
 
 ::tcltest::cleanupTests