Competition between threads

This is the forum for miscellaneous technical/programming questions.

Moderator: 2ffat

Competition between threads

Postby mark_c » Tue Jul 10, 2018 12:05 pm

I wrote this little example for didactic purposes to understand where there is competition between the two threads.
The SendMyMessage function is contended between the two threads and could cause malfunctions or mistakes?

thank you

Code: Select all
//---------------------------------------------------------------------------
#include <vcl\vcl.h>

#pragma hdrstop

#include "Unit1.h"

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>

bool bAbort;

void SendMyMessage(char *msg);

DWORD WINAPI Thread1( LPVOID lpParameter );
DWORD WINAPI Thread2( LPVOID lpParameter );
DWORD ID1, ID20;
HANDLE Trd1, Trd2;

//---------------------------------------------------------------------------

#pragma link "Grids"
#pragma resource "*.dfm"
TForm1 *Form1;
//---------------------------------------------------------------------------
__fastcall TForm1::TForm1(TComponent* Owner)
   : TForm(Owner)
{
        ServerSocket1->Port = 5000;
        ServerSocket1->Active = True;
}
//---------------------------------------------------------------------------
void __fastcall TForm1::Button1Click(TObject *Sender)
{
        bAbort = false;
       
        Trd1 = CreateThread(NULL,0,Thread1,NULL,0,&ID1);
        Trd2 = CreateThread(NULL,0,Thread2,NULL,0,&ID2);

        Button1->Enabled=false;
        Button2->Enabled=true;
}
//---------------------------------------------------------------------------
void __fastcall TForm1::Button2Click(TObject *Sender)
{
        bAbort = true;

        Button1->Enabled=true;
        Button2->Enabled=false;
}
//---------------------------------------------------------------------------

DWORD WINAPI Thread1( LPVOID lpParameter )
{
        while(!bAbort)
        {
                SendMyMessage("aabbcc");
                Sleep(50);
                smf=1;
        }
        return 0;
}
//---------------------------------------------------------------------------

DWORD WINAPI Thread2( LPVOID lpParameter )
{
        while(!bAbort)
        {
                SendMyMessage("ddeeff");
                Sleep(50);
                smf=0;
        }
        return 0;
}

//---------------------------------------------------------------------------

void SendMyMessage(char *msg)
{
        try{
                for(int actconn = 0; actconn < Form1->ServerSocket1->Socket->ActiveConnections; actconn++)
                        Form1->ServerSocket1->Socket->Connections[actconn]->SendText(msg);

         } catch(...) { }

         msg[0]='\0';

}
//---------------------------------------------------------------------------

void __fastcall TForm1::FormDestroy(TObject *Sender)
{

        TerminateThread(Trd1,0);
        TerminateThread(Trd2,0);

        Form1->Caption = "Stopped.....";

        ServerSocket1->Active = False;
}
//---------------------------------------------------------------------------

void __fastcall TForm1::ServerSocket1ClientError(TObject *Sender,
      TCustomWinSocket *Socket, TErrorEvent ErrorEvent, int &ErrorCode)
{
        if (ErrorCode == 10053) {
                Socket->Close();
        }

        ErrorCode = 0;
}
//---------------------------------------------------------------------------

mark_c
BCBJ Guru
BCBJ Guru
 
Posts: 102
Joined: Thu Jun 21, 2012 1:13 am

Re: Competition between threads

Postby rlebeau » Tue Jul 10, 2018 1:43 pm

mark_c wrote:The SendMyMessage function is contended between the two threads and could cause malfunctions or mistakes?


Yes, very many mistakes.

You are not providing ANY synchronization whatsoever when accessing the ServerSocket's 'ActiveCconnections' list. You have two threads writing data to the same clients at the same time without serializing writes to any given client. 'SendText()' is NOT guaranteed to send the String's entire data, especially in non-blocking mode (which is the server's default mode). So you have a VERY HIGH risk of 1) overlapping messages to any given client, and/or 2) sending partial messages to any given client, either one of which will corrupt your communications.

Also, clients are free to connect to, and disconnect from, your server at ANY time, even while 'SendMyMessage()' is running, so your 'for' loop itself is unsafe since the 'ActiveConnections' can change at ANY time, causing your 'actconn' index to go out of bounds, or you to access 'TCustomWinSocket' objects that are in progress of being destroyed, etc. The server has an internal 'TCriticalSection' to synchronize its own access to the 'ActiveConnections' list, but unfortunately you cannot access that 'TCriticalSection' in your code. As such:

  • in non-blocking mode (ServerType=stNonBlocking, which is the default), you will have to synchronize with the main UI thread when accessing the 'ActiveConnections' list.
  • in thread-blocking mode (ServerType=stThreadBlocking), do not use the 'ActiveConnections' list at all. Track the connected clients yourself in your own thread-safe list, using the server's 'OnClientConnect 'and 'OnClientDisconnect' events to update that list.

Also, "msg[0]='\0';" will DEFINITELY crash, since you are passing in read-only string literals to SendMyMessage(). Had you been using C++11, or at least declared 'msg' as 'const char*' (which you should), the compiler would have failed with errors to catch this mistake. Why are you nulling out the caller's message text at all? Don't do that, that logic does not belong in this code.

Also, what is 'smf'? You have a variable that is being shared by both threads and is being accessed without synchronization, either.

Also, DO NOT EVER use the 'OnDestroy' event in C++. It is a Delphi idiom that can be called AFTER the destructor, which is illegal in C++. Use the Form's actual destructor instead.

But, more importantly, DO NOT EVER use 'TerminateThread()'! Your threads are setup to monitor a 'bAbort' flag, so use that instead, and then you can use '(Msg)WaitForMultipleObjects()' to wait for the threads to fully terminate themselves cleanly. In fact, you should be doing that in your 'Button2Click()' event handler, too.

Also, you are leaking the thread handles, as you never call 'CloseHandle()' on them. Why are you using 'CreateThread()' directly instead of using the RTL's 'TThread' class?
Last edited by rlebeau on Wed Jul 11, 2018 10:37 am, edited 1 time in total.
Remy Lebeau (TeamB)
Lebeau Software
User avatar
rlebeau
BCBJ Author
BCBJ Author
 
Posts: 1508
Joined: Wed Jun 01, 2005 3:21 am
Location: California, USA

Re: Competition between threads

Postby mark_c » Tue Jul 10, 2018 11:07 pm

as always you provide comprehensive and complete answers, worthy of a teacher in this matter.
How to solve the problem of free disconnection of clients?
mark_c
BCBJ Guru
BCBJ Guru
 
Posts: 102
Joined: Thu Jun 21, 2012 1:13 am

Re: Competition between threads

Postby rlebeau » Wed Jul 11, 2018 11:52 am

mark_c wrote:How to solve the problem of free disconnection of clients?


I answered that in my previous reply:

rlebeau wrote:
  • in non-blocking mode (ServerType=stNonBlocking, which is the default), you will have to synchronize with the main UI thread when accessing the 'ActiveConnections' list.
  • in thread-blocking mode (ServerType=stThreadBlocking), do not use the 'ActiveConnections' list at all. Track the connected clients yourself in your own thread-safe list, using the server's 'OnClientConnect 'and 'OnClientDisconnect' events to update that list.


Assuming you are using stNonBlocking (since it is the default, and your code isn't handling events related to stThreadBlocking), try something like the following to have your worker threads delegate to the main UI thread when sending messages to clients. This way, the main UI thread is the only thread to ever touch the 'ActiveConnections' list directly:

Code: Select all
//---------------------------------------------------------------------------
#include <vcl.h>
#pragma hdrstop

#include "Unit1.h"

void SendMyMessage(const char *Msg);

DWORD WINAPI Thread1( LPVOID lpParameter );
DWORD WINAPI Thread2( LPVOID lpParameter );

HANDLE Trd1 = NULL, Trd2 = NULL;
bool bAbort = false;

//---------------------------------------------------------------------------

#pragma link "Grids"
#pragma resource "*.dfm"
TForm1 *Form1;
//---------------------------------------------------------------------------
__fastcall TForm1::TForm1(TComponent* Owner)
   : TForm(Owner)
{
    ServerSocket1->Port = 5000;
    ServerSocket1->Active = True;
}
//---------------------------------------------------------------------------
__fastcall TForm1::~TForm1()
{
    Caption = "Stopping.....";
    StopThreads();
    ServerSocket1->Active = False;
}
//---------------------------------------------------------------------------
void __fastcall TForm1::Button1Click(TObject *Sender)
{
    StartThreads();
    Button1->Enabled = false;
    Button2->Enabled = true;
}
//---------------------------------------------------------------------------
void __fastcall TForm1::Button2Click(TObject *Sender)
{
    StopThreads();
    Button1->Enabled = true;
    Button2->Enabled = false;
}
//---------------------------------------------------------------------------
void __fastcall TForm1::StartThreads()
{
    bAbort = false;
       
    DWORD ID1, ID2;
    Trd1 = CreateThread(NULL, 0,Thread1, NULL, 0, &ID1);
    Trd2 = CreateThread(NULL, 0,Thread2, NULL, 0, &ID2);
}
//---------------------------------------------------------------------------
void __fastcall TForm1::StopThreads()
{
    bAbort = true;

    if ((!Trd1) && (!Trd2))
        return;

    // wait for threads to terminate, processing sync requests in the meantime...

    HANDLE h[3];
    DWORD Num, WaitResult = 0;
    MSG Msg;

    h[0] = System::Classes::SyncEvent;
    Num = 1;

    if (Trd1)
        h[Num++] = Trd1;

    if (Trd2)
        h[Num++] = Trd2;

    do
    {
        WaitResult = MsgWaitForMultipleObjects(Num, h, FALSE, INFINITE, QS_SENDMESSAGE);
        if (WaitResult == WAIT_FAILED)
            break;

        DWORD Index = (WaitResult - WAIT_OBJECT_0);
        if (Index == 0)
            CheckSynchronize();
        else if (Index == Num)
            PeekMessage(&Msg, NULL, 0, 0, PM_NOREMOVE);
        else
        {
            for(DWORD i = Index+1; i < Num; ++i)
                h[i-1] = h[i];
            --Num;
        }
    }
    while (Num > 1);

    if (Trd1)
    {
        CloseHandle(Trd1);
        Trd1 = NULL;
    }

    if (Trd2)
    {
        CloseHandle(Trd2);
        Trd2 = NULL;
    }
}
//---------------------------------------------------------------------------
DWORD WINAPI Thread1( LPVOID lpParameter )
{
    while (!bAbort)
    {
        SendMyMessage("aabbcc");
        Sleep(50);
    }
    return 0;
}
//---------------------------------------------------------------------------
DWORD WINAPI Thread2( LPVOID lpParameter )
{
    while (!bAbort)
    {
        SendMyMessage("ddeeff");
        Sleep(50);
    }
    return 0;
}
//---------------------------------------------------------------------------
int SendRaw(TCustomWinSocket *Socket, void *Data, int Size)
{
    int Total = 0;
    BYTE *ptr = static_cast<BYTE*>(Data);
    while (Size > 0)
    {
        int Sent = Socket->SendBuf(ptr, Size);
        if (Sent <= 0)
            break;
        ptr += Sent;
        Size -= Sent;
        Total += Sent;
    }
    return Total;
}
//---------------------------------------------------------------------------
void __fastcall TForm1::SendMsg(const char *Msg)
{
    const int MsgLen = strlen(Msg);

    for(int actconn = 0; actconn < ServerSocket1->Socket->ActiveConnections; ++actconn)
    {
        TCustomWinSocket *Socket = ServerSocket1->Socket->Connections[actconn];

        const char *ptr = Msg;
        int len = MsgLen;

        // if the client has no pending data waiting to be sent, send the Msg now...

        TMemoryStream *Buffer = static_cast<TMemoryStream*>(Socket->Data);
        if (Buffer->Size == 0)
        {
            int Sent = SendRaw(Socket, ptr, len);
            if (Sent > 0)
            {
                ptr += Sent;
                len -= Sent;
            }
        }

        // buffer any data that hasn't been sent, and wait for the OnClientWrite event to try again...

        if (len > 0)
        {
            Buffer->Seek(soEnd, 0);
            Buffer->WriteBuffer(ptr, len);
        }
    }
}
//---------------------------------------------------------------------------
struct TSendMsg
{
    const char *fMsg;

    void __fastcall Send()
    {
        Form1->SendMsg(fMsg);
    }
};

void SendMyMessage(const char *Msg)
{
    // let the main UI thread send the Msg to all clients...
    TSendMsg sm;
    sm.fMsg = Msg;
    TThread::Synchronize(NULL, &(sm.Send));
}
//---------------------------------------------------------------------------
void __fastcall TForm1::ServerSocket1ClientConnect(TObject *Sender,
      TCustomWinSocket *Socket)
{
    // allocate a buffer for use with the OnClientWrite event...
    Socket->Data = new TMemoryStream;
}
//---------------------------------------------------------------------------
void __fastcall TForm1::ServerSocket1ClientDisconnect(TObject *Sender,
      TCustomWinSocket *Socket)
{
    // free the buffer...
    delete static_cast<TMemoryStream*>(Socket->Data);
    Socket->Data = NULL;
}
//---------------------------------------------------------------------------
void __fastcall TForm1::ServerSocket1ClientWrite(TObject *Sender,
      TCustomWinSocket *Socket)
{
    TMemoryStream *Buffer = static_cast<TMemoryStream*>(Socket->Data);
    if (!Buffer)
        return;

    // if the client has any pending data waiting to be sent, send it now and remove it from the buffer...

    __int64 Remaining = Buffer->Size;
    if (Remaining == 0)
        return;

    int Sent = SendRaw(Socket, Buffer->Memory, (int) min(Remaining, MaxInt));
    if (Sent <= 0)
        return;

    Remaining -= Sent;
    memmove(Buffer->Memory, static_cast<BYTE*>(Buffer->Memory) + Sent, Remaining);
    Buffer->Size = Remaining;
}
//---------------------------------------------------------------------------
void __fastcall TForm1::ServerSocket1ClientError(TObject *Sender,
      TCustomWinSocket *Socket, TErrorEvent ErrorEvent, int &ErrorCode)
{
    // close the client socket...
    Socket->Close();
    ErrorCode = 0;
}
//---------------------------------------------------------------------------
Remy Lebeau (TeamB)
Lebeau Software
User avatar
rlebeau
BCBJ Author
BCBJ Author
 
Posts: 1508
Joined: Wed Jun 01, 2005 3:21 am
Location: California, USA

Re: Competition between threads

Postby mark_c » Wed Jul 11, 2018 10:51 pm

sorry but what is not clear to me is, referring to my initial code, if three clients connect on port 5000 the messages produced by trd1 are divided between the three threads, right?

example: if Trd1 produces 100 messages per second, hypothetically each client receives 33 messages per second?
mark_c
BCBJ Guru
BCBJ Guru
 
Posts: 102
Joined: Thu Jun 21, 2012 1:13 am


Return to Technical

Who is online

Users browsing this forum: Bing [Bot] and 10 guests

cron