core_mqtt.c 123 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393
  1. /*
  2. * coreMQTT v2.1.0
  3. * Copyright (C) 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  4. *
  5. * SPDX-License-Identifier: MIT
  6. *
  7. * Permission is hereby granted, free of charge, to any person obtaining a copy of
  8. * this software and associated documentation files (the "Software"), to deal in
  9. * the Software without restriction, including without limitation the rights to
  10. * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  11. * the Software, and to permit persons to whom the Software is furnished to do so,
  12. * subject to the following conditions:
  13. *
  14. * The above copyright notice and this permission notice shall be included in all
  15. * copies or substantial portions of the Software.
  16. *
  17. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  19. * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  20. * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  21. * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  22. * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  23. */
  24. /**
  25. * @file core_mqtt.c
  26. * @brief Implements the user-facing functions in core_mqtt.h.
  27. */
  28. #include <string.h>
  29. #include <assert.h>
  30. #include "core_mqtt.h"
  31. #include "core_mqtt_state.h"
  32. /* Include config defaults header to get default values of configs. */
  33. #include "core_mqtt_config_defaults.h"
  34. #include "core_mqtt_default_logging.h"
  35. #ifndef MQTT_PRE_SEND_HOOK
  36. /**
  37. * @brief Hook called before a 'send' operation is executed.
  38. */
  39. #define MQTT_PRE_SEND_HOOK( pContext )
  40. #endif /* !MQTT_PRE_SEND_HOOK */
  41. #ifndef MQTT_POST_SEND_HOOK
  42. /**
  43. * @brief Hook called after the 'send' operation is complete.
  44. */
  45. #define MQTT_POST_SEND_HOOK( pContext )
  46. #endif /* !MQTT_POST_SEND_HOOK */
  47. #ifndef MQTT_PRE_STATE_UPDATE_HOOK
  48. /**
  49. * @brief Hook called just before an update to the MQTT state is made.
  50. */
  51. #define MQTT_PRE_STATE_UPDATE_HOOK( pContext )
  52. #endif /* !MQTT_PRE_STATE_UPDATE_HOOK */
  53. #ifndef MQTT_POST_STATE_UPDATE_HOOK
  54. /**
  55. * @brief Hook called just after an update to the MQTT state has
  56. * been made.
  57. */
  58. #define MQTT_POST_STATE_UPDATE_HOOK( pContext )
  59. #endif /* !MQTT_POST_STATE_UPDATE_HOOK */
  60. /**
  61. * @brief Bytes required to encode any string length in an MQTT packet header.
  62. * Length is always encoded in two bytes according to the MQTT specification.
  63. */
  64. #define CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ( 2U )
  65. /**
  66. * @brief Number of vectors required to encode one topic filter in a subscribe
  67. * request. Three vectors are required as there are three fields in the
  68. * subscribe request namely:
  69. * 1. Topic filter length; 2. Topic filter; and 3. QoS in this order.
  70. */
  71. #define CORE_MQTT_SUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ( 3U )
  72. /**
  73. * @brief Number of vectors required to encode one topic filter in an
  74. * unsubscribe request. Two vectors are required as there are two fields in the
  75. * unsubscribe request namely:
  76. * 1. Topic filter length; and 2. Topic filter in this order.
  77. */
  78. #define CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ( 2U )
  79. /*-----------------------------------------------------------*/
  80. /**
  81. * @brief Sends provided buffer to network using transport send.
  82. *
  83. * @brief param[in] pContext Initialized MQTT context.
  84. * @brief param[in] pBufferToSend Buffer to be sent to network.
  85. * @brief param[in] bytesToSend Number of bytes to be sent.
  86. *
  87. * @note This operation may call the transport send function
  88. * repeatedly to send bytes over the network until either:
  89. * 1. The requested number of bytes @a bytesToSend have been sent.
  90. * OR
  91. * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
  92. * function.
  93. * OR
  94. * 3. There is an error in sending data over the network.
  95. *
  96. * @return Total number of bytes sent, or negative value on network error.
  97. */
  98. static int32_t sendBuffer( MQTTContext_t * pContext,
  99. const uint8_t * pBufferToSend,
  100. size_t bytesToSend );
  101. /**
  102. * @brief Sends MQTT connect without copying the users data into any buffer.
  103. *
  104. * @brief param[in] pContext Initialized MQTT context.
  105. * @brief param[in] pConnectInfo MQTT CONNECT packet information.
  106. * @brief param[in] pWillInfo Last Will and Testament. Pass NULL if Last Will and
  107. * Testament is not used.
  108. * @brief param[in] remainingLength the length of the connect packet.
  109. *
  110. * @note This operation may call the transport send function
  111. * repeatedly to send bytes over the network until either:
  112. * 1. The requested number of bytes @a remainingLength have been sent.
  113. * OR
  114. * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
  115. * function.
  116. * OR
  117. * 3. There is an error in sending data over the network.
  118. *
  119. * @return #MQTTSendFailed or #MQTTSuccess.
  120. */
  121. static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
  122. const MQTTConnectInfo_t * pConnectInfo,
  123. const MQTTPublishInfo_t * pWillInfo,
  124. size_t remainingLength );
  125. /**
  126. * @brief Sends the vector array passed through the parameters over the network.
  127. *
  128. * @note The preference is given to 'writev' function if it is present in the
  129. * transport interface. Otherwise, a send call is made repeatedly to achieve the
  130. * result.
  131. *
  132. * @param[in] pContext Initialized MQTT context.
  133. * @param[in] pIoVec The vector array to be sent.
  134. * @param[in] ioVecCount The number of elements in the array.
  135. *
  136. * @note This operation may call the transport send or writev functions
  137. * repeatedly to send bytes over the network until either:
  138. * 1. The requested number of bytes have been sent.
  139. * OR
  140. * 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
  141. * function.
  142. * OR
  143. * 3. There is an error in sending data over the network.
  144. *
  145. * @return The total number of bytes sent or the error code as received from the
  146. * transport interface.
  147. */
  148. static int32_t sendMessageVector( MQTTContext_t * pContext,
  149. TransportOutVector_t * pIoVec,
  150. size_t ioVecCount );
  151. /**
  152. * @brief Add a string and its length after serializing it in a manner outlined by
  153. * the MQTT specification.
  154. *
  155. * @param[in] serializedLength Array of two bytes to which the vector will point.
  156. * The array must remain in scope until the message has been sent.
  157. * @param[in] string The string to be serialized.
  158. * @param[in] length The length of the string to be serialized.
  159. * @param[in] iterator The iterator pointing to the first element in the
  160. * transport interface IO array.
  161. * @param[out] updatedLength This parameter will be added to with the number of
  162. * bytes added to the vector.
  163. *
  164. * @return The number of vectors added.
  165. */
  166. static size_t addEncodedStringToVector( uint8_t serializedLength[ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ],
  167. const char * const string,
  168. uint16_t length,
  169. TransportOutVector_t * iterator,
  170. size_t * updatedLength );
  171. /**
  172. * @brief Send MQTT SUBSCRIBE message without copying the user data into a buffer and
  173. * directly sending it.
  174. *
  175. * @param[in] pContext Initialized MQTT context.
  176. * @param[in] pSubscriptionList List of MQTT subscription info.
  177. * @param[in] subscriptionCount The count of elements in the list.
  178. * @param[in] packetId The packet ID of the subscribe packet
  179. * @param[in] remainingLength The remaining length of the subscribe packet.
  180. *
  181. * @return #MQTTSuccess or #MQTTSendFailed.
  182. */
  183. static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
  184. const MQTTSubscribeInfo_t * pSubscriptionList,
  185. size_t subscriptionCount,
  186. uint16_t packetId,
  187. size_t remainingLength );
  188. /**
  189. * @brief Send MQTT UNSUBSCRIBE message without copying the user data into a buffer and
  190. * directly sending it.
  191. *
  192. * @param[in] pContext Initialized MQTT context.
  193. * @param[in] pSubscriptionList MQTT subscription info.
  194. * @param[in] subscriptionCount The count of elements in the list.
  195. * @param[in] packetId The packet ID of the unsubscribe packet.
  196. * @param[in] remainingLength The remaining length of the unsubscribe packet.
  197. *
  198. * @return #MQTTSuccess or #MQTTSendFailed.
  199. */
  200. static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
  201. const MQTTSubscribeInfo_t * pSubscriptionList,
  202. size_t subscriptionCount,
  203. uint16_t packetId,
  204. size_t remainingLength );
  205. /**
  206. * @brief Calculate the interval between two millisecond timestamps, including
  207. * when the later value has overflowed.
  208. *
  209. * @note In C, the operands are promoted to signed integers in subtraction.
  210. * Using this function avoids the need to cast the result of subtractions back
  211. * to uint32_t.
  212. *
  213. * @param[in] later The later time stamp, in milliseconds.
  214. * @param[in] start The earlier time stamp, in milliseconds.
  215. *
  216. * @return later - start.
  217. */
  218. static uint32_t calculateElapsedTime( uint32_t later,
  219. uint32_t start );
  220. /**
  221. * @brief Convert a byte indicating a publish ack type to an #MQTTPubAckType_t.
  222. *
  223. * @param[in] packetType First byte of fixed header.
  224. *
  225. * @return Type of ack.
  226. */
  227. static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType );
  228. /**
  229. * @brief Receive bytes into the network buffer.
  230. *
  231. * @param[in] pContext Initialized MQTT Context.
  232. * @param[in] bytesToRecv Number of bytes to receive.
  233. *
  234. * @note This operation calls the transport receive function
  235. * repeatedly to read bytes from the network until either:
  236. * 1. The requested number of bytes @a bytesToRecv are read.
  237. * OR
  238. * 2. No data is received from the network for MQTT_RECV_POLLING_TIMEOUT_MS duration.
  239. *
  240. * OR
  241. * 3. There is an error in reading from the network.
  242. *
  243. *
  244. * @return Number of bytes received, or negative number on network error.
  245. */
  246. static int32_t recvExact( const MQTTContext_t * pContext,
  247. size_t bytesToRecv );
  248. /**
  249. * @brief Discard a packet from the transport interface.
  250. *
  251. * @param[in] pContext MQTT Connection context.
  252. * @param[in] remainingLength Remaining length of the packet to dump.
  253. * @param[in] timeoutMs Time remaining to discard the packet.
  254. *
  255. * @return #MQTTRecvFailed or #MQTTNoDataAvailable.
  256. */
  257. static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
  258. size_t remainingLength,
  259. uint32_t timeoutMs );
  260. /**
  261. * @brief Discard a packet from the MQTT buffer and the transport interface.
  262. *
  263. * @param[in] pContext MQTT Connection context.
  264. * @param[in] pPacketInfo Information struct of the packet to be discarded.
  265. *
  266. * @return #MQTTRecvFailed or #MQTTNoDataAvailable.
  267. */
  268. static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
  269. const MQTTPacketInfo_t * pPacketInfo );
  270. /**
  271. * @brief Receive a packet from the transport interface.
  272. *
  273. * @param[in] pContext MQTT Connection context.
  274. * @param[in] incomingPacket packet struct with remaining length.
  275. * @param[in] remainingTimeMs Time remaining to receive the packet.
  276. *
  277. * @return #MQTTSuccess or #MQTTRecvFailed.
  278. */
  279. static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
  280. MQTTPacketInfo_t incomingPacket,
  281. uint32_t remainingTimeMs );
  282. /**
  283. * @brief Get the correct ack type to send.
  284. *
  285. * @param[in] state Current state of publish.
  286. *
  287. * @return Packet Type byte of PUBACK, PUBREC, PUBREL, or PUBCOMP if one of
  288. * those should be sent, else 0.
  289. */
  290. static uint8_t getAckTypeToSend( MQTTPublishState_t state );
  291. /**
  292. * @brief Send acks for received QoS 1/2 publishes.
  293. *
  294. * @param[in] pContext MQTT Connection context.
  295. * @param[in] packetId packet ID of original PUBLISH.
  296. * @param[in] publishState Current publish state in record.
  297. *
  298. * @return #MQTTSuccess, #MQTTIllegalState or #MQTTSendFailed.
  299. */
  300. static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
  301. uint16_t packetId,
  302. MQTTPublishState_t publishState );
  303. /**
  304. * @brief Send a keep alive PINGREQ if the keep alive interval has elapsed.
  305. *
  306. * @param[in] pContext Initialized MQTT Context.
  307. *
  308. * @return #MQTTKeepAliveTimeout if a PINGRESP is not received in time,
  309. * #MQTTSendFailed if the PINGREQ cannot be sent, or #MQTTSuccess.
  310. */
  311. static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext );
  312. /**
  313. * @brief Handle received MQTT PUBLISH packet.
  314. *
  315. * @param[in] pContext MQTT Connection context.
  316. * @param[in] pIncomingPacket Incoming packet.
  317. *
  318. * @return MQTTSuccess, MQTTIllegalState or deserialization error.
  319. */
  320. static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
  321. MQTTPacketInfo_t * pIncomingPacket );
  322. /**
  323. * @brief Handle received MQTT publish acks.
  324. *
  325. * @param[in] pContext MQTT Connection context.
  326. * @param[in] pIncomingPacket Incoming packet.
  327. *
  328. * @return MQTTSuccess, MQTTIllegalState, or deserialization error.
  329. */
  330. static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
  331. MQTTPacketInfo_t * pIncomingPacket );
  332. /**
  333. * @brief Handle received MQTT ack.
  334. *
  335. * @param[in] pContext MQTT Connection context.
  336. * @param[in] pIncomingPacket Incoming packet.
  337. * @param[in] manageKeepAlive Flag indicating if PINGRESPs should not be given
  338. * to the application
  339. *
  340. * @return MQTTSuccess, MQTTIllegalState, or deserialization error.
  341. */
  342. static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
  343. MQTTPacketInfo_t * pIncomingPacket,
  344. bool manageKeepAlive );
  345. /**
  346. * @brief Run a single iteration of the receive loop.
  347. *
  348. * @param[in] pContext MQTT Connection context.
  349. * @param[in] manageKeepAlive Flag indicating if keep alive should be handled.
  350. *
  351. * @return #MQTTRecvFailed if a network error occurs during reception;
  352. * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ;
  353. * #MQTTBadResponse if an invalid packet is received;
  354. * #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before
  355. * #MQTT_PINGRESP_TIMEOUT_MS milliseconds;
  356. * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an
  357. * invalid transition for the internal state machine;
  358. * #MQTTSuccess on success.
  359. */
  360. static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
  361. bool manageKeepAlive );
  362. /**
  363. * @brief Validates parameters of #MQTT_Subscribe or #MQTT_Unsubscribe.
  364. *
  365. * @param[in] pContext Initialized MQTT context.
  366. * @param[in] pSubscriptionList List of MQTT subscription info.
  367. * @param[in] subscriptionCount The number of elements in pSubscriptionList.
  368. * @param[in] packetId Packet identifier.
  369. *
  370. * @return #MQTTBadParameter if invalid parameters are passed;
  371. * #MQTTSuccess otherwise.
  372. */
  373. static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
  374. const MQTTSubscribeInfo_t * pSubscriptionList,
  375. size_t subscriptionCount,
  376. uint16_t packetId );
  377. /**
  378. * @brief Receives a CONNACK MQTT packet.
  379. *
  380. * @param[in] pContext Initialized MQTT context.
  381. * @param[in] timeoutMs Timeout for waiting for CONNACK packet.
  382. * @param[in] cleanSession Clean session flag set by application.
  383. * @param[out] pIncomingPacket List of MQTT subscription info.
  384. * @param[out] pSessionPresent Whether a previous session was present.
  385. * Only relevant if not establishing a clean session.
  386. *
  387. * @return #MQTTBadResponse if a bad response is received;
  388. * #MQTTNoDataAvailable if no data available for transport recv;
  389. * ##MQTTRecvFailed if transport recv failed;
  390. * #MQTTSuccess otherwise.
  391. */
  392. static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
  393. uint32_t timeoutMs,
  394. bool cleanSession,
  395. MQTTPacketInfo_t * pIncomingPacket,
  396. bool * pSessionPresent );
  397. /**
  398. * @brief Resends pending acks for a re-established MQTT session, or
  399. * clears existing state records for a clean session.
  400. *
  401. * @param[in] pContext Initialized MQTT context.
  402. * @param[in] sessionPresent Session present flag received from the MQTT broker.
  403. *
  404. * @return #MQTTSendFailed if transport send during resend failed;
  405. * #MQTTSuccess otherwise.
  406. */
  407. static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
  408. bool sessionPresent );
  409. /**
  410. * @brief Send the publish packet without copying the topic string and payload in
  411. * the buffer.
  412. *
  413. * @brief param[in] pContext Initialized MQTT context.
  414. * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
  415. * @brief param[in] pMqttHeader the serialized MQTT header with the header byte;
  416. * the encoded length of the packet; and the encoded length of the topic string.
  417. * @brief param[in] headerSize Size of the serialized PUBLISH header.
  418. * @brief param[in] packetId Packet Id of the publish packet.
  419. *
  420. * @return #MQTTSendFailed if transport send during resend failed;
  421. * #MQTTSuccess otherwise.
  422. */
  423. static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
  424. const MQTTPublishInfo_t * pPublishInfo,
  425. const uint8_t * pMqttHeader,
  426. size_t headerSize,
  427. uint16_t packetId );
  428. /**
  429. * @brief Function to validate #MQTT_Publish parameters.
  430. *
  431. * @brief param[in] pContext Initialized MQTT context.
  432. * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
  433. * @brief param[in] packetId Packet Id for the MQTT PUBLISH packet.
  434. *
  435. * @return #MQTTBadParameter if invalid parameters are passed;
  436. * #MQTTSuccess otherwise.
  437. */
  438. static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
  439. const MQTTPublishInfo_t * pPublishInfo,
  440. uint16_t packetId );
  441. /**
  442. * @brief Performs matching for special cases when a topic filter ends
  443. * with a wildcard character.
  444. *
  445. * When the topic name has been consumed but there are remaining characters to
  446. * to match in topic filter, this function handles the following 2 cases:
  447. * - When the topic filter ends with "/+" or "/#" characters, but the topic
  448. * name only ends with '/'.
  449. * - When the topic filter ends with "/#" characters, but the topic name
  450. * ends at the parent level.
  451. *
  452. * @note This function ASSUMES that the topic name been consumed in linear
  453. * matching with the topic filer, but the topic filter has remaining characters
  454. * to be matched.
  455. *
  456. * @param[in] pTopicFilter The topic filter containing the wildcard.
  457. * @param[in] topicFilterLength Length of the topic filter being examined.
  458. * @param[in] filterIndex Index of the topic filter being examined.
  459. *
  460. * @return Returns whether the topic filter and the topic name match.
  461. */
  462. static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
  463. uint16_t topicFilterLength,
  464. uint16_t filterIndex );
  465. /**
  466. * @brief Attempt to match topic name with a topic filter starting with a wildcard.
  467. *
  468. * If the topic filter starts with a '+' (single-level) wildcard, the function
  469. * advances the @a pNameIndex by a level in the topic name.
  470. * If the topic filter starts with a '#' (multi-level) wildcard, the function
  471. * concludes that both the topic name and topic filter match.
  472. *
  473. * @param[in] pTopicName The topic name to match.
  474. * @param[in] topicNameLength Length of the topic name.
  475. * @param[in] pTopicFilter The topic filter to match.
  476. * @param[in] topicFilterLength Length of the topic filter.
  477. * @param[in,out] pNameIndex Current index in the topic name being examined. It is
  478. * advanced by one level for `+` wildcards.
  479. * @param[in, out] pFilterIndex Current index in the topic filter being examined.
  480. * It is advanced to position of '/' level separator for '+' wildcard.
  481. * @param[out] pMatch Whether the topic filter and topic name match.
  482. *
  483. * @return `true` if the caller of this function should exit; `false` if the
  484. * caller should continue parsing the topics.
  485. */
  486. static bool matchWildcards( const char * pTopicName,
  487. uint16_t topicNameLength,
  488. const char * pTopicFilter,
  489. uint16_t topicFilterLength,
  490. uint16_t * pNameIndex,
  491. uint16_t * pFilterIndex,
  492. bool * pMatch );
  493. /**
  494. * @brief Match a topic name and topic filter allowing the use of wildcards.
  495. *
  496. * @param[in] pTopicName The topic name to check.
  497. * @param[in] topicNameLength Length of the topic name.
  498. * @param[in] pTopicFilter The topic filter to check.
  499. * @param[in] topicFilterLength Length of topic filter.
  500. *
  501. * @return `true` if the topic name and topic filter match; `false` otherwise.
  502. */
  503. static bool matchTopicFilter( const char * pTopicName,
  504. uint16_t topicNameLength,
  505. const char * pTopicFilter,
  506. uint16_t topicFilterLength );
  507. /*-----------------------------------------------------------*/
  508. static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
  509. uint16_t topicFilterLength,
  510. uint16_t filterIndex )
  511. {
  512. bool matchFound = false;
  513. assert( pTopicFilter != NULL );
  514. assert( topicFilterLength != 0U );
  515. /* Check if the topic filter has 2 remaining characters and it ends in
  516. * "/#". This check handles the case to match filter "sport/#" with topic
  517. * "sport". The reason is that the '#' wildcard represents the parent and
  518. * any number of child levels in the topic name.*/
  519. if( ( topicFilterLength >= 3U ) &&
  520. ( filterIndex == ( topicFilterLength - 3U ) ) &&
  521. ( pTopicFilter[ filterIndex + 1U ] == '/' ) &&
  522. ( pTopicFilter[ filterIndex + 2U ] == '#' ) )
  523. {
  524. matchFound = true;
  525. }
  526. /* Check if the next character is "#" or "+" and the topic filter ends in
  527. * "/#" or "/+". This check handles the cases to match:
  528. *
  529. * - Topic filter "sport/+" with topic "sport/".
  530. * - Topic filter "sport/#" with topic "sport/".
  531. */
  532. if( ( filterIndex == ( topicFilterLength - 2U ) ) &&
  533. ( pTopicFilter[ filterIndex ] == '/' ) )
  534. {
  535. /* Check that the last character is a wildcard. */
  536. matchFound = ( pTopicFilter[ filterIndex + 1U ] == '+' ) ||
  537. ( pTopicFilter[ filterIndex + 1U ] == '#' );
  538. }
  539. return matchFound;
  540. }
  541. /*-----------------------------------------------------------*/
  542. static bool matchWildcards( const char * pTopicName,
  543. uint16_t topicNameLength,
  544. const char * pTopicFilter,
  545. uint16_t topicFilterLength,
  546. uint16_t * pNameIndex,
  547. uint16_t * pFilterIndex,
  548. bool * pMatch )
  549. {
  550. bool shouldStopMatching = false;
  551. bool locationIsValidForWildcard;
  552. assert( pTopicName != NULL );
  553. assert( topicNameLength != 0U );
  554. assert( pTopicFilter != NULL );
  555. assert( topicFilterLength != 0U );
  556. assert( pNameIndex != NULL );
  557. assert( pFilterIndex != NULL );
  558. assert( pMatch != NULL );
  559. /* Wild card in a topic filter is only valid either at the starting position
  560. * or when it is preceded by a '/'.*/
  561. locationIsValidForWildcard = ( *pFilterIndex == 0u ) ||
  562. ( pTopicFilter[ *pFilterIndex - 1U ] == '/' );
  563. if( ( pTopicFilter[ *pFilterIndex ] == '+' ) && ( locationIsValidForWildcard == true ) )
  564. {
  565. bool nextLevelExistsInTopicName = false;
  566. bool nextLevelExistsinTopicFilter = false;
  567. /* Move topic name index to the end of the current level. The end of the
  568. * current level is identified by the last character before the next level
  569. * separator '/'. */
  570. while( *pNameIndex < topicNameLength )
  571. {
  572. /* Exit the loop if we hit the level separator. */
  573. if( pTopicName[ *pNameIndex ] == '/' )
  574. {
  575. nextLevelExistsInTopicName = true;
  576. break;
  577. }
  578. ( *pNameIndex )++;
  579. }
  580. /* Determine if the topic filter contains a child level after the current level
  581. * represented by the '+' wildcard. */
  582. if( ( *pFilterIndex < ( topicFilterLength - 1U ) ) &&
  583. ( pTopicFilter[ *pFilterIndex + 1U ] == '/' ) )
  584. {
  585. nextLevelExistsinTopicFilter = true;
  586. }
  587. /* If the topic name contains a child level but the topic filter ends at
  588. * the current level, then there does not exist a match. */
  589. if( ( nextLevelExistsInTopicName == true ) &&
  590. ( nextLevelExistsinTopicFilter == false ) )
  591. {
  592. *pMatch = false;
  593. shouldStopMatching = true;
  594. }
  595. /* If the topic name and topic filter have child levels, then advance the
  596. * filter index to the level separator in the topic filter, so that match
  597. * can be performed in the next level.
  598. * Note: The name index already points to the level separator in the topic
  599. * name. */
  600. else if( nextLevelExistsInTopicName == true )
  601. {
  602. ( *pFilterIndex )++;
  603. }
  604. else
  605. {
  606. /* If we have reached here, the the loop terminated on the
  607. * ( *pNameIndex < topicNameLength) condition, which means that have
  608. * reached past the end of the topic name, and thus, we decrement the
  609. * index to the last character in the topic name.*/
  610. ( *pNameIndex )--;
  611. }
  612. }
  613. /* '#' matches everything remaining in the topic name. It must be the
  614. * last character in a topic filter. */
  615. else if( ( pTopicFilter[ *pFilterIndex ] == '#' ) &&
  616. ( *pFilterIndex == ( topicFilterLength - 1U ) ) &&
  617. ( locationIsValidForWildcard == true ) )
  618. {
  619. /* Subsequent characters don't need to be checked for the
  620. * multi-level wildcard. */
  621. *pMatch = true;
  622. shouldStopMatching = true;
  623. }
  624. else
  625. {
  626. /* Any character mismatch other than '+' or '#' means the topic
  627. * name does not match the topic filter. */
  628. *pMatch = false;
  629. shouldStopMatching = true;
  630. }
  631. return shouldStopMatching;
  632. }
  633. /*-----------------------------------------------------------*/
  634. static bool matchTopicFilter( const char * pTopicName,
  635. uint16_t topicNameLength,
  636. const char * pTopicFilter,
  637. uint16_t topicFilterLength )
  638. {
  639. bool matchFound = false, shouldStopMatching = false;
  640. uint16_t nameIndex = 0, filterIndex = 0;
  641. assert( pTopicName != NULL );
  642. assert( topicNameLength != 0 );
  643. assert( pTopicFilter != NULL );
  644. assert( topicFilterLength != 0 );
  645. while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )
  646. {
  647. /* Check if the character in the topic name matches the corresponding
  648. * character in the topic filter string. */
  649. if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )
  650. {
  651. /* If the topic name has been consumed but the topic filter has not
  652. * been consumed, match for special cases when the topic filter ends
  653. * with wildcard character. */
  654. if( nameIndex == ( topicNameLength - 1U ) )
  655. {
  656. matchFound = matchEndWildcardsSpecialCases( pTopicFilter,
  657. topicFilterLength,
  658. filterIndex );
  659. }
  660. }
  661. else
  662. {
  663. /* Check for matching wildcards. */
  664. shouldStopMatching = matchWildcards( pTopicName,
  665. topicNameLength,
  666. pTopicFilter,
  667. topicFilterLength,
  668. &nameIndex,
  669. &filterIndex,
  670. &matchFound );
  671. }
  672. if( ( matchFound == true ) || ( shouldStopMatching == true ) )
  673. {
  674. break;
  675. }
  676. /* Increment indexes. */
  677. nameIndex++;
  678. filterIndex++;
  679. }
  680. if( matchFound == false )
  681. {
  682. /* If the end of both strings has been reached, they match. This represents the
  683. * case when the topic filter contains the '+' wildcard at a non-starting position.
  684. * For example, when matching either of "sport/+/player" OR "sport/hockey/+" topic
  685. * filters with "sport/hockey/player" topic name. */
  686. matchFound = ( nameIndex == topicNameLength ) &&
  687. ( filterIndex == topicFilterLength );
  688. }
  689. return matchFound;
  690. }
  691. /*-----------------------------------------------------------*/
  692. static int32_t sendMessageVector( MQTTContext_t * pContext,
  693. TransportOutVector_t * pIoVec,
  694. size_t ioVecCount )
  695. {
  696. int32_t sendResult;
  697. uint32_t startTime;
  698. TransportOutVector_t * pIoVectIterator;
  699. size_t vectorsToBeSent = ioVecCount;
  700. size_t bytesToSend = 0U;
  701. int32_t bytesSentOrError = 0;
  702. assert( pContext != NULL );
  703. assert( pIoVec != NULL );
  704. assert( pContext->getTime != NULL );
  705. /* Send must always be defined */
  706. assert( pContext->transportInterface.send != NULL );
  707. /* Count the total number of bytes to be sent as outlined in the vector. */
  708. for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ )
  709. {
  710. bytesToSend += pIoVectIterator->iov_len;
  711. }
  712. /* Reset the iterator to point to the first entry in the array. */
  713. pIoVectIterator = pIoVec;
  714. /* Note the start time. */
  715. startTime = pContext->getTime();
  716. while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
  717. {
  718. if( pContext->transportInterface.writev != NULL )
  719. {
  720. sendResult = pContext->transportInterface.writev( pContext->transportInterface.pNetworkContext,
  721. pIoVectIterator,
  722. vectorsToBeSent );
  723. }
  724. else
  725. {
  726. sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
  727. pIoVectIterator->iov_base,
  728. pIoVectIterator->iov_len );
  729. }
  730. if( sendResult > 0 )
  731. {
  732. /* It is a bug in the application's transport send implementation if
  733. * more bytes than expected are sent. */
  734. assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );
  735. bytesSentOrError += sendResult;
  736. /* Set last transmission time. */
  737. pContext->lastPacketTxTime = pContext->getTime();
  738. LogDebug( ( "sendMessageVector: Bytes Sent=%ld, Bytes Remaining=%lu",
  739. ( long int ) sendResult,
  740. ( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
  741. }
  742. else if( sendResult < 0 )
  743. {
  744. bytesSentOrError = sendResult;
  745. LogError( ( "sendMessageVector: Unable to send packet: Network Error." ) );
  746. }
  747. else
  748. {
  749. /* MISRA Empty body */
  750. }
  751. /* Check for timeout. */
  752. if( calculateElapsedTime( pContext->getTime(), startTime ) > MQTT_SEND_TIMEOUT_MS )
  753. {
  754. LogError( ( "sendMessageVector: Unable to send packet: Timed out." ) );
  755. break;
  756. }
  757. /* Update the send pointer to the correct vector and offset. */
  758. while( ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) &&
  759. ( sendResult >= ( int32_t ) pIoVectIterator->iov_len ) )
  760. {
  761. sendResult -= ( int32_t ) pIoVectIterator->iov_len;
  762. pIoVectIterator++;
  763. /* Update the number of vector which are yet to be sent. */
  764. vectorsToBeSent--;
  765. }
  766. /* Some of the bytes from this vector were sent as well, update the length
  767. * and the pointer to data in this vector. */
  768. if( ( sendResult > 0 ) &&
  769. ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) )
  770. {
  771. pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ sendResult ] );
  772. pIoVectIterator->iov_len -= ( size_t ) sendResult;
  773. }
  774. }
  775. return bytesSentOrError;
  776. }
  777. static int32_t sendBuffer( MQTTContext_t * pContext,
  778. const uint8_t * pBufferToSend,
  779. size_t bytesToSend )
  780. {
  781. int32_t sendResult;
  782. uint32_t timeoutMs;
  783. int32_t bytesSentOrError = 0;
  784. const uint8_t * pIndex = pBufferToSend;
  785. assert( pContext != NULL );
  786. assert( pContext->getTime != NULL );
  787. assert( pContext->transportInterface.send != NULL );
  788. assert( pIndex != NULL );
  789. /* Set the timeout. */
  790. timeoutMs = pContext->getTime() + MQTT_SEND_TIMEOUT_MS;
  791. while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
  792. {
  793. sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
  794. pIndex,
  795. bytesToSend - ( size_t ) bytesSentOrError );
  796. if( sendResult > 0 )
  797. {
  798. /* It is a bug in the application's transport send implementation if
  799. * more bytes than expected are sent. */
  800. assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );
  801. bytesSentOrError += sendResult;
  802. pIndex = &pIndex[ sendResult ];
  803. /* Set last transmission time. */
  804. pContext->lastPacketTxTime = pContext->getTime();
  805. LogDebug( ( "sendBuffer: Bytes Sent=%ld, Bytes Remaining=%lu",
  806. ( long int ) sendResult,
  807. ( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
  808. }
  809. else if( sendResult < 0 )
  810. {
  811. bytesSentOrError = sendResult;
  812. LogError( ( "sendBuffer: Unable to send packet: Network Error." ) );
  813. }
  814. else
  815. {
  816. /* MISRA Empty body */
  817. }
  818. /* Check for timeout. */
  819. if( pContext->getTime() >= timeoutMs )
  820. {
  821. LogError( ( "sendBuffer: Unable to send packet: Timed out." ) );
  822. break;
  823. }
  824. }
  825. return bytesSentOrError;
  826. }
  827. /*-----------------------------------------------------------*/
  828. static uint32_t calculateElapsedTime( uint32_t later,
  829. uint32_t start )
  830. {
  831. return later - start;
  832. }
  833. /*-----------------------------------------------------------*/
  834. static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType )
  835. {
  836. MQTTPubAckType_t ackType = MQTTPuback;
  837. switch( packetType )
  838. {
  839. case MQTT_PACKET_TYPE_PUBACK:
  840. ackType = MQTTPuback;
  841. break;
  842. case MQTT_PACKET_TYPE_PUBREC:
  843. ackType = MQTTPubrec;
  844. break;
  845. case MQTT_PACKET_TYPE_PUBREL:
  846. ackType = MQTTPubrel;
  847. break;
  848. case MQTT_PACKET_TYPE_PUBCOMP:
  849. default:
  850. /* This function is only called after checking the type is one of
  851. * the above four values, so packet type must be PUBCOMP here. */
  852. assert( packetType == MQTT_PACKET_TYPE_PUBCOMP );
  853. ackType = MQTTPubcomp;
  854. break;
  855. }
  856. return ackType;
  857. }
  858. /*-----------------------------------------------------------*/
  859. static int32_t recvExact( const MQTTContext_t * pContext,
  860. size_t bytesToRecv )
  861. {
  862. uint8_t * pIndex = NULL;
  863. size_t bytesRemaining = bytesToRecv;
  864. int32_t totalBytesRecvd = 0, bytesRecvd;
  865. uint32_t lastDataRecvTimeMs = 0U, timeSinceLastRecvMs = 0U;
  866. TransportRecv_t recvFunc = NULL;
  867. MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
  868. bool receiveError = false;
  869. assert( pContext != NULL );
  870. assert( bytesToRecv <= pContext->networkBuffer.size );
  871. assert( pContext->getTime != NULL );
  872. assert( pContext->transportInterface.recv != NULL );
  873. assert( pContext->networkBuffer.pBuffer != NULL );
  874. pIndex = pContext->networkBuffer.pBuffer;
  875. recvFunc = pContext->transportInterface.recv;
  876. getTimeStampMs = pContext->getTime;
  877. /* Part of the MQTT packet has been read before calling this function. */
  878. lastDataRecvTimeMs = getTimeStampMs();
  879. while( ( bytesRemaining > 0U ) && ( receiveError == false ) )
  880. {
  881. bytesRecvd = recvFunc( pContext->transportInterface.pNetworkContext,
  882. pIndex,
  883. bytesRemaining );
  884. if( bytesRecvd < 0 )
  885. {
  886. LogError( ( "Network error while receiving packet: ReturnCode=%ld.",
  887. ( long int ) bytesRecvd ) );
  888. totalBytesRecvd = bytesRecvd;
  889. receiveError = true;
  890. }
  891. else if( bytesRecvd > 0 )
  892. {
  893. /* Reset the starting time as we have received some data from the network. */
  894. lastDataRecvTimeMs = getTimeStampMs();
  895. /* It is a bug in the application's transport receive implementation
  896. * if more bytes than expected are received. To avoid a possible
  897. * overflow in converting bytesRemaining from unsigned to signed,
  898. * this assert must exist after the check for bytesRecvd being
  899. * negative. */
  900. assert( ( size_t ) bytesRecvd <= bytesRemaining );
  901. bytesRemaining -= ( size_t ) bytesRecvd;
  902. totalBytesRecvd += ( int32_t ) bytesRecvd;
  903. /* Increment the index. */
  904. pIndex = &pIndex[ bytesRecvd ];
  905. LogDebug( ( "BytesReceived=%ld, BytesRemaining=%lu, TotalBytesReceived=%ld.",
  906. ( long int ) bytesRecvd,
  907. ( unsigned long ) bytesRemaining,
  908. ( long int ) totalBytesRecvd ) );
  909. }
  910. else
  911. {
  912. /* No bytes were read from the network. */
  913. timeSinceLastRecvMs = calculateElapsedTime( getTimeStampMs(), lastDataRecvTimeMs );
  914. /* Check for timeout if we have been waiting to receive any byte on the network. */
  915. if( timeSinceLastRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS )
  916. {
  917. LogError( ( "Unable to receive packet: Timed out in transport recv." ) );
  918. receiveError = true;
  919. }
  920. }
  921. }
  922. return totalBytesRecvd;
  923. }
  924. /*-----------------------------------------------------------*/
  925. static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
  926. size_t remainingLength,
  927. uint32_t timeoutMs )
  928. {
  929. MQTTStatus_t status = MQTTRecvFailed;
  930. int32_t bytesReceived = 0;
  931. size_t bytesToReceive = 0U;
  932. uint32_t totalBytesReceived = 0U;
  933. uint32_t entryTimeMs = 0U;
  934. uint32_t elapsedTimeMs = 0U;
  935. MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
  936. bool receiveError = false;
  937. assert( pContext != NULL );
  938. assert( pContext->getTime != NULL );
  939. bytesToReceive = pContext->networkBuffer.size;
  940. getTimeStampMs = pContext->getTime;
  941. entryTimeMs = getTimeStampMs();
  942. while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) )
  943. {
  944. if( ( remainingLength - totalBytesReceived ) < bytesToReceive )
  945. {
  946. bytesToReceive = remainingLength - totalBytesReceived;
  947. }
  948. bytesReceived = recvExact( pContext, bytesToReceive );
  949. if( bytesReceived != ( int32_t ) bytesToReceive )
  950. {
  951. LogError( ( "Receive error while discarding packet."
  952. "ReceivedBytes=%ld, ExpectedBytes=%lu.",
  953. ( long int ) bytesReceived,
  954. ( unsigned long ) bytesToReceive ) );
  955. receiveError = true;
  956. }
  957. else
  958. {
  959. totalBytesReceived += ( uint32_t ) bytesReceived;
  960. elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
  961. /* Check for timeout. */
  962. if( elapsedTimeMs >= timeoutMs )
  963. {
  964. LogError( ( "Time expired while discarding packet." ) );
  965. receiveError = true;
  966. }
  967. }
  968. }
  969. if( totalBytesReceived == remainingLength )
  970. {
  971. LogError( ( "Dumped packet. DumpedBytes=%lu.",
  972. ( unsigned long ) totalBytesReceived ) );
  973. /* Packet dumped, so no data is available. */
  974. status = MQTTNoDataAvailable;
  975. }
  976. return status;
  977. }
  978. /*-----------------------------------------------------------*/
  979. static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext,
  980. const MQTTPacketInfo_t * pPacketInfo )
  981. {
  982. MQTTStatus_t status = MQTTRecvFailed;
  983. int32_t bytesReceived = 0;
  984. size_t bytesToReceive = 0U;
  985. uint32_t totalBytesReceived = 0U;
  986. bool receiveError = false;
  987. size_t mqttPacketSize = 0;
  988. size_t remainingLength;
  989. assert( pContext != NULL );
  990. assert( pPacketInfo != NULL );
  991. mqttPacketSize = pPacketInfo->remainingLength + pPacketInfo->headerLength;
  992. /* Assert that the packet being discarded is bigger than the
  993. * receive buffer. */
  994. assert( mqttPacketSize > pContext->networkBuffer.size );
  995. /* Discard these many bytes at a time. */
  996. bytesToReceive = pContext->networkBuffer.size;
  997. /* Number of bytes depicted by 'index' have already been received. */
  998. remainingLength = mqttPacketSize - pContext->index;
  999. while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) )
  1000. {
  1001. if( ( remainingLength - totalBytesReceived ) < bytesToReceive )
  1002. {
  1003. bytesToReceive = remainingLength - totalBytesReceived;
  1004. }
  1005. bytesReceived = recvExact( pContext, bytesToReceive );
  1006. if( bytesReceived != ( int32_t ) bytesToReceive )
  1007. {
  1008. LogError( ( "Receive error while discarding packet."
  1009. "ReceivedBytes=%ld, ExpectedBytes=%lu.",
  1010. ( long int ) bytesReceived,
  1011. ( unsigned long ) bytesToReceive ) );
  1012. receiveError = true;
  1013. }
  1014. else
  1015. {
  1016. totalBytesReceived += ( uint32_t ) bytesReceived;
  1017. }
  1018. }
  1019. if( totalBytesReceived == remainingLength )
  1020. {
  1021. LogError( ( "Dumped packet. DumpedBytes=%lu.",
  1022. ( unsigned long ) totalBytesReceived ) );
  1023. /* Packet dumped, so no data is available. */
  1024. status = MQTTNoDataAvailable;
  1025. }
  1026. /* Clear the buffer */
  1027. ( void ) memset( pContext->networkBuffer.pBuffer,
  1028. 0,
  1029. pContext->networkBuffer.size );
  1030. /* Reset the index. */
  1031. pContext->index = 0;
  1032. return status;
  1033. }
  1034. /*-----------------------------------------------------------*/
  1035. static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
  1036. MQTTPacketInfo_t incomingPacket,
  1037. uint32_t remainingTimeMs )
  1038. {
  1039. MQTTStatus_t status = MQTTSuccess;
  1040. int32_t bytesReceived = 0;
  1041. size_t bytesToReceive = 0U;
  1042. assert( pContext != NULL );
  1043. assert( pContext->networkBuffer.pBuffer != NULL );
  1044. if( incomingPacket.remainingLength > pContext->networkBuffer.size )
  1045. {
  1046. LogError( ( "Incoming packet will be dumped: "
  1047. "Packet length exceeds network buffer size."
  1048. "PacketSize=%lu, NetworkBufferSize=%lu.",
  1049. ( unsigned long ) incomingPacket.remainingLength,
  1050. ( unsigned long ) pContext->networkBuffer.size ) );
  1051. status = discardPacket( pContext,
  1052. incomingPacket.remainingLength,
  1053. remainingTimeMs );
  1054. }
  1055. else
  1056. {
  1057. bytesToReceive = incomingPacket.remainingLength;
  1058. bytesReceived = recvExact( pContext, bytesToReceive );
  1059. if( bytesReceived == ( int32_t ) bytesToReceive )
  1060. {
  1061. /* Receive successful, bytesReceived == bytesToReceive. */
  1062. LogDebug( ( "Packet received. ReceivedBytes=%ld.",
  1063. ( long int ) bytesReceived ) );
  1064. }
  1065. else
  1066. {
  1067. LogError( ( "Packet reception failed. ReceivedBytes=%ld, "
  1068. "ExpectedBytes=%lu.",
  1069. ( long int ) bytesReceived,
  1070. ( unsigned long ) bytesToReceive ) );
  1071. status = MQTTRecvFailed;
  1072. }
  1073. }
  1074. return status;
  1075. }
  1076. /*-----------------------------------------------------------*/
  1077. static uint8_t getAckTypeToSend( MQTTPublishState_t state )
  1078. {
  1079. uint8_t packetTypeByte = 0U;
  1080. switch( state )
  1081. {
  1082. case MQTTPubAckSend:
  1083. packetTypeByte = MQTT_PACKET_TYPE_PUBACK;
  1084. break;
  1085. case MQTTPubRecSend:
  1086. packetTypeByte = MQTT_PACKET_TYPE_PUBREC;
  1087. break;
  1088. case MQTTPubRelSend:
  1089. packetTypeByte = MQTT_PACKET_TYPE_PUBREL;
  1090. break;
  1091. case MQTTPubCompSend:
  1092. packetTypeByte = MQTT_PACKET_TYPE_PUBCOMP;
  1093. break;
  1094. case MQTTPubAckPending:
  1095. case MQTTPubCompPending:
  1096. case MQTTPubRecPending:
  1097. case MQTTPubRelPending:
  1098. case MQTTPublishDone:
  1099. case MQTTPublishSend:
  1100. case MQTTStateNull:
  1101. default:
  1102. /* Take no action for states that do not require sending an ack. */
  1103. break;
  1104. }
  1105. return packetTypeByte;
  1106. }
  1107. /*-----------------------------------------------------------*/
  1108. static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
  1109. uint16_t packetId,
  1110. MQTTPublishState_t publishState )
  1111. {
  1112. MQTTStatus_t status = MQTTSuccess;
  1113. MQTTPublishState_t newState = MQTTStateNull;
  1114. int32_t sendResult = 0;
  1115. uint8_t packetTypeByte = 0U;
  1116. MQTTPubAckType_t packetType;
  1117. MQTTFixedBuffer_t localBuffer;
  1118. uint8_t pubAckPacket[ MQTT_PUBLISH_ACK_PACKET_SIZE ];
  1119. localBuffer.pBuffer = pubAckPacket;
  1120. localBuffer.size = MQTT_PUBLISH_ACK_PACKET_SIZE;
  1121. assert( pContext != NULL );
  1122. packetTypeByte = getAckTypeToSend( publishState );
  1123. if( packetTypeByte != 0U )
  1124. {
  1125. packetType = getAckFromPacketType( packetTypeByte );
  1126. status = MQTT_SerializeAck( &localBuffer,
  1127. packetTypeByte,
  1128. packetId );
  1129. if( status == MQTTSuccess )
  1130. {
  1131. MQTT_PRE_SEND_HOOK( pContext );
  1132. /* Here, we are not using the vector approach for efficiency. There is just one buffer
  1133. * to be sent which can be achieved with a normal send call. */
  1134. sendResult = sendBuffer( pContext,
  1135. localBuffer.pBuffer,
  1136. MQTT_PUBLISH_ACK_PACKET_SIZE );
  1137. MQTT_POST_SEND_HOOK( pContext );
  1138. }
  1139. if( sendResult == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
  1140. {
  1141. pContext->controlPacketSent = true;
  1142. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  1143. status = MQTT_UpdateStateAck( pContext,
  1144. packetId,
  1145. packetType,
  1146. MQTT_SEND,
  1147. &newState );
  1148. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  1149. if( status != MQTTSuccess )
  1150. {
  1151. LogError( ( "Failed to update state of publish %hu.",
  1152. ( unsigned short ) packetId ) );
  1153. }
  1154. }
  1155. else
  1156. {
  1157. LogError( ( "Failed to send ACK packet: PacketType=%02x, SentBytes=%ld, "
  1158. "PacketSize=%lu.",
  1159. ( unsigned int ) packetTypeByte, ( long int ) sendResult,
  1160. MQTT_PUBLISH_ACK_PACKET_SIZE ) );
  1161. status = MQTTSendFailed;
  1162. }
  1163. }
  1164. return status;
  1165. }
  1166. /*-----------------------------------------------------------*/
  1167. static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext )
  1168. {
  1169. MQTTStatus_t status = MQTTSuccess;
  1170. uint32_t now = 0U;
  1171. uint32_t packetTxTimeoutMs = 0U;
  1172. uint32_t lastPacketTxTime = 0U;
  1173. assert( pContext != NULL );
  1174. assert( pContext->getTime != NULL );
  1175. now = pContext->getTime();
  1176. packetTxTimeoutMs = 1000U * ( uint32_t ) pContext->keepAliveIntervalSec;
  1177. if( PACKET_TX_TIMEOUT_MS < packetTxTimeoutMs )
  1178. {
  1179. packetTxTimeoutMs = PACKET_TX_TIMEOUT_MS;
  1180. }
  1181. /* If keep alive interval is 0, it is disabled. */
  1182. if( pContext->waitingForPingResp == true )
  1183. {
  1184. /* Has time expired? */
  1185. if( calculateElapsedTime( now, pContext->pingReqSendTimeMs ) >
  1186. MQTT_PINGRESP_TIMEOUT_MS )
  1187. {
  1188. status = MQTTKeepAliveTimeout;
  1189. }
  1190. }
  1191. else
  1192. {
  1193. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  1194. lastPacketTxTime = pContext->lastPacketTxTime;
  1195. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  1196. if( ( packetTxTimeoutMs != 0U ) && ( calculateElapsedTime( now, lastPacketTxTime ) >= packetTxTimeoutMs ) )
  1197. {
  1198. status = MQTT_Ping( pContext );
  1199. }
  1200. else
  1201. {
  1202. const uint32_t timeElapsed = calculateElapsedTime( now, pContext->lastPacketRxTime );
  1203. if( ( timeElapsed != 0U ) && ( timeElapsed >= PACKET_RX_TIMEOUT_MS ) )
  1204. {
  1205. status = MQTT_Ping( pContext );
  1206. }
  1207. }
  1208. }
  1209. return status;
  1210. }
  1211. /*-----------------------------------------------------------*/
  1212. static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
  1213. MQTTPacketInfo_t * pIncomingPacket )
  1214. {
  1215. MQTTStatus_t status = MQTTBadParameter;
  1216. MQTTPublishState_t publishRecordState = MQTTStateNull;
  1217. uint16_t packetIdentifier = 0U;
  1218. MQTTPublishInfo_t publishInfo;
  1219. MQTTDeserializedInfo_t deserializedInfo;
  1220. bool duplicatePublish = false;
  1221. assert( pContext != NULL );
  1222. assert( pIncomingPacket != NULL );
  1223. assert( pContext->appCallback != NULL );
  1224. status = MQTT_DeserializePublish( pIncomingPacket, &packetIdentifier, &publishInfo );
  1225. LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%s.",
  1226. MQTT_Status_strerror( status ) ) );
  1227. if( ( status == MQTTSuccess ) &&
  1228. ( pContext->incomingPublishRecords == NULL ) &&
  1229. ( publishInfo.qos > MQTTQoS0 ) )
  1230. {
  1231. LogError( ( "Incoming publish has QoS > MQTTQoS0 but incoming "
  1232. "publish records have not been initialized. Dropping the "
  1233. "incoming publish. Please call MQTT_InitStatefulQoS to enable "
  1234. "use of QoS1 and QoS2 publishes." ) );
  1235. status = MQTTRecvFailed;
  1236. }
  1237. if( status == MQTTSuccess )
  1238. {
  1239. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  1240. status = MQTT_UpdateStatePublish( pContext,
  1241. packetIdentifier,
  1242. MQTT_RECEIVE,
  1243. publishInfo.qos,
  1244. &publishRecordState );
  1245. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  1246. if( status == MQTTSuccess )
  1247. {
  1248. LogInfo( ( "State record updated. New state=%s.",
  1249. MQTT_State_strerror( publishRecordState ) ) );
  1250. }
  1251. /* Different cases in which an incoming publish with duplicate flag is
  1252. * handled are as listed below.
  1253. * 1. No collision - This is the first instance of the incoming publish
  1254. * packet received or an earlier received packet state is lost. This
  1255. * will be handled as a new incoming publish for both QoS1 and QoS2
  1256. * publishes.
  1257. * 2. Collision - The incoming packet was received before and a state
  1258. * record is present in the state engine. For QoS1 and QoS2 publishes
  1259. * this case can happen at 2 different cases and handling is
  1260. * different.
  1261. * a. QoS1 - If a PUBACK is not successfully sent for the incoming
  1262. * publish due to a connection issue, it can result in broker
  1263. * sending out a duplicate publish with dup flag set, when a
  1264. * session is reestablished. It can result in a collision in
  1265. * state engine. This will be handled by processing the incoming
  1266. * publish as a new publish ignoring the
  1267. * #MQTTStateCollision status from the state engine. The publish
  1268. * data is not passed to the application.
  1269. * b. QoS2 - If a PUBREC is not successfully sent for the incoming
  1270. * publish or the PUBREC sent is not successfully received by the
  1271. * broker due to a connection issue, it can result in broker
  1272. * sending out a duplicate publish with dup flag set, when a
  1273. * session is reestablished. It can result in a collision in
  1274. * state engine. This will be handled by ignoring the
  1275. * #MQTTStateCollision status from the state engine. The publish
  1276. * data is not passed to the application. */
  1277. else if( status == MQTTStateCollision )
  1278. {
  1279. status = MQTTSuccess;
  1280. duplicatePublish = true;
  1281. /* Calculate the state for the ack packet that needs to be sent out
  1282. * for the duplicate incoming publish. */
  1283. publishRecordState = MQTT_CalculateStatePublish( MQTT_RECEIVE,
  1284. publishInfo.qos );
  1285. LogDebug( ( "Incoming publish packet with packet id %hu already exists.",
  1286. ( unsigned short ) packetIdentifier ) );
  1287. if( publishInfo.dup == false )
  1288. {
  1289. LogError( ( "DUP flag is 0 for duplicate packet (MQTT-3.3.1.-1)." ) );
  1290. }
  1291. }
  1292. else
  1293. {
  1294. LogError( ( "Error in updating publish state for incoming publish with packet id %hu."
  1295. " Error is %s",
  1296. ( unsigned short ) packetIdentifier,
  1297. MQTT_Status_strerror( status ) ) );
  1298. }
  1299. }
  1300. if( status == MQTTSuccess )
  1301. {
  1302. /* Set fields of deserialized struct. */
  1303. deserializedInfo.packetIdentifier = packetIdentifier;
  1304. deserializedInfo.pPublishInfo = &publishInfo;
  1305. deserializedInfo.deserializationResult = status;
  1306. /* Invoke application callback to hand the buffer over to application
  1307. * before sending acks.
  1308. * Application callback will be invoked for all publishes, except for
  1309. * duplicate incoming publishes. */
  1310. if( duplicatePublish == false )
  1311. {
  1312. pContext->appCallback( pContext,
  1313. pIncomingPacket,
  1314. &deserializedInfo );
  1315. }
  1316. /* Send PUBACK or PUBREC if necessary. */
  1317. status = sendPublishAcks( pContext,
  1318. packetIdentifier,
  1319. publishRecordState );
  1320. }
  1321. return status;
  1322. }
  1323. /*-----------------------------------------------------------*/
  1324. static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
  1325. MQTTPacketInfo_t * pIncomingPacket )
  1326. {
  1327. MQTTStatus_t status = MQTTBadResponse;
  1328. MQTTPublishState_t publishRecordState = MQTTStateNull;
  1329. uint16_t packetIdentifier;
  1330. MQTTPubAckType_t ackType;
  1331. MQTTEventCallback_t appCallback;
  1332. MQTTDeserializedInfo_t deserializedInfo;
  1333. assert( pContext != NULL );
  1334. assert( pIncomingPacket != NULL );
  1335. assert( pContext->appCallback != NULL );
  1336. appCallback = pContext->appCallback;
  1337. ackType = getAckFromPacketType( pIncomingPacket->type );
  1338. status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
  1339. LogInfo( ( "Ack packet deserialized with result: %s.",
  1340. MQTT_Status_strerror( status ) ) );
  1341. if( status == MQTTSuccess )
  1342. {
  1343. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  1344. status = MQTT_UpdateStateAck( pContext,
  1345. packetIdentifier,
  1346. ackType,
  1347. MQTT_RECEIVE,
  1348. &publishRecordState );
  1349. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  1350. if( status == MQTTSuccess )
  1351. {
  1352. LogInfo( ( "State record updated. New state=%s.",
  1353. MQTT_State_strerror( publishRecordState ) ) );
  1354. }
  1355. else
  1356. {
  1357. LogError( ( "Updating the state engine for packet id %hu"
  1358. " failed with error %s.",
  1359. ( unsigned short ) packetIdentifier,
  1360. MQTT_Status_strerror( status ) ) );
  1361. }
  1362. }
  1363. if( status == MQTTSuccess )
  1364. {
  1365. /* Set fields of deserialized struct. */
  1366. deserializedInfo.packetIdentifier = packetIdentifier;
  1367. deserializedInfo.deserializationResult = status;
  1368. deserializedInfo.pPublishInfo = NULL;
  1369. /* Invoke application callback to hand the buffer over to application
  1370. * before sending acks. */
  1371. appCallback( pContext, pIncomingPacket, &deserializedInfo );
  1372. /* Send PUBREL or PUBCOMP if necessary. */
  1373. status = sendPublishAcks( pContext,
  1374. packetIdentifier,
  1375. publishRecordState );
  1376. }
  1377. return status;
  1378. }
  1379. /*-----------------------------------------------------------*/
  1380. static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
  1381. MQTTPacketInfo_t * pIncomingPacket,
  1382. bool manageKeepAlive )
  1383. {
  1384. MQTTStatus_t status = MQTTBadResponse;
  1385. uint16_t packetIdentifier = MQTT_PACKET_ID_INVALID;
  1386. MQTTDeserializedInfo_t deserializedInfo;
  1387. /* We should always invoke the app callback unless we receive a PINGRESP
  1388. * and are managing keep alive, or if we receive an unknown packet. We
  1389. * initialize this to false since the callback must be invoked before
  1390. * sending any PUBREL or PUBCOMP. However, for other cases, we invoke it
  1391. * at the end to reduce the complexity of this function. */
  1392. bool invokeAppCallback = false;
  1393. MQTTEventCallback_t appCallback = NULL;
  1394. assert( pContext != NULL );
  1395. assert( pIncomingPacket != NULL );
  1396. assert( pContext->appCallback != NULL );
  1397. appCallback = pContext->appCallback;
  1398. LogDebug( ( "Received packet of type %02x.",
  1399. ( unsigned int ) pIncomingPacket->type ) );
  1400. switch( pIncomingPacket->type )
  1401. {
  1402. case MQTT_PACKET_TYPE_PUBACK:
  1403. case MQTT_PACKET_TYPE_PUBREC:
  1404. case MQTT_PACKET_TYPE_PUBREL:
  1405. case MQTT_PACKET_TYPE_PUBCOMP:
  1406. /* Handle all the publish acks. The app callback is invoked here. */
  1407. status = handlePublishAcks( pContext, pIncomingPacket );
  1408. break;
  1409. case MQTT_PACKET_TYPE_PINGRESP:
  1410. status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
  1411. invokeAppCallback = ( status == MQTTSuccess ) && !manageKeepAlive;
  1412. if( ( status == MQTTSuccess ) && ( manageKeepAlive == true ) )
  1413. {
  1414. pContext->waitingForPingResp = false;
  1415. }
  1416. break;
  1417. case MQTT_PACKET_TYPE_SUBACK:
  1418. case MQTT_PACKET_TYPE_UNSUBACK:
  1419. /* Deserialize and give these to the app provided callback. */
  1420. status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
  1421. invokeAppCallback = ( status == MQTTSuccess ) || ( status == MQTTServerRefused );
  1422. break;
  1423. default:
  1424. /* Bad response from the server. */
  1425. LogError( ( "Unexpected packet type from server: PacketType=%02x.",
  1426. ( unsigned int ) pIncomingPacket->type ) );
  1427. status = MQTTBadResponse;
  1428. break;
  1429. }
  1430. if( invokeAppCallback == true )
  1431. {
  1432. /* Set fields of deserialized struct. */
  1433. deserializedInfo.packetIdentifier = packetIdentifier;
  1434. deserializedInfo.deserializationResult = status;
  1435. deserializedInfo.pPublishInfo = NULL;
  1436. appCallback( pContext, pIncomingPacket, &deserializedInfo );
  1437. /* In case a SUBACK indicated refusal, reset the status to continue the loop. */
  1438. status = MQTTSuccess;
  1439. }
  1440. return status;
  1441. }
  1442. /*-----------------------------------------------------------*/
  1443. static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
  1444. bool manageKeepAlive )
  1445. {
  1446. MQTTStatus_t status = MQTTSuccess;
  1447. MQTTPacketInfo_t incomingPacket = { 0 };
  1448. int32_t recvBytes;
  1449. size_t totalMQTTPacketLength = 0;
  1450. assert( pContext != NULL );
  1451. assert( pContext->networkBuffer.pBuffer != NULL );
  1452. /* Read as many bytes as possible into the network buffer. */
  1453. recvBytes = pContext->transportInterface.recv( pContext->transportInterface.pNetworkContext,
  1454. &( pContext->networkBuffer.pBuffer[ pContext->index ] ),
  1455. pContext->networkBuffer.size - pContext->index );
  1456. if( recvBytes < 0 )
  1457. {
  1458. /* The receive function has failed. Bubble up the error up to the user. */
  1459. status = MQTTRecvFailed;
  1460. }
  1461. else if( ( recvBytes == 0 ) && ( pContext->index == 0U ) )
  1462. {
  1463. /* No more bytes available since the last read and neither is anything in
  1464. * the buffer. */
  1465. status = MQTTNoDataAvailable;
  1466. }
  1467. /* Either something was received, or there is still data to be processed in the
  1468. * buffer, or both. */
  1469. else
  1470. {
  1471. /* Update the number of bytes in the MQTT fixed buffer. */
  1472. pContext->index += ( size_t ) recvBytes;
  1473. status = MQTT_ProcessIncomingPacketTypeAndLength( pContext->networkBuffer.pBuffer,
  1474. &( pContext->index ),
  1475. &incomingPacket );
  1476. totalMQTTPacketLength = incomingPacket.remainingLength + incomingPacket.headerLength;
  1477. }
  1478. /* No data was received, check for keep alive timeout. */
  1479. if( recvBytes == 0 )
  1480. {
  1481. if( manageKeepAlive == true )
  1482. {
  1483. /* Keep the copy of the status to be reset later. */
  1484. MQTTStatus_t statusCopy = status;
  1485. /* Assign status so an error can be bubbled up to application,
  1486. * but reset it on success. */
  1487. status = handleKeepAlive( pContext );
  1488. if( status == MQTTSuccess )
  1489. {
  1490. /* Reset the status. */
  1491. status = statusCopy;
  1492. }
  1493. else
  1494. {
  1495. LogError( ( "Handling of keep alive failed. Status=%s",
  1496. MQTT_Status_strerror( status ) ) );
  1497. }
  1498. }
  1499. }
  1500. /* Check whether there is data available before processing the packet further. */
  1501. if( ( status == MQTTNeedMoreBytes ) || ( status == MQTTNoDataAvailable ) )
  1502. {
  1503. /* Do nothing as there is nothing to be processed right now. The proper
  1504. * error code will be bubbled up to the user. */
  1505. }
  1506. /* Any other error code. */
  1507. else if( status != MQTTSuccess )
  1508. {
  1509. LogError( ( "Call to receiveSingleIteration failed. Status=%s",
  1510. MQTT_Status_strerror( status ) ) );
  1511. }
  1512. /* If the MQTT Packet size is bigger than the buffer itself. */
  1513. else if( totalMQTTPacketLength > pContext->networkBuffer.size )
  1514. {
  1515. /* Discard the packet from the receive buffer and drain the pending
  1516. * data from the socket buffer. */
  1517. status = discardStoredPacket( pContext,
  1518. &incomingPacket );
  1519. }
  1520. /* If the total packet is of more length than the bytes we have available. */
  1521. else if( totalMQTTPacketLength > pContext->index )
  1522. {
  1523. status = MQTTNeedMoreBytes;
  1524. }
  1525. else
  1526. {
  1527. /* MISRA else. */
  1528. }
  1529. /* Handle received packet. If incomplete data was read then this will not execute. */
  1530. if( status == MQTTSuccess )
  1531. {
  1532. incomingPacket.pRemainingData = &pContext->networkBuffer.pBuffer[ incomingPacket.headerLength ];
  1533. /* PUBLISH packets allow flags in the lower four bits. For other
  1534. * packet types, they are reserved. */
  1535. if( ( incomingPacket.type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
  1536. {
  1537. status = handleIncomingPublish( pContext, &incomingPacket );
  1538. }
  1539. else
  1540. {
  1541. status = handleIncomingAck( pContext, &incomingPacket, manageKeepAlive );
  1542. }
  1543. /* Update the index to reflect the remaining bytes in the buffer. */
  1544. pContext->index -= totalMQTTPacketLength;
  1545. /* Move the remaining bytes to the front of the buffer. */
  1546. ( void ) memmove( pContext->networkBuffer.pBuffer,
  1547. &( pContext->networkBuffer.pBuffer[ totalMQTTPacketLength ] ),
  1548. pContext->index );
  1549. if( status == MQTTSuccess )
  1550. {
  1551. pContext->lastPacketRxTime = pContext->getTime();
  1552. }
  1553. }
  1554. if( status == MQTTNoDataAvailable )
  1555. {
  1556. /* No data available is not an error. Reset to MQTTSuccess so the
  1557. * return code will indicate success. */
  1558. status = MQTTSuccess;
  1559. }
  1560. return status;
  1561. }
  1562. /*-----------------------------------------------------------*/
  1563. static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
  1564. const MQTTSubscribeInfo_t * pSubscriptionList,
  1565. size_t subscriptionCount,
  1566. uint16_t packetId )
  1567. {
  1568. MQTTStatus_t status = MQTTSuccess;
  1569. size_t iterator;
  1570. /* Validate all the parameters. */
  1571. if( ( pContext == NULL ) || ( pSubscriptionList == NULL ) )
  1572. {
  1573. LogError( ( "Argument cannot be NULL: pContext=%p, "
  1574. "pSubscriptionList=%p.",
  1575. ( void * ) pContext,
  1576. ( void * ) pSubscriptionList ) );
  1577. status = MQTTBadParameter;
  1578. }
  1579. else if( subscriptionCount == 0UL )
  1580. {
  1581. LogError( ( "Subscription count is 0." ) );
  1582. status = MQTTBadParameter;
  1583. }
  1584. else if( packetId == 0U )
  1585. {
  1586. LogError( ( "Packet Id for subscription packet is 0." ) );
  1587. status = MQTTBadParameter;
  1588. }
  1589. else
  1590. {
  1591. if( pContext->incomingPublishRecords == NULL )
  1592. {
  1593. for( iterator = 0; iterator < subscriptionCount; iterator++ )
  1594. {
  1595. if( pSubscriptionList->qos > MQTTQoS0 )
  1596. {
  1597. LogError( ( "The incoming publish record list is not "
  1598. "initialised for QoS1/QoS2 records. Please call "
  1599. " MQTT_InitStatefulQoS to enable use of QoS1 and "
  1600. " QoS2 packets." ) );
  1601. status = MQTTBadParameter;
  1602. break;
  1603. }
  1604. }
  1605. }
  1606. }
  1607. return status;
  1608. }
  1609. /*-----------------------------------------------------------*/
  1610. static size_t addEncodedStringToVector( uint8_t serializedLength[ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ],
  1611. const char * const string,
  1612. uint16_t length,
  1613. TransportOutVector_t * iterator,
  1614. size_t * updatedLength )
  1615. {
  1616. size_t packetLength = 0U;
  1617. TransportOutVector_t * pLocalIterator = iterator;
  1618. size_t vectorsAdded = 0U;
  1619. /* When length is non-zero, the string must be non-NULL. */
  1620. assert( ( length != 0U ) ? ( string != NULL ) : true );
  1621. serializedLength[ 0 ] = ( ( uint8_t ) ( ( length ) >> 8 ) );
  1622. serializedLength[ 1 ] = ( ( uint8_t ) ( ( length ) & 0x00ffU ) );
  1623. /* Add the serialized length of the string first. */
  1624. pLocalIterator[ 0 ].iov_base = serializedLength;
  1625. pLocalIterator[ 0 ].iov_len = CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES;
  1626. vectorsAdded++;
  1627. packetLength = CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES;
  1628. /* Sometimes the string can be NULL that is, of 0 length. In that case,
  1629. * only the length field should be encoded in the vector. */
  1630. if( ( string != NULL ) && ( length != 0U ) )
  1631. {
  1632. /* Then add the pointer to the string itself. */
  1633. pLocalIterator[ 1 ].iov_base = string;
  1634. pLocalIterator[ 1 ].iov_len = length;
  1635. vectorsAdded++;
  1636. packetLength += length;
  1637. }
  1638. ( *updatedLength ) = ( *updatedLength ) + packetLength;
  1639. return vectorsAdded;
  1640. }
  1641. /*-----------------------------------------------------------*/
  1642. static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
  1643. const MQTTSubscribeInfo_t * pSubscriptionList,
  1644. size_t subscriptionCount,
  1645. uint16_t packetId,
  1646. size_t remainingLength )
  1647. {
  1648. MQTTStatus_t status = MQTTSuccess;
  1649. uint8_t * pIndex;
  1650. TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
  1651. TransportOutVector_t * pIterator;
  1652. uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ];
  1653. size_t totalPacketLength = 0U;
  1654. size_t ioVectorLength = 0U;
  1655. size_t subscriptionsSent = 0U;
  1656. size_t vectorsAdded;
  1657. size_t topicFieldLengthIndex;
  1658. /* Maximum number of bytes required by the 'fixed' part of the SUBSCRIBE
  1659. * packet header according to the MQTT specification.
  1660. * MQTT Control Byte 0 + 1 = 1
  1661. * Remaining length (max) + 4 = 5
  1662. * Packet ID + 2 = 7 */
  1663. uint8_t subscribeheader[ 7U ];
  1664. /* The vector array should be at least three element long as the topic
  1665. * string needs these many vector elements to be stored. */
  1666. assert( MQTT_SUB_UNSUB_MAX_VECTORS >= CORE_MQTT_SUBSCRIBE_PER_TOPIC_VECTOR_LENGTH );
  1667. pIndex = subscribeheader;
  1668. pIterator = pIoVector;
  1669. pIndex = MQTT_SerializeSubscribeHeader( remainingLength,
  1670. pIndex,
  1671. packetId );
  1672. /* The header is to be sent first. */
  1673. pIterator->iov_base = subscribeheader;
  1674. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
  1675. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
  1676. /* coverity[misra_c_2012_rule_18_2_violation] */
  1677. /* coverity[misra_c_2012_rule_10_8_violation] */
  1678. pIterator->iov_len = ( size_t ) ( pIndex - subscribeheader );
  1679. totalPacketLength += pIterator->iov_len;
  1680. pIterator++;
  1681. ioVectorLength++;
  1682. while( ( status == MQTTSuccess ) && ( subscriptionsSent < subscriptionCount ) )
  1683. {
  1684. /* Reset the index for next iteration. */
  1685. topicFieldLengthIndex = 0;
  1686. /* Check whether the subscription topic (with QoS) will fit in the
  1687. * given vector. */
  1688. while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - CORE_MQTT_SUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ) ) &&
  1689. ( subscriptionsSent < subscriptionCount ) )
  1690. {
  1691. /* The topic filter and the filter length gets sent next. */
  1692. vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ],
  1693. pSubscriptionList[ subscriptionsSent ].pTopicFilter,
  1694. pSubscriptionList[ subscriptionsSent ].topicFilterLength,
  1695. pIterator,
  1696. &totalPacketLength );
  1697. /* Update the pointer after the above operation. */
  1698. pIterator = &pIterator[ vectorsAdded ];
  1699. /* Lastly, the QoS gets sent. */
  1700. pIterator->iov_base = &( pSubscriptionList[ subscriptionsSent ].qos );
  1701. pIterator->iov_len = 1U;
  1702. totalPacketLength += pIterator->iov_len;
  1703. /* Increment the pointer. */
  1704. pIterator++;
  1705. /* Two slots get used by the topic string length and topic string.
  1706. * One slot gets used by the quality of service. */
  1707. ioVectorLength += vectorsAdded + 1U;
  1708. subscriptionsSent++;
  1709. /* The index needs to be updated for next iteration. */
  1710. topicFieldLengthIndex++;
  1711. }
  1712. if( sendMessageVector( pContext,
  1713. pIoVector,
  1714. ioVectorLength ) != ( int32_t ) totalPacketLength )
  1715. {
  1716. status = MQTTSendFailed;
  1717. }
  1718. /* Update the iterator for the next potential loop iteration. */
  1719. pIterator = pIoVector;
  1720. /* Reset the vector length for the next potential loop iteration. */
  1721. ioVectorLength = 0U;
  1722. /* Reset the packet length for the next potential loop iteration. */
  1723. totalPacketLength = 0U;
  1724. }
  1725. return status;
  1726. }
  1727. /*-----------------------------------------------------------*/
  1728. static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
  1729. const MQTTSubscribeInfo_t * pSubscriptionList,
  1730. size_t subscriptionCount,
  1731. uint16_t packetId,
  1732. size_t remainingLength )
  1733. {
  1734. MQTTStatus_t status = MQTTSuccess;
  1735. uint8_t * pIndex;
  1736. TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
  1737. TransportOutVector_t * pIterator;
  1738. uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ];
  1739. size_t totalPacketLength = 0U;
  1740. size_t unsubscriptionsSent = 0U;
  1741. size_t ioVectorLength = 0U;
  1742. size_t vectorsAdded;
  1743. size_t topicFieldLengthIndex;
  1744. /* Maximum number of bytes required by the 'fixed' part of the UNSUBSCRIBE
  1745. * packet header according to the MQTT specification.
  1746. * MQTT Control Byte 0 + 1 = 1
  1747. * Remaining length (max) + 4 = 5
  1748. * Packet ID + 2 = 7 */
  1749. uint8_t unsubscribeheader[ 7U ];
  1750. /* The vector array should be at least three element long as the topic
  1751. * string needs these many vector elements to be stored. */
  1752. assert( MQTT_SUB_UNSUB_MAX_VECTORS >= CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH );
  1753. pIndex = unsubscribeheader;
  1754. pIterator = pIoVector;
  1755. pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength,
  1756. pIndex,
  1757. packetId );
  1758. /* The header is to be sent first. */
  1759. pIterator->iov_base = unsubscribeheader;
  1760. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
  1761. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
  1762. /* coverity[misra_c_2012_rule_18_2_violation] */
  1763. /* coverity[misra_c_2012_rule_10_8_violation] */
  1764. pIterator->iov_len = ( size_t ) ( pIndex - unsubscribeheader );
  1765. totalPacketLength += pIterator->iov_len;
  1766. pIterator++;
  1767. ioVectorLength++;
  1768. while( ( status == MQTTSuccess ) && ( unsubscriptionsSent < subscriptionCount ) )
  1769. {
  1770. /* Reset the index for next iteration. */
  1771. topicFieldLengthIndex = 0;
  1772. /* Check whether the subscription topic will fit in the given vector. */
  1773. while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ) ) &&
  1774. ( unsubscriptionsSent < subscriptionCount ) )
  1775. {
  1776. /* The topic filter gets sent next. */
  1777. vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ],
  1778. pSubscriptionList[ unsubscriptionsSent ].pTopicFilter,
  1779. pSubscriptionList[ unsubscriptionsSent ].topicFilterLength,
  1780. pIterator,
  1781. &totalPacketLength );
  1782. /* Update the iterator to point to the next empty location. */
  1783. pIterator = &pIterator[ vectorsAdded ];
  1784. /* Update the total count based on how many vectors were added. */
  1785. ioVectorLength += vectorsAdded;
  1786. unsubscriptionsSent++;
  1787. /* Update the index for next iteration. */
  1788. topicFieldLengthIndex++;
  1789. }
  1790. if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalPacketLength )
  1791. {
  1792. status = MQTTSendFailed;
  1793. }
  1794. /* Update the iterator for the next potential loop iteration. */
  1795. pIterator = pIoVector;
  1796. /* Reset the vector length for the next potential loop iteration. */
  1797. ioVectorLength = 0U;
  1798. /* Reset the packet length for the next potential loop iteration. */
  1799. totalPacketLength = 0U;
  1800. }
  1801. return status;
  1802. }
  1803. /*-----------------------------------------------------------*/
  1804. static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
  1805. const MQTTPublishInfo_t * pPublishInfo,
  1806. const uint8_t * pMqttHeader,
  1807. size_t headerSize,
  1808. uint16_t packetId )
  1809. {
  1810. MQTTStatus_t status = MQTTSuccess;
  1811. size_t ioVectorLength;
  1812. size_t totalMessageLength;
  1813. /* Bytes required to encode the packet ID in an MQTT header according to
  1814. * the MQTT specification. */
  1815. uint8_t serializedPacketID[ 2U ];
  1816. /* Maximum number of vectors required to encode and send a publish
  1817. * packet. The breakdown is shown below.
  1818. * Fixed header (including topic string length) 0 + 1 = 1
  1819. * Topic string + 1 = 2
  1820. * Packet ID (only when QoS > QoS0) + 1 = 3
  1821. * Payload + 1 = 4 */
  1822. TransportOutVector_t pIoVector[ 4U ];
  1823. /* The header is sent first. */
  1824. pIoVector[ 0U ].iov_base = pMqttHeader;
  1825. pIoVector[ 0U ].iov_len = headerSize;
  1826. totalMessageLength = headerSize;
  1827. /* Then the topic name has to be sent. */
  1828. pIoVector[ 1U ].iov_base = pPublishInfo->pTopicName;
  1829. pIoVector[ 1U ].iov_len = pPublishInfo->topicNameLength;
  1830. totalMessageLength += pPublishInfo->topicNameLength;
  1831. /* The next field's index should be 2 as the first two fields
  1832. * have been filled in. */
  1833. ioVectorLength = 2U;
  1834. if( pPublishInfo->qos > MQTTQoS0 )
  1835. {
  1836. /* Encode the packet ID. */
  1837. serializedPacketID[ 0 ] = ( ( uint8_t ) ( ( packetId ) >> 8 ) );
  1838. serializedPacketID[ 1 ] = ( ( uint8_t ) ( ( packetId ) & 0x00ffU ) );
  1839. pIoVector[ ioVectorLength ].iov_base = serializedPacketID;
  1840. pIoVector[ ioVectorLength ].iov_len = sizeof( serializedPacketID );
  1841. ioVectorLength++;
  1842. totalMessageLength += sizeof( serializedPacketID );
  1843. }
  1844. /* Publish packets are allowed to contain no payload. */
  1845. if( pPublishInfo->payloadLength > 0U )
  1846. {
  1847. pIoVector[ ioVectorLength ].iov_base = pPublishInfo->pPayload;
  1848. pIoVector[ ioVectorLength ].iov_len = pPublishInfo->payloadLength;
  1849. ioVectorLength++;
  1850. totalMessageLength += pPublishInfo->payloadLength;
  1851. }
  1852. if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength )
  1853. {
  1854. status = MQTTSendFailed;
  1855. }
  1856. return status;
  1857. }
  1858. /*-----------------------------------------------------------*/
  1859. static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
  1860. const MQTTConnectInfo_t * pConnectInfo,
  1861. const MQTTPublishInfo_t * pWillInfo,
  1862. size_t remainingLength )
  1863. {
  1864. MQTTStatus_t status = MQTTSuccess;
  1865. TransportOutVector_t * iterator;
  1866. size_t ioVectorLength = 0U;
  1867. size_t totalMessageLength = 0U;
  1868. int32_t bytesSentOrError;
  1869. uint8_t * pIndex;
  1870. uint8_t serializedClientIDLength[ 2 ];
  1871. uint8_t serializedTopicLength[ 2 ];
  1872. uint8_t serializedPayloadLength[ 2 ];
  1873. uint8_t serializedUsernameLength[ 2 ];
  1874. uint8_t serializedPasswordLength[ 2 ];
  1875. size_t vectorsAdded;
  1876. /* Maximum number of bytes required by the 'fixed' part of the CONNECT
  1877. * packet header according to the MQTT specification.
  1878. * MQTT Control Byte 0 + 1 = 1
  1879. * Remaining length (max) + 4 = 5
  1880. * Protocol Name Length + 2 = 7
  1881. * Protocol Name (MQTT) + 4 = 11
  1882. * Protocol level + 1 = 12
  1883. * Connect flags + 1 = 13
  1884. * Keep alive + 2 = 15 */
  1885. uint8_t connectPacketHeader[ 15U ];
  1886. /* The maximum vectors required to encode and send a connect packet. The
  1887. * breakdown is shown below.
  1888. * Fixed header 0 + 1 = 1
  1889. * Client ID + 2 = 3
  1890. * Will topic + 2 = 5
  1891. * Will payload + 2 = 7
  1892. * Username + 2 = 9
  1893. * Password + 2 = 11 */
  1894. TransportOutVector_t pIoVector[ 11U ];
  1895. iterator = pIoVector;
  1896. pIndex = connectPacketHeader;
  1897. /* Validate arguments. */
  1898. if( ( pWillInfo != NULL ) && ( pWillInfo->pTopicName == NULL ) )
  1899. {
  1900. LogError( ( "pWillInfo->pTopicName cannot be NULL if Will is present." ) );
  1901. status = MQTTBadParameter;
  1902. }
  1903. else
  1904. {
  1905. pIndex = MQTT_SerializeConnectFixedHeader( pIndex,
  1906. pConnectInfo,
  1907. pWillInfo,
  1908. remainingLength );
  1909. assert( ( pIndex - connectPacketHeader ) <= sizeof( connectPacketHeader ) );
  1910. /* The header gets sent first. */
  1911. iterator->iov_base = connectPacketHeader;
  1912. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
  1913. /* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
  1914. /* coverity[misra_c_2012_rule_18_2_violation] */
  1915. /* coverity[misra_c_2012_rule_10_8_violation] */
  1916. iterator->iov_len = ( size_t ) ( pIndex - connectPacketHeader );
  1917. totalMessageLength += iterator->iov_len;
  1918. iterator++;
  1919. ioVectorLength++;
  1920. /* Serialize the client ID. */
  1921. vectorsAdded = addEncodedStringToVector( serializedClientIDLength,
  1922. pConnectInfo->pClientIdentifier,
  1923. pConnectInfo->clientIdentifierLength,
  1924. iterator,
  1925. &totalMessageLength );
  1926. /* Update the iterator to point to the next empty slot. */
  1927. iterator = &iterator[ vectorsAdded ];
  1928. ioVectorLength += vectorsAdded;
  1929. if( pWillInfo != NULL )
  1930. {
  1931. /* Serialize the topic. */
  1932. vectorsAdded = addEncodedStringToVector( serializedTopicLength,
  1933. pWillInfo->pTopicName,
  1934. pWillInfo->topicNameLength,
  1935. iterator,
  1936. &totalMessageLength );
  1937. /* Update the iterator to point to the next empty slot. */
  1938. iterator = &iterator[ vectorsAdded ];
  1939. ioVectorLength += vectorsAdded;
  1940. /* Serialize the payload. Payload of last will and testament can be NULL. */
  1941. vectorsAdded = addEncodedStringToVector( serializedPayloadLength,
  1942. pWillInfo->pPayload,
  1943. ( uint16_t ) pWillInfo->payloadLength,
  1944. iterator,
  1945. &totalMessageLength );
  1946. /* Update the iterator to point to the next empty slot. */
  1947. iterator = &iterator[ vectorsAdded ];
  1948. ioVectorLength += vectorsAdded;
  1949. }
  1950. /* Encode the user name if provided. */
  1951. if( pConnectInfo->pUserName != NULL )
  1952. {
  1953. /* Serialize the user name string. */
  1954. vectorsAdded = addEncodedStringToVector( serializedUsernameLength,
  1955. pConnectInfo->pUserName,
  1956. pConnectInfo->userNameLength,
  1957. iterator,
  1958. &totalMessageLength );
  1959. /* Update the iterator to point to the next empty slot. */
  1960. iterator = &iterator[ vectorsAdded ];
  1961. ioVectorLength += vectorsAdded;
  1962. }
  1963. /* Encode the password if provided. */
  1964. if( pConnectInfo->pPassword != NULL )
  1965. {
  1966. /* Serialize the user name string. */
  1967. vectorsAdded = addEncodedStringToVector( serializedPasswordLength,
  1968. pConnectInfo->pPassword,
  1969. pConnectInfo->passwordLength,
  1970. iterator,
  1971. &totalMessageLength );
  1972. /* Update the iterator to point to the next empty slot. */
  1973. iterator = &iterator[ vectorsAdded ];
  1974. ioVectorLength += vectorsAdded;
  1975. }
  1976. bytesSentOrError = sendMessageVector( pContext, pIoVector, ioVectorLength );
  1977. if( bytesSentOrError != ( int32_t ) totalMessageLength )
  1978. {
  1979. status = MQTTSendFailed;
  1980. }
  1981. }
  1982. return status;
  1983. }
  1984. /*-----------------------------------------------------------*/
  1985. static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
  1986. uint32_t timeoutMs,
  1987. bool cleanSession,
  1988. MQTTPacketInfo_t * pIncomingPacket,
  1989. bool * pSessionPresent )
  1990. {
  1991. MQTTStatus_t status = MQTTSuccess;
  1992. MQTTGetCurrentTimeFunc_t getTimeStamp = NULL;
  1993. uint32_t entryTimeMs = 0U, remainingTimeMs = 0U, timeTakenMs = 0U;
  1994. bool breakFromLoop = false;
  1995. uint16_t loopCount = 0U;
  1996. assert( pContext != NULL );
  1997. assert( pIncomingPacket != NULL );
  1998. assert( pContext->getTime != NULL );
  1999. getTimeStamp = pContext->getTime;
  2000. /* Get the entry time for the function. */
  2001. entryTimeMs = getTimeStamp();
  2002. do
  2003. {
  2004. /* Transport read for incoming CONNACK packet type and length.
  2005. * MQTT_GetIncomingPacketTypeAndLength is a blocking call and it is
  2006. * returned after a transport receive timeout, an error, or a successful
  2007. * receive of packet type and length. */
  2008. status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv,
  2009. pContext->transportInterface.pNetworkContext,
  2010. pIncomingPacket );
  2011. /* The loop times out based on 2 conditions.
  2012. * 1. If timeoutMs is greater than 0:
  2013. * Loop times out based on the timeout calculated by getTime()
  2014. * function.
  2015. * 2. If timeoutMs is 0:
  2016. * Loop times out based on the maximum number of retries config
  2017. * MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT. This config will control
  2018. * maximum the number of retry attempts to read the CONNACK packet.
  2019. * A value of 0 for the config will try once to read CONNACK. */
  2020. if( timeoutMs > 0U )
  2021. {
  2022. breakFromLoop = calculateElapsedTime( getTimeStamp(), entryTimeMs ) >= timeoutMs;
  2023. }
  2024. else
  2025. {
  2026. breakFromLoop = loopCount >= MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT;
  2027. loopCount++;
  2028. }
  2029. /* Loop until there is data to read or if we have exceeded the timeout/retries. */
  2030. } while( ( status == MQTTNoDataAvailable ) && ( breakFromLoop == false ) );
  2031. if( status == MQTTSuccess )
  2032. {
  2033. /* Time taken in this function so far. */
  2034. timeTakenMs = calculateElapsedTime( getTimeStamp(), entryTimeMs );
  2035. if( timeTakenMs < timeoutMs )
  2036. {
  2037. /* Calculate remaining time for receiving the remainder of
  2038. * the packet. */
  2039. remainingTimeMs = timeoutMs - timeTakenMs;
  2040. }
  2041. /* Reading the remainder of the packet by transport recv.
  2042. * Attempt to read once even if the timeout has expired.
  2043. * Invoking receivePacket with remainingTime as 0 would attempt to
  2044. * recv from network once. If using retries, the remainder of the
  2045. * CONNACK packet is tried to be read only once. Reading once would be
  2046. * good as the packet type and remaining length was already read. Hence,
  2047. * the probability of the remaining 2 bytes available to read is very high. */
  2048. if( pIncomingPacket->type == MQTT_PACKET_TYPE_CONNACK )
  2049. {
  2050. status = receivePacket( pContext,
  2051. *pIncomingPacket,
  2052. remainingTimeMs );
  2053. }
  2054. else
  2055. {
  2056. LogError( ( "Incorrect packet type %X received while expecting"
  2057. " CONNACK(%X).",
  2058. ( unsigned int ) pIncomingPacket->type,
  2059. MQTT_PACKET_TYPE_CONNACK ) );
  2060. status = MQTTBadResponse;
  2061. }
  2062. }
  2063. if( status == MQTTSuccess )
  2064. {
  2065. /* Update the packet info pointer to the buffer read. */
  2066. pIncomingPacket->pRemainingData = pContext->networkBuffer.pBuffer;
  2067. /* Deserialize CONNACK. */
  2068. status = MQTT_DeserializeAck( pIncomingPacket, NULL, pSessionPresent );
  2069. }
  2070. /* If a clean session is requested, a session present should not be set by
  2071. * broker. */
  2072. if( status == MQTTSuccess )
  2073. {
  2074. if( ( cleanSession == true ) && ( *pSessionPresent == true ) )
  2075. {
  2076. LogError( ( "Unexpected session present flag in CONNACK response from broker."
  2077. " CONNECT request with clean session was made with broker." ) );
  2078. status = MQTTBadResponse;
  2079. }
  2080. }
  2081. if( status == MQTTSuccess )
  2082. {
  2083. LogDebug( ( "Received MQTT CONNACK successfully from broker." ) );
  2084. }
  2085. else
  2086. {
  2087. LogError( ( "CONNACK recv failed with status = %s.",
  2088. MQTT_Status_strerror( status ) ) );
  2089. }
  2090. return status;
  2091. }
  2092. /*-----------------------------------------------------------*/
  2093. static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
  2094. bool sessionPresent )
  2095. {
  2096. MQTTStatus_t status = MQTTSuccess;
  2097. MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
  2098. uint16_t packetId = MQTT_PACKET_ID_INVALID;
  2099. MQTTPublishState_t state = MQTTStateNull;
  2100. assert( pContext != NULL );
  2101. /* Reset the index and clear the buffer when a new session is established. */
  2102. pContext->index = 0;
  2103. ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
  2104. if( sessionPresent == true )
  2105. {
  2106. /* Get the next packet ID for which a PUBREL need to be resent. */
  2107. packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
  2108. /* Resend all the PUBREL acks after session is reestablished. */
  2109. while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
  2110. ( status == MQTTSuccess ) )
  2111. {
  2112. status = sendPublishAcks( pContext, packetId, state );
  2113. packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
  2114. }
  2115. }
  2116. else
  2117. {
  2118. /* Clear any existing records if a new session is established. */
  2119. if( pContext->outgoingPublishRecordMaxCount > 0U )
  2120. {
  2121. ( void ) memset( pContext->outgoingPublishRecords,
  2122. 0x00,
  2123. pContext->outgoingPublishRecordMaxCount * sizeof( *pContext->outgoingPublishRecords ) );
  2124. }
  2125. if( pContext->incomingPublishRecordMaxCount > 0U )
  2126. {
  2127. ( void ) memset( pContext->incomingPublishRecords,
  2128. 0x00,
  2129. pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
  2130. }
  2131. }
  2132. return status;
  2133. }
  2134. static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
  2135. const MQTTPublishInfo_t * pPublishInfo,
  2136. uint16_t packetId )
  2137. {
  2138. MQTTStatus_t status = MQTTSuccess;
  2139. /* Validate arguments. */
  2140. if( ( pContext == NULL ) || ( pPublishInfo == NULL ) )
  2141. {
  2142. LogError( ( "Argument cannot be NULL: pContext=%p, "
  2143. "pPublishInfo=%p.",
  2144. ( void * ) pContext,
  2145. ( void * ) pPublishInfo ) );
  2146. status = MQTTBadParameter;
  2147. }
  2148. else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) )
  2149. {
  2150. LogError( ( "Packet Id is 0 for PUBLISH with QoS=%u.",
  2151. ( unsigned int ) pPublishInfo->qos ) );
  2152. status = MQTTBadParameter;
  2153. }
  2154. else if( ( pPublishInfo->payloadLength > 0U ) && ( pPublishInfo->pPayload == NULL ) )
  2155. {
  2156. LogError( ( "A nonzero payload length requires a non-NULL payload: "
  2157. "payloadLength=%lu, pPayload=%p.",
  2158. ( unsigned long ) pPublishInfo->payloadLength,
  2159. pPublishInfo->pPayload ) );
  2160. status = MQTTBadParameter;
  2161. }
  2162. else if( ( pContext->outgoingPublishRecords == NULL ) && ( pPublishInfo->qos > MQTTQoS0 ) )
  2163. {
  2164. LogError( ( "Trying to publish a QoS > MQTTQoS0 packet when outgoing publishes "
  2165. "for QoS1/QoS2 have not been enabled. Please, call MQTT_InitStatefulQoS "
  2166. "to initialize and enable the use of QoS1/QoS2 publishes." ) );
  2167. status = MQTTBadParameter;
  2168. }
  2169. else
  2170. {
  2171. /* MISRA else */
  2172. }
  2173. return status;
  2174. }
  2175. /*-----------------------------------------------------------*/
  2176. MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
  2177. const TransportInterface_t * pTransportInterface,
  2178. MQTTGetCurrentTimeFunc_t getTimeFunction,
  2179. MQTTEventCallback_t userCallback,
  2180. const MQTTFixedBuffer_t * pNetworkBuffer )
  2181. {
  2182. MQTTStatus_t status = MQTTSuccess;
  2183. /* Validate arguments. */
  2184. if( ( pContext == NULL ) || ( pTransportInterface == NULL ) ||
  2185. ( pNetworkBuffer == NULL ) )
  2186. {
  2187. LogError( ( "Argument cannot be NULL: pContext=%p, "
  2188. "pTransportInterface=%p, "
  2189. "pNetworkBuffer=%p",
  2190. ( void * ) pContext,
  2191. ( void * ) pTransportInterface,
  2192. ( void * ) pNetworkBuffer ) );
  2193. status = MQTTBadParameter;
  2194. }
  2195. else if( getTimeFunction == NULL )
  2196. {
  2197. LogError( ( "Invalid parameter: getTimeFunction is NULL" ) );
  2198. status = MQTTBadParameter;
  2199. }
  2200. else if( userCallback == NULL )
  2201. {
  2202. LogError( ( "Invalid parameter: userCallback is NULL" ) );
  2203. status = MQTTBadParameter;
  2204. }
  2205. else if( pTransportInterface->recv == NULL )
  2206. {
  2207. LogError( ( "Invalid parameter: pTransportInterface->recv is NULL" ) );
  2208. status = MQTTBadParameter;
  2209. }
  2210. else if( pTransportInterface->send == NULL )
  2211. {
  2212. LogError( ( "Invalid parameter: pTransportInterface->send is NULL" ) );
  2213. status = MQTTBadParameter;
  2214. }
  2215. else
  2216. {
  2217. ( void ) memset( pContext, 0x00, sizeof( MQTTContext_t ) );
  2218. pContext->connectStatus = MQTTNotConnected;
  2219. pContext->transportInterface = *pTransportInterface;
  2220. pContext->getTime = getTimeFunction;
  2221. pContext->appCallback = userCallback;
  2222. pContext->networkBuffer = *pNetworkBuffer;
  2223. /* Zero is not a valid packet ID per MQTT spec. Start from 1. */
  2224. pContext->nextPacketId = 1;
  2225. }
  2226. return status;
  2227. }
  2228. /*-----------------------------------------------------------*/
  2229. MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
  2230. MQTTPubAckInfo_t * pOutgoingPublishRecords,
  2231. size_t outgoingPublishCount,
  2232. MQTTPubAckInfo_t * pIncomingPublishRecords,
  2233. size_t incomingPublishCount )
  2234. {
  2235. MQTTStatus_t status = MQTTSuccess;
  2236. if( pContext == NULL )
  2237. {
  2238. LogError( ( "Argument cannot be NULL: pContext=%p\n",
  2239. ( void * ) pContext ) );
  2240. status = MQTTBadParameter;
  2241. }
  2242. /* Check whether the arguments make sense. Not equal here behaves
  2243. * like an exclusive-or operator for boolean values. */
  2244. else if( ( outgoingPublishCount == 0U ) !=
  2245. ( pOutgoingPublishRecords == NULL ) )
  2246. {
  2247. LogError( ( "Arguments do not match: pOutgoingPublishRecords=%p, "
  2248. "outgoingPublishCount=%lu",
  2249. ( void * ) pOutgoingPublishRecords,
  2250. outgoingPublishCount ) );
  2251. status = MQTTBadParameter;
  2252. }
  2253. /* Check whether the arguments make sense. Not equal here behaves
  2254. * like an exclusive-or operator for boolean values. */
  2255. else if( ( incomingPublishCount == 0U ) !=
  2256. ( pIncomingPublishRecords == NULL ) )
  2257. {
  2258. LogError( ( "Arguments do not match: pIncomingPublishRecords=%p, "
  2259. "incomingPublishCount=%lu",
  2260. ( void * ) pIncomingPublishRecords,
  2261. incomingPublishCount ) );
  2262. status = MQTTBadParameter;
  2263. }
  2264. else if( pContext->appCallback == NULL )
  2265. {
  2266. LogError( ( "MQTT_InitStatefulQoS must be called only after MQTT_Init has"
  2267. " been called successfully.\n" ) );
  2268. status = MQTTBadParameter;
  2269. }
  2270. else
  2271. {
  2272. pContext->incomingPublishRecordMaxCount = incomingPublishCount;
  2273. pContext->incomingPublishRecords = pIncomingPublishRecords;
  2274. pContext->outgoingPublishRecordMaxCount = outgoingPublishCount;
  2275. pContext->outgoingPublishRecords = pOutgoingPublishRecords;
  2276. }
  2277. return status;
  2278. }
  2279. /*-----------------------------------------------------------*/
  2280. MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext,
  2281. uint16_t packetId )
  2282. {
  2283. MQTTStatus_t status = MQTTSuccess;
  2284. if( pContext == NULL )
  2285. {
  2286. LogWarn( ( "pContext is NULL\n" ) );
  2287. status = MQTTBadParameter;
  2288. }
  2289. else if( pContext->outgoingPublishRecords == NULL )
  2290. {
  2291. LogError( ( "QoS1/QoS2 is not initialized for use. Please, "
  2292. "call MQTT_InitStatefulQoS to enable QoS1 and QoS2 "
  2293. "publishes.\n" ) );
  2294. status = MQTTBadParameter;
  2295. }
  2296. else
  2297. {
  2298. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2299. status = MQTT_RemoveStateRecord( pContext,
  2300. packetId );
  2301. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2302. }
  2303. return status;
  2304. }
  2305. /*-----------------------------------------------------------*/
  2306. MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
  2307. const MQTTConnectInfo_t * pConnectInfo,
  2308. const MQTTPublishInfo_t * pWillInfo,
  2309. uint32_t timeoutMs,
  2310. bool * pSessionPresent )
  2311. {
  2312. size_t remainingLength = 0UL, packetSize = 0UL;
  2313. MQTTStatus_t status = MQTTSuccess;
  2314. MQTTPacketInfo_t incomingPacket = { 0 };
  2315. incomingPacket.type = ( uint8_t ) 0;
  2316. if( ( pContext == NULL ) || ( pConnectInfo == NULL ) || ( pSessionPresent == NULL ) )
  2317. {
  2318. LogError( ( "Argument cannot be NULL: pContext=%p, "
  2319. "pConnectInfo=%p, pSessionPresent=%p.",
  2320. ( void * ) pContext,
  2321. ( void * ) pConnectInfo,
  2322. ( void * ) pSessionPresent ) );
  2323. status = MQTTBadParameter;
  2324. }
  2325. if( status == MQTTSuccess )
  2326. {
  2327. /* Get MQTT connect packet size and remaining length. */
  2328. status = MQTT_GetConnectPacketSize( pConnectInfo,
  2329. pWillInfo,
  2330. &remainingLength,
  2331. &packetSize );
  2332. LogDebug( ( "CONNECT packet size is %lu and remaining length is %lu.",
  2333. ( unsigned long ) packetSize,
  2334. ( unsigned long ) remainingLength ) );
  2335. }
  2336. if( status == MQTTSuccess )
  2337. {
  2338. MQTT_PRE_SEND_HOOK( pContext );
  2339. status = sendConnectWithoutCopy( pContext,
  2340. pConnectInfo,
  2341. pWillInfo,
  2342. remainingLength );
  2343. MQTT_POST_SEND_HOOK( pContext );
  2344. }
  2345. /* Read CONNACK from transport layer. */
  2346. if( status == MQTTSuccess )
  2347. {
  2348. status = receiveConnack( pContext,
  2349. timeoutMs,
  2350. pConnectInfo->cleanSession,
  2351. &incomingPacket,
  2352. pSessionPresent );
  2353. }
  2354. if( status == MQTTSuccess )
  2355. {
  2356. /* Resend PUBRELs when reestablishing a session, or clear records for new sessions. */
  2357. status = handleSessionResumption( pContext, *pSessionPresent );
  2358. }
  2359. if( status == MQTTSuccess )
  2360. {
  2361. LogInfo( ( "MQTT connection established with the broker." ) );
  2362. pContext->connectStatus = MQTTConnected;
  2363. /* Initialize keep-alive fields after a successful connection. */
  2364. pContext->keepAliveIntervalSec = pConnectInfo->keepAliveSeconds;
  2365. pContext->waitingForPingResp = false;
  2366. pContext->pingReqSendTimeMs = 0U;
  2367. }
  2368. else
  2369. {
  2370. LogError( ( "MQTT connection failed with status = %s.",
  2371. MQTT_Status_strerror( status ) ) );
  2372. }
  2373. return status;
  2374. }
  2375. /*-----------------------------------------------------------*/
  2376. MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
  2377. const MQTTSubscribeInfo_t * pSubscriptionList,
  2378. size_t subscriptionCount,
  2379. uint16_t packetId )
  2380. {
  2381. size_t remainingLength = 0UL, packetSize = 0UL;
  2382. /* Validate arguments. */
  2383. MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
  2384. pSubscriptionList,
  2385. subscriptionCount,
  2386. packetId );
  2387. if( status == MQTTSuccess )
  2388. {
  2389. /* Get the remaining length and packet size.*/
  2390. status = MQTT_GetSubscribePacketSize( pSubscriptionList,
  2391. subscriptionCount,
  2392. &remainingLength,
  2393. &packetSize );
  2394. LogDebug( ( "SUBSCRIBE packet size is %lu and remaining length is %lu.",
  2395. ( unsigned long ) packetSize,
  2396. ( unsigned long ) remainingLength ) );
  2397. }
  2398. if( status == MQTTSuccess )
  2399. {
  2400. MQTT_PRE_SEND_HOOK( pContext );
  2401. /* Send MQTT SUBSCRIBE packet. */
  2402. status = sendSubscribeWithoutCopy( pContext,
  2403. pSubscriptionList,
  2404. subscriptionCount,
  2405. packetId,
  2406. remainingLength );
  2407. MQTT_POST_SEND_HOOK( pContext );
  2408. }
  2409. return status;
  2410. }
  2411. /*-----------------------------------------------------------*/
  2412. MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
  2413. const MQTTPublishInfo_t * pPublishInfo,
  2414. uint16_t packetId )
  2415. {
  2416. size_t headerSize = 0UL;
  2417. size_t remainingLength = 0UL;
  2418. size_t packetSize = 0UL;
  2419. MQTTPublishState_t publishStatus = MQTTStateNull;
  2420. bool stateUpdateHookExecuted = false;
  2421. /* Maximum number of bytes required by the 'fixed' part of the PUBLISH
  2422. * packet header according to the MQTT specifications.
  2423. * Header byte 0 + 1 = 1
  2424. * Length (max) + 4 = 5
  2425. * Topic string length + 2 = 7
  2426. *
  2427. * Note that since publish is one of the most common operations in MQTT
  2428. * connection, we have moved the topic string length to the 'fixed' part of
  2429. * the header so efficiency. Otherwise, we would need an extra vector and
  2430. * an extra call to 'send' (in case writev is not defined) to send the
  2431. * topic length. */
  2432. uint8_t mqttHeader[ 7U ];
  2433. /* Validate arguments. */
  2434. MQTTStatus_t status = validatePublishParams( pContext, pPublishInfo, packetId );
  2435. if( status == MQTTSuccess )
  2436. {
  2437. /* Get the remaining length and packet size.*/
  2438. status = MQTT_GetPublishPacketSize( pPublishInfo,
  2439. &remainingLength,
  2440. &packetSize );
  2441. }
  2442. if( status == MQTTSuccess )
  2443. {
  2444. status = MQTT_SerializePublishHeaderWithoutTopic( pPublishInfo,
  2445. remainingLength,
  2446. mqttHeader,
  2447. &headerSize );
  2448. }
  2449. if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
  2450. {
  2451. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2452. /* Set the flag so that the corresponding hook can be called later. */
  2453. stateUpdateHookExecuted = true;
  2454. status = MQTT_ReserveState( pContext,
  2455. packetId,
  2456. pPublishInfo->qos );
  2457. /* State already exists for a duplicate packet.
  2458. * If a state doesn't exist, it will be handled as a new publish in
  2459. * state engine. */
  2460. if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) )
  2461. {
  2462. status = MQTTSuccess;
  2463. }
  2464. }
  2465. if( status == MQTTSuccess )
  2466. {
  2467. /* Take the mutex as multiple send calls are required for sending this
  2468. * packet. */
  2469. MQTT_PRE_SEND_HOOK( pContext );
  2470. status = sendPublishWithoutCopy( pContext,
  2471. pPublishInfo,
  2472. mqttHeader,
  2473. headerSize,
  2474. packetId );
  2475. /* Give the mutex away for the next taker. */
  2476. MQTT_POST_SEND_HOOK( pContext );
  2477. }
  2478. if( ( status == MQTTSuccess ) &&
  2479. ( pPublishInfo->qos > MQTTQoS0 ) )
  2480. {
  2481. /* Update state machine after PUBLISH is sent.
  2482. * Only to be done for QoS1 or QoS2. */
  2483. status = MQTT_UpdateStatePublish( pContext,
  2484. packetId,
  2485. MQTT_SEND,
  2486. pPublishInfo->qos,
  2487. &publishStatus );
  2488. if( status != MQTTSuccess )
  2489. {
  2490. LogError( ( "Update state for publish failed with status %s."
  2491. " However PUBLISH packet was sent to the broker."
  2492. " Any further handling of ACKs for the packet Id"
  2493. " will fail.",
  2494. MQTT_Status_strerror( status ) ) );
  2495. }
  2496. }
  2497. if( stateUpdateHookExecuted == true )
  2498. {
  2499. /* Regardless of the status, if the mutex was taken due to the
  2500. * packet being of QoS > QoS0, then it should be relinquished. */
  2501. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2502. }
  2503. if( status != MQTTSuccess )
  2504. {
  2505. LogError( ( "MQTT PUBLISH failed with status %s.",
  2506. MQTT_Status_strerror( status ) ) );
  2507. }
  2508. return status;
  2509. }
  2510. /*-----------------------------------------------------------*/
  2511. MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
  2512. {
  2513. int32_t sendResult = 0;
  2514. MQTTStatus_t status = MQTTSuccess;
  2515. size_t packetSize = 0U;
  2516. /* MQTT ping packets are of fixed length. */
  2517. uint8_t pingreqPacket[ 2U ];
  2518. MQTTFixedBuffer_t localBuffer;
  2519. localBuffer.pBuffer = pingreqPacket;
  2520. localBuffer.size = sizeof( pingreqPacket );
  2521. if( pContext == NULL )
  2522. {
  2523. LogError( ( "pContext is NULL." ) );
  2524. status = MQTTBadParameter;
  2525. }
  2526. if( status == MQTTSuccess )
  2527. {
  2528. /* Get MQTT PINGREQ packet size. */
  2529. status = MQTT_GetPingreqPacketSize( &packetSize );
  2530. if( status == MQTTSuccess )
  2531. {
  2532. assert( packetSize == localBuffer.size );
  2533. LogDebug( ( "MQTT PINGREQ packet size is %lu.",
  2534. ( unsigned long ) packetSize ) );
  2535. }
  2536. else
  2537. {
  2538. LogError( ( "Failed to get the PINGREQ packet size." ) );
  2539. }
  2540. }
  2541. if( status == MQTTSuccess )
  2542. {
  2543. /* Serialize MQTT PINGREQ. */
  2544. status = MQTT_SerializePingreq( &localBuffer );
  2545. }
  2546. if( status == MQTTSuccess )
  2547. {
  2548. /* Take the mutex as the send call should not be interrupted in
  2549. * between. */
  2550. MQTT_PRE_SEND_HOOK( pContext );
  2551. /* Send the serialized PINGREQ packet to transport layer.
  2552. * Here, we do not use the vectored IO approach for efficiency as the
  2553. * Ping packet does not have numerous fields which need to be copied
  2554. * from the user provided buffers. Thus it can be sent directly. */
  2555. sendResult = sendBuffer( pContext,
  2556. localBuffer.pBuffer,
  2557. packetSize );
  2558. /* Give the mutex away. */
  2559. MQTT_POST_SEND_HOOK( pContext );
  2560. /* It is an error to not send the entire PINGREQ packet. */
  2561. if( sendResult < ( int32_t ) packetSize )
  2562. {
  2563. LogError( ( "Transport send failed for PINGREQ packet." ) );
  2564. status = MQTTSendFailed;
  2565. }
  2566. else
  2567. {
  2568. pContext->pingReqSendTimeMs = pContext->lastPacketTxTime;
  2569. pContext->waitingForPingResp = true;
  2570. LogDebug( ( "Sent %ld bytes of PINGREQ packet.",
  2571. ( long int ) sendResult ) );
  2572. }
  2573. }
  2574. return status;
  2575. }
  2576. /*-----------------------------------------------------------*/
  2577. MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
  2578. const MQTTSubscribeInfo_t * pSubscriptionList,
  2579. size_t subscriptionCount,
  2580. uint16_t packetId )
  2581. {
  2582. size_t remainingLength = 0UL, packetSize = 0UL;
  2583. /* Validate arguments. */
  2584. MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
  2585. pSubscriptionList,
  2586. subscriptionCount,
  2587. packetId );
  2588. if( status == MQTTSuccess )
  2589. {
  2590. /* Get the remaining length and packet size.*/
  2591. status = MQTT_GetUnsubscribePacketSize( pSubscriptionList,
  2592. subscriptionCount,
  2593. &remainingLength,
  2594. &packetSize );
  2595. LogDebug( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.",
  2596. ( unsigned long ) packetSize,
  2597. ( unsigned long ) remainingLength ) );
  2598. }
  2599. if( status == MQTTSuccess )
  2600. {
  2601. /* Take the mutex because the below call should not be interrupted. */
  2602. MQTT_PRE_SEND_HOOK( pContext );
  2603. status = sendUnsubscribeWithoutCopy( pContext,
  2604. pSubscriptionList,
  2605. subscriptionCount,
  2606. packetId,
  2607. remainingLength );
  2608. /* Give the mutex away. */
  2609. MQTT_POST_SEND_HOOK( pContext );
  2610. }
  2611. return status;
  2612. }
  2613. /*-----------------------------------------------------------*/
  2614. MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
  2615. {
  2616. size_t packetSize = 0U;
  2617. int32_t sendResult = 0;
  2618. MQTTStatus_t status = MQTTSuccess;
  2619. MQTTFixedBuffer_t localBuffer;
  2620. uint8_t disconnectPacket[ 2U ];
  2621. localBuffer.pBuffer = disconnectPacket;
  2622. localBuffer.size = 2U;
  2623. /* Validate arguments. */
  2624. if( pContext == NULL )
  2625. {
  2626. LogError( ( "pContext cannot be NULL." ) );
  2627. status = MQTTBadParameter;
  2628. }
  2629. if( status == MQTTSuccess )
  2630. {
  2631. /* Get MQTT DISCONNECT packet size. */
  2632. status = MQTT_GetDisconnectPacketSize( &packetSize );
  2633. LogDebug( ( "MQTT DISCONNECT packet size is %lu.",
  2634. ( unsigned long ) packetSize ) );
  2635. }
  2636. if( status == MQTTSuccess )
  2637. {
  2638. /* Serialize MQTT DISCONNECT packet. */
  2639. status = MQTT_SerializeDisconnect( &localBuffer );
  2640. }
  2641. if( status == MQTTSuccess )
  2642. {
  2643. /* Take the mutex because the below call should not be interrupted. */
  2644. MQTT_PRE_SEND_HOOK( pContext );
  2645. /* Here we do not use vectors as the disconnect packet has fixed fields
  2646. * which do not reside in user provided buffers. Thus, it can be sent
  2647. * using a simple send call. */
  2648. sendResult = sendBuffer( pContext,
  2649. localBuffer.pBuffer,
  2650. packetSize );
  2651. /* Give the mutex away. */
  2652. MQTT_POST_SEND_HOOK( pContext );
  2653. if( sendResult < ( int32_t ) packetSize )
  2654. {
  2655. LogError( ( "Transport send failed for DISCONNECT packet." ) );
  2656. status = MQTTSendFailed;
  2657. }
  2658. else
  2659. {
  2660. LogDebug( ( "Sent %ld bytes of DISCONNECT packet.",
  2661. ( long int ) sendResult ) );
  2662. }
  2663. }
  2664. if( status == MQTTSuccess )
  2665. {
  2666. LogInfo( ( "Disconnected from the broker." ) );
  2667. pContext->connectStatus = MQTTNotConnected;
  2668. /* Reset the index and clean the buffer on a successful disconnect. */
  2669. pContext->index = 0;
  2670. ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );
  2671. }
  2672. return status;
  2673. }
  2674. /*-----------------------------------------------------------*/
  2675. MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext )
  2676. {
  2677. MQTTStatus_t status = MQTTBadParameter;
  2678. if( pContext == NULL )
  2679. {
  2680. LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
  2681. }
  2682. else if( pContext->getTime == NULL )
  2683. {
  2684. LogError( ( "Invalid input parameter: MQTT Context must have valid getTime." ) );
  2685. }
  2686. else if( pContext->networkBuffer.pBuffer == NULL )
  2687. {
  2688. LogError( ( "Invalid input parameter: The MQTT context's networkBuffer must not be NULL." ) );
  2689. }
  2690. else
  2691. {
  2692. pContext->controlPacketSent = false;
  2693. status = receiveSingleIteration( pContext, true );
  2694. }
  2695. return status;
  2696. }
  2697. /*-----------------------------------------------------------*/
  2698. MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext )
  2699. {
  2700. MQTTStatus_t status = MQTTBadParameter;
  2701. if( pContext == NULL )
  2702. {
  2703. LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
  2704. }
  2705. else if( pContext->getTime == NULL )
  2706. {
  2707. LogError( ( "Invalid input parameter: MQTT Context must have a valid getTime function." ) );
  2708. }
  2709. else if( pContext->networkBuffer.pBuffer == NULL )
  2710. {
  2711. LogError( ( "Invalid input parameter: MQTT context's networkBuffer must not be NULL." ) );
  2712. }
  2713. else
  2714. {
  2715. status = receiveSingleIteration( pContext, false );
  2716. }
  2717. return status;
  2718. }
  2719. /*-----------------------------------------------------------*/
  2720. uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
  2721. {
  2722. uint16_t packetId = 0U;
  2723. if( pContext != NULL )
  2724. {
  2725. MQTT_PRE_STATE_UPDATE_HOOK( pContext );
  2726. packetId = pContext->nextPacketId;
  2727. /* A packet ID of zero is not a valid packet ID. When the max ID
  2728. * is reached the next one should start at 1. */
  2729. if( pContext->nextPacketId == ( uint16_t ) UINT16_MAX )
  2730. {
  2731. pContext->nextPacketId = 1;
  2732. }
  2733. else
  2734. {
  2735. pContext->nextPacketId++;
  2736. }
  2737. MQTT_POST_STATE_UPDATE_HOOK( pContext );
  2738. }
  2739. return packetId;
  2740. }
  2741. /*-----------------------------------------------------------*/
  2742. MQTTStatus_t MQTT_MatchTopic( const char * pTopicName,
  2743. const uint16_t topicNameLength,
  2744. const char * pTopicFilter,
  2745. const uint16_t topicFilterLength,
  2746. bool * pIsMatch )
  2747. {
  2748. MQTTStatus_t status = MQTTSuccess;
  2749. bool topicFilterStartsWithWildcard = false;
  2750. bool matchStatus = false;
  2751. if( ( pTopicName == NULL ) || ( topicNameLength == 0u ) )
  2752. {
  2753. LogError( ( "Invalid paramater: Topic name should be non-NULL and its "
  2754. "length should be > 0: TopicName=%p, TopicNameLength=%hu",
  2755. ( void * ) pTopicName,
  2756. ( unsigned short ) topicNameLength ) );
  2757. status = MQTTBadParameter;
  2758. }
  2759. else if( ( pTopicFilter == NULL ) || ( topicFilterLength == 0u ) )
  2760. {
  2761. LogError( ( "Invalid paramater: Topic filter should be non-NULL and "
  2762. "its length should be > 0: TopicName=%p, TopicFilterLength=%hu",
  2763. ( void * ) pTopicFilter,
  2764. ( unsigned short ) topicFilterLength ) );
  2765. status = MQTTBadParameter;
  2766. }
  2767. else if( pIsMatch == NULL )
  2768. {
  2769. LogError( ( "Invalid paramater: Output parameter, pIsMatch, is NULL" ) );
  2770. status = MQTTBadParameter;
  2771. }
  2772. else
  2773. {
  2774. /* Check for an exact match if the incoming topic name and the registered
  2775. * topic filter length match. */
  2776. if( topicNameLength == topicFilterLength )
  2777. {
  2778. matchStatus = strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0;
  2779. }
  2780. if( matchStatus == false )
  2781. {
  2782. /* If an exact match was not found, match against wildcard characters in
  2783. * topic filter.*/
  2784. /* Determine if topic filter starts with a wildcard. */
  2785. topicFilterStartsWithWildcard = ( pTopicFilter[ 0 ] == '+' ) ||
  2786. ( pTopicFilter[ 0 ] == '#' );
  2787. /* Note: According to the MQTT 3.1.1 specification, incoming PUBLISH topic names
  2788. * starting with "$" character cannot be matched against topic filter starting with
  2789. * a wildcard, i.e. for example, "$SYS/sport" cannot be matched with "#" or
  2790. * "+/sport" topic filters. */
  2791. if( !( ( pTopicName[ 0 ] == '$' ) && ( topicFilterStartsWithWildcard == true ) ) )
  2792. {
  2793. matchStatus = matchTopicFilter( pTopicName, topicNameLength, pTopicFilter, topicFilterLength );
  2794. }
  2795. }
  2796. /* Update the output parameter with the match result. */
  2797. *pIsMatch = matchStatus;
  2798. }
  2799. return status;
  2800. }
  2801. /*-----------------------------------------------------------*/
  2802. MQTTStatus_t MQTT_GetSubAckStatusCodes( const MQTTPacketInfo_t * pSubackPacket,
  2803. uint8_t ** pPayloadStart,
  2804. size_t * pPayloadSize )
  2805. {
  2806. MQTTStatus_t status = MQTTSuccess;
  2807. if( pSubackPacket == NULL )
  2808. {
  2809. LogError( ( "Invalid parameter: pSubackPacket is NULL." ) );
  2810. status = MQTTBadParameter;
  2811. }
  2812. else if( pPayloadStart == NULL )
  2813. {
  2814. LogError( ( "Invalid parameter: pPayloadStart is NULL." ) );
  2815. status = MQTTBadParameter;
  2816. }
  2817. else if( pPayloadSize == NULL )
  2818. {
  2819. LogError( ( "Invalid parameter: pPayloadSize is NULL." ) );
  2820. status = MQTTBadParameter;
  2821. }
  2822. else if( pSubackPacket->type != MQTT_PACKET_TYPE_SUBACK )
  2823. {
  2824. LogError( ( "Invalid parameter: Input packet is not a SUBACK packet: "
  2825. "ExpectedType=%02x, InputType=%02x",
  2826. ( int ) MQTT_PACKET_TYPE_SUBACK,
  2827. ( int ) pSubackPacket->type ) );
  2828. status = MQTTBadParameter;
  2829. }
  2830. else if( pSubackPacket->pRemainingData == NULL )
  2831. {
  2832. LogError( ( "Invalid parameter: pSubackPacket->pRemainingData is NULL" ) );
  2833. status = MQTTBadParameter;
  2834. }
  2835. /* A SUBACK must have a remaining length of at least 3 to accommodate the
  2836. * packet identifier and at least 1 return code. */
  2837. else if( pSubackPacket->remainingLength < 3U )
  2838. {
  2839. LogError( ( "Invalid parameter: Packet remaining length is invalid: "
  2840. "Should be greater than 2 for SUBACK packet: InputRemainingLength=%lu",
  2841. ( unsigned long ) pSubackPacket->remainingLength ) );
  2842. status = MQTTBadParameter;
  2843. }
  2844. else
  2845. {
  2846. /* According to the MQTT 3.1.1 protocol specification, the "Remaining Length" field is a
  2847. * length of the variable header (2 bytes) plus the length of the payload.
  2848. * Therefore, we add 2 positions for the starting address of the payload, and
  2849. * subtract 2 bytes from the remaining length for the length of the payload.*/
  2850. *pPayloadStart = &pSubackPacket->pRemainingData[ sizeof( uint16_t ) ];
  2851. *pPayloadSize = pSubackPacket->remainingLength - sizeof( uint16_t );
  2852. }
  2853. return status;
  2854. }
  2855. /*-----------------------------------------------------------*/
  2856. const char * MQTT_Status_strerror( MQTTStatus_t status )
  2857. {
  2858. const char * str = NULL;
  2859. switch( status )
  2860. {
  2861. case MQTTSuccess:
  2862. str = "MQTTSuccess";
  2863. break;
  2864. case MQTTBadParameter:
  2865. str = "MQTTBadParameter";
  2866. break;
  2867. case MQTTNoMemory:
  2868. str = "MQTTNoMemory";
  2869. break;
  2870. case MQTTSendFailed:
  2871. str = "MQTTSendFailed";
  2872. break;
  2873. case MQTTRecvFailed:
  2874. str = "MQTTRecvFailed";
  2875. break;
  2876. case MQTTBadResponse:
  2877. str = "MQTTBadResponse";
  2878. break;
  2879. case MQTTServerRefused:
  2880. str = "MQTTServerRefused";
  2881. break;
  2882. case MQTTNoDataAvailable:
  2883. str = "MQTTNoDataAvailable";
  2884. break;
  2885. case MQTTIllegalState:
  2886. str = "MQTTIllegalState";
  2887. break;
  2888. case MQTTStateCollision:
  2889. str = "MQTTStateCollision";
  2890. break;
  2891. case MQTTKeepAliveTimeout:
  2892. str = "MQTTKeepAliveTimeout";
  2893. break;
  2894. case MQTTNeedMoreBytes:
  2895. str = "MQTTNeedMoreBytes";
  2896. break;
  2897. default:
  2898. str = "Invalid MQTT Status code";
  2899. break;
  2900. }
  2901. return str;
  2902. }
  2903. /*-----------------------------------------------------------*/