multithreading - goroutine consuming the same line more than once -


currently have scenario have huge file (for example im going 500k lines of text) , idea use worker (threads) process them 100 each thread. after running code, still wonder why goroutines consume same line more once? im guessing it's racing job done.

here's code

package main  import (      "log"      "bufio"      "fmt"      "encoding/csv"      "encoding/json"      "io"      "os"      "sync" )  type imdbdatamodel struct {      color                  string `json:"color"`      directorname           string `json:"director_name"`      numcriticforreviews    string `json:"num_critic_for_reviews"`      duration               string `json:"duration"`      directorfacebooklikes  string `json:"director_facebook_likes"`      actor3facebooklikes    string `json:"actor_3_facebook_likes"`      actor2name             string `json:"actor_2_name"`      actor1facebooklikes    string `json:"actor_1_facebook_likes"`      gross                  string `json:"gross"`      genre                  string `json:"genres"`      actor1name             string `json:"actor_1_name"`      movietitle             string `json:"movie_title"`      numvoteduser           string `json:"num_voted_users"`      casttotalfacebooklikes string `json:"cast_total_facebook_likes"`      actor3name             string `json:"actor_3_name"`      facenumberinposter     string `json:"facenumber_in_poster"`      plotkeywords           string `json:"plot_keywords"`      movieimdblink          string `json:"movie_imdb_link"`      numuserforreviews      string `json:"num_user_for_reviews"`      language               string `json:"language"`      country                string `json:"country"`      contentrating          string `json:"content_rating"`      budget                 string `json:"budget"`      titleyear              string `json:"title_year"`      actor2facebooklikes    string `json:"actor_2_facebook_likes"`      imdbscore              string `json:"imdb_score"`      aspectratio            string `json:"aspect_ratio"`      moviefacebooklikes     string `json:"movie_facebook_likes"` }  var iterated int64 var out []*imdbdatamodel  func populatestring(input []imdbdatamodel, out []*imdbdatamodel, wg *sync.waitgroup) {      _ , data := range input {                     out = append(out, &data)      }           wg.done() }  func consumedata(input <-chan *imdbdatamodel, wg *sync.waitgroup){      defer wg.done()      data := range input {                     iterated++                     fmt.printf("%d : %s\n", iterated, data.movietitle)           out = append(out, data)      }      fmt.println("output size : ", len(out))  }  func processcsv(path string) (imdblist []imdbdatamodel){      csvfile, _ := os.open(path)      reader := csv.newreader(bufio.newreader(csvfile))       {                     line, error := reader.read()           if error == io.eof {                break           } else if error != nil {                log.fatal(error)           }           imdblist = append(imdblist,                 imdbdatamodel{                     color: line[0],                     directorname: line[1],                     numcriticforreviews : line[2],                     duration: line[3],                     directorfacebooklikes: line[4],                     actor3facebooklikes: line[5],                     actor2name: line[6],                     actor1facebooklikes: line[7],                     gross: line[8],                     genre: line[9],                     actor1name: line[10],                     movietitle: line[11],                     numvoteduser: line[12],                     casttotalfacebooklikes: line[13],                     actor3name: line[14],                     facenumberinposter: line[15],                     plotkeywords: line[16],                     movieimdblink: line[17],                     numuserforreviews: line[18],                     language: line[19],                     country: line[20],                     contentrating: line[21],                     budget: line[22],                     titleyear: line[23],                     actor2facebooklikes: line[24],                     imdbscore: line[25],                     aspectratio: line[26],                     moviefacebooklikes: line[27],                },           )                }      imdbjson, err := json.marshal(imdblist)      if err != nil {           log.println(imdbjson)      }       return  }  func main() {           imdblist := processcsv("movie_metadata.csv")           imdbchannel  := make(chan *imdbdatamodel, 100) // buffer       var wg sync.waitgroup      := 0; < 5;i++ {           wg.add(1)           go consumedata(imdbchannel,&wg)           }       _ ,task := range imdblist {                     imdbchannel <- &task                     }       close(imdbchannel)           wg.wait()       // _, item := range out {      //      fmt.println(item.movietitle)      // }       fmt.println("total channel :", len(imdbchannel))       fmt.println("total imdb :", len(imdblist))      fmt.println("total data: ", len(out))      fmt.println("iterated : ", iterated)      fmt.println("goroutines finished..")   } 

edited: after few suggestions on adding mutex , channel, modified consume function

func consumedata(input <-chan *imdbdatamodel, output chan *imdbdatamodel, wg *sync.waitgroup) {     defer wg.done()     data := range input {         iterated++         // outlock.lock()         // out = append(out, data)         // outlock.unlock()         output <- data     } } 

however still consuming same line (race occured) more once.

.... date drew  date drew  date drew  date drew  date drew  total channel : 0 total imdb : 5044 total data:  4944 iterated :  5000 goroutines finished.. 

you issues with:

var out []*imdbdatamodel  func consumedata(input <-chan *imdbdatamodel, wg *sync.waitgroup){      defer wg.done()      data := range input {                     iterated++                     fmt.printf("%d : %s\n", iterated, data.movietitle)           out = append(out, data)      }      fmt.println("output size : ", len(out))  } 

you appending "out" multiple threads:

try adding lock around places write "out" this:

var out []*imdbdatamodel var outlock sync.mutex  func consumedata(input <-chan *imdbdatamodel, wg *sync.waitgroup){      defer wg.done()      data := range input {                     iterated++                     fmt.printf("%d : %s\n", iterated, data.movietitle)           outlock.lock()           out = append(out, &data)           outlock.unlock()      }      outlock.lock()      fmt.println("output size : ", len(out))      outlock.unlock()  } 

Comments

Popular posts from this blog

Is there a better way to structure post methods in Class Based Views -

performance - Why is XCHG reg, reg a 3 micro-op instruction on modern Intel architectures? -

c# - Asp.net web api : redirect unauthorized requst to forbidden page -