multithreading - goroutine consuming the same line more than once -
currently have scenario have huge file (for example im going 500k lines of text) , idea use worker (threads) process them 100 each thread. after running code, still wonder why goroutines consume same line more once? im guessing it's racing job done.
here's code
package main import ( "log" "bufio" "fmt" "encoding/csv" "encoding/json" "io" "os" "sync" ) type imdbdatamodel struct { color string `json:"color"` directorname string `json:"director_name"` numcriticforreviews string `json:"num_critic_for_reviews"` duration string `json:"duration"` directorfacebooklikes string `json:"director_facebook_likes"` actor3facebooklikes string `json:"actor_3_facebook_likes"` actor2name string `json:"actor_2_name"` actor1facebooklikes string `json:"actor_1_facebook_likes"` gross string `json:"gross"` genre string `json:"genres"` actor1name string `json:"actor_1_name"` movietitle string `json:"movie_title"` numvoteduser string `json:"num_voted_users"` casttotalfacebooklikes string `json:"cast_total_facebook_likes"` actor3name string `json:"actor_3_name"` facenumberinposter string `json:"facenumber_in_poster"` plotkeywords string `json:"plot_keywords"` movieimdblink string `json:"movie_imdb_link"` numuserforreviews string `json:"num_user_for_reviews"` language string `json:"language"` country string `json:"country"` contentrating string `json:"content_rating"` budget string `json:"budget"` titleyear string `json:"title_year"` actor2facebooklikes string `json:"actor_2_facebook_likes"` imdbscore string `json:"imdb_score"` aspectratio string `json:"aspect_ratio"` moviefacebooklikes string `json:"movie_facebook_likes"` } var iterated int64 var out []*imdbdatamodel func populatestring(input []imdbdatamodel, out []*imdbdatamodel, wg *sync.waitgroup) { _ , data := range input { out = append(out, &data) } wg.done() } func consumedata(input <-chan *imdbdatamodel, wg *sync.waitgroup){ defer wg.done() data := range input { iterated++ fmt.printf("%d : %s\n", iterated, data.movietitle) out = append(out, data) } fmt.println("output size : ", len(out)) } func processcsv(path string) (imdblist []imdbdatamodel){ csvfile, _ := os.open(path) reader := csv.newreader(bufio.newreader(csvfile)) { line, error := reader.read() if error == io.eof { break } else if error != nil { log.fatal(error) } imdblist = append(imdblist, imdbdatamodel{ color: line[0], directorname: line[1], numcriticforreviews : line[2], duration: line[3], directorfacebooklikes: line[4], actor3facebooklikes: line[5], actor2name: line[6], actor1facebooklikes: line[7], gross: line[8], genre: line[9], actor1name: line[10], movietitle: line[11], numvoteduser: line[12], casttotalfacebooklikes: line[13], actor3name: line[14], facenumberinposter: line[15], plotkeywords: line[16], movieimdblink: line[17], numuserforreviews: line[18], language: line[19], country: line[20], contentrating: line[21], budget: line[22], titleyear: line[23], actor2facebooklikes: line[24], imdbscore: line[25], aspectratio: line[26], moviefacebooklikes: line[27], }, ) } imdbjson, err := json.marshal(imdblist) if err != nil { log.println(imdbjson) } return } func main() { imdblist := processcsv("movie_metadata.csv") imdbchannel := make(chan *imdbdatamodel, 100) // buffer var wg sync.waitgroup := 0; < 5;i++ { wg.add(1) go consumedata(imdbchannel,&wg) } _ ,task := range imdblist { imdbchannel <- &task } close(imdbchannel) wg.wait() // _, item := range out { // fmt.println(item.movietitle) // } fmt.println("total channel :", len(imdbchannel)) fmt.println("total imdb :", len(imdblist)) fmt.println("total data: ", len(out)) fmt.println("iterated : ", iterated) fmt.println("goroutines finished..") }
edited: after few suggestions on adding mutex , channel, modified consume function
func consumedata(input <-chan *imdbdatamodel, output chan *imdbdatamodel, wg *sync.waitgroup) { defer wg.done() data := range input { iterated++ // outlock.lock() // out = append(out, data) // outlock.unlock() output <- data } }
however still consuming same line (race occured) more once.
.... date drew date drew date drew date drew date drew total channel : 0 total imdb : 5044 total data: 4944 iterated : 5000 goroutines finished..
you issues with:
var out []*imdbdatamodel func consumedata(input <-chan *imdbdatamodel, wg *sync.waitgroup){ defer wg.done() data := range input { iterated++ fmt.printf("%d : %s\n", iterated, data.movietitle) out = append(out, data) } fmt.println("output size : ", len(out)) }
you appending "out" multiple threads:
try adding lock around places write "out" this:
var out []*imdbdatamodel var outlock sync.mutex func consumedata(input <-chan *imdbdatamodel, wg *sync.waitgroup){ defer wg.done() data := range input { iterated++ fmt.printf("%d : %s\n", iterated, data.movietitle) outlock.lock() out = append(out, &data) outlock.unlock() } outlock.lock() fmt.println("output size : ", len(out)) outlock.unlock() }
Comments
Post a Comment